1use crate::config::Config;
22use crate::mcp_server::progress::McpProgressSink;
23use crate::mcp_server::registry::{
24 SkippedEntry, build_default_tools, build_tools_with_config, build_tools_with_runtime,
25};
26use crate::mcp_server::runtime::RuntimeHandles;
27use crate::mcp_server::session::{ProgressEvent, SessionStore, SharedSessionStore};
28use crate::tools::Tool;
29use crate::tools::mcp_protocol::{
30 INTERNAL_ERROR, INVALID_PARAMS, INVALID_REQUEST, JSONRPC_VERSION, JsonRpcError,
31 MCP_PROTOCOL_VERSION, METHOD_NOT_FOUND,
32};
33use axum::{
34 Json, Router,
35 extract::{Path, State},
36 http::{HeaderMap, StatusCode, header},
37 response::{
38 IntoResponse, Response,
39 sse::{Event, KeepAlive, Sse},
40 },
41 routing::{get, post},
42};
43use chrono::{DateTime, Utc};
44use serde::{Deserialize, Serialize};
45use serde_json::{Value, json};
46use std::collections::HashMap;
47use std::convert::Infallible;
48use std::net::SocketAddr;
49use std::path::PathBuf;
50use std::sync::Arc;
51use std::sync::OnceLock;
52use std::time::Instant;
53use tokio::net::TcpListener;
54use tokio::sync::broadcast;
55use tokio::sync::mpsc;
56use tokio_stream::StreamExt;
57use tokio_stream::wrappers::BroadcastStream;
58use tokio_stream::wrappers::UnboundedReceiverStream;
59
60#[derive(Clone)]
63pub struct AppState {
64 pub sessions: SharedSessionStore,
65 pub tools: Arc<HashMap<String, Arc<dyn Tool>>>,
66}
67
68struct StartTime {
76 instant: Instant,
77 wall: DateTime<Utc>,
78}
79
80static START_TIME: OnceLock<StartTime> = OnceLock::new();
81
82fn start_time() -> &'static StartTime {
83 START_TIME.get_or_init(|| StartTime {
84 instant: Instant::now(),
85 wall: Utc::now(),
86 })
87}
88
89#[derive(Debug, Serialize)]
90struct HealthResponse {
91 status: &'static str,
92 pid: u32,
93 uptime_seconds: u64,
94 started_at: String,
95 protocol_version: &'static str,
96}
97
98fn build_health_response() -> HealthResponse {
99 let st = start_time();
100 HealthResponse {
101 status: "ok",
102 pid: std::process::id(),
103 uptime_seconds: st.instant.elapsed().as_secs(),
104 started_at: st.wall.to_rfc3339(),
105 protocol_version: MCP_PROTOCOL_VERSION,
106 }
107}
108
109pub struct McpServerHandle {
114 pub addr: SocketAddr,
115 pub shutdown: tokio::sync::oneshot::Sender<()>,
116 pub joined: tokio::task::JoinHandle<()>,
117}
118
119#[must_use]
121pub fn build_router(state: AppState) -> Router {
122 Router::new()
123 .route("/health", get(health_handler))
124 .route("/session", post(create_session_handler))
125 .route("/session/{session_id}/events", get(session_events_handler))
126 .route("/mcp", post(mcp_handler))
127 .with_state(state)
128}
129
130#[must_use]
135pub fn default_state(workspace_dir: &std::path::Path) -> (AppState, Vec<SkippedEntry>) {
136 let (tools, skipped) = build_default_tools(workspace_dir);
137 (build_app_state(tools), skipped)
138}
139
140#[must_use]
144pub fn state_from_config(
145 workspace_dir: &std::path::Path,
146 config: &Config,
147) -> (AppState, Vec<SkippedEntry>) {
148 let (tools, skipped) = build_tools_with_config(workspace_dir, config);
149 (build_app_state(tools), skipped)
150}
151
152#[must_use]
156pub fn state_from_runtime(
157 workspace_dir: &std::path::Path,
158 config: &Config,
159 runtime: &RuntimeHandles,
160) -> (AppState, Vec<SkippedEntry>) {
161 let (tools, skipped) = build_tools_with_runtime(workspace_dir, config, runtime);
162 (build_app_state(tools), skipped)
163}
164
165fn build_app_state(tools: Vec<Arc<dyn Tool>>) -> AppState {
166 let map: HashMap<String, Arc<dyn Tool>> = tools
167 .into_iter()
168 .map(|t| (t.name().to_string(), t))
169 .collect();
170 AppState {
171 sessions: Arc::new(SessionStore::new()),
172 tools: Arc::new(map),
173 }
174}
175
176pub fn state_with_tools(tools: Vec<Arc<dyn Tool>>) -> AppState {
178 let map: HashMap<String, Arc<dyn Tool>> = tools
179 .into_iter()
180 .map(|t| (t.name().to_string(), t))
181 .collect();
182 AppState {
183 sessions: Arc::new(SessionStore::new()),
184 tools: Arc::new(map),
185 }
186}
187
188pub async fn serve_on(addr: SocketAddr, state: AppState) -> anyhow::Result<McpServerHandle> {
191 let listener = TcpListener::bind(addr).await?;
192 let bound = listener.local_addr()?;
193 let router = build_router(state);
194 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
195
196 let joined = tokio::spawn(async move {
197 let res = axum::serve(listener, router)
198 .with_graceful_shutdown(async move {
199 let _ = rx.await;
200 })
201 .await;
202 if let Err(e) = res {
203 tracing::error!("construct-mcp server exited: {e}");
204 }
205 });
206
207 Ok(McpServerHandle {
208 addr: bound,
209 shutdown: tx,
210 joined,
211 })
212}
213
214pub async fn run_daemon(workspace_dir: PathBuf) -> anyhow::Result<()> {
225 let _ = start_time();
226
227 let (state, skipped) = match Box::pin(Config::load_or_init()).await {
228 Ok(config) => {
229 tracing::info!(
230 "mcp-server: loaded Construct config from {}",
231 config.config_path.display()
232 );
233 state_from_config(&workspace_dir, &config)
234 }
235 Err(err) => {
236 tracing::warn!(
237 "mcp-server: failed to load Construct config ({err}) — continuing with baseline registry"
238 );
239 default_state(&workspace_dir)
240 }
241 };
242
243 for (name, reason) in &skipped {
244 tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
245 }
246 tracing::info!("mcp-server: advertising {} tools", state.tools.len());
247
248 let handle = serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), state).await?;
249 let url = format!("http://{}/mcp", handle.addr);
250 write_discovery_file(&url)?;
251 tracing::info!("mcp-server: listening on {url}");
252
253 let _ = tokio::signal::ctrl_c().await;
254 let _ = handle.shutdown.send(());
255 let _ = handle.joined.await;
256 cleanup_discovery_file();
257 Ok(())
258}
259
260#[must_use]
262pub fn discovery_path() -> Option<PathBuf> {
263 directories::UserDirs::new().map(|u| u.home_dir().join(".construct").join("mcp.json"))
264}
265
266pub fn write_discovery_file(url: &str) -> anyhow::Result<()> {
270 let Some(path) = discovery_path() else {
271 anyhow::bail!("could not resolve home directory");
272 };
273 if let Some(parent) = path.parent() {
274 std::fs::create_dir_all(parent)?;
275 }
276 let payload = json!({
277 "url": url,
278 "pid": std::process::id(),
279 "started_at": Utc::now().to_rfc3339(),
280 });
281 let bytes = serde_json::to_vec_pretty(&payload)?;
282
283 let parent = path
287 .parent()
288 .ok_or_else(|| anyhow::anyhow!("discovery path has no parent"))?;
289 let tmp_name = format!(
290 ".mcp.json.{}.{}",
291 std::process::id(),
292 Utc::now().timestamp_nanos_opt().unwrap_or(0)
293 );
294 let tmp_path = parent.join(tmp_name);
295 std::fs::write(&tmp_path, &bytes)?;
296 std::fs::rename(&tmp_path, &path)?;
297 Ok(())
298}
299
300pub fn cleanup_discovery_file() {
303 if let Some(path) = discovery_path() {
304 let _ = std::fs::remove_file(path);
305 }
306}
307
308async fn health_handler() -> Response {
311 (StatusCode::OK, Json(build_health_response())).into_response()
312}
313
314#[derive(Debug, Deserialize, Default)]
315struct CreateSessionBody {
316 cwd: Option<String>,
317 label: Option<String>,
318}
319
320#[derive(Debug, Serialize)]
321struct CreateSessionResponse {
322 session_id: String,
323 token: String,
324 cwd: String,
325}
326
327async fn create_session_handler(
328 State(state): State<AppState>,
329 body: Option<Json<CreateSessionBody>>,
330) -> Response {
331 let body = body.map(|Json(b)| b).unwrap_or_default();
332 let cwd = resolve_cwd(body.cwd.as_deref());
333 let sess = state.sessions.create(cwd.clone(), body.label).await;
334 let resp = CreateSessionResponse {
335 session_id: sess.id,
336 token: sess.token,
337 cwd: cwd.to_string_lossy().into_owned(),
338 };
339 (StatusCode::OK, Json(resp)).into_response()
340}
341
342fn resolve_cwd(supplied: Option<&str>) -> PathBuf {
343 if let Some(s) = supplied {
344 let p = PathBuf::from(shellexpand::tilde(s).into_owned());
345 if let Ok(canon) = p.canonicalize() {
346 return canon;
347 }
348 return p;
349 }
350 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
351}
352
353async fn session_events_handler(
364 State(state): State<AppState>,
365 Path(session_id): Path<String>,
366 headers: HeaderMap,
367) -> Response {
368 let Some(token) = headers
371 .get(header::AUTHORIZATION)
372 .and_then(|v| v.to_str().ok())
373 .and_then(|s| s.strip_prefix("Bearer "))
374 .map(str::trim)
375 else {
376 return (
377 StatusCode::UNAUTHORIZED,
378 "missing Authorization: Bearer <token>",
379 )
380 .into_response();
381 };
382
383 let Some(_session) = state.sessions.authenticate(&session_id, token).await else {
384 return (StatusCode::UNAUTHORIZED, "invalid session or token").into_response();
385 };
386
387 let Some(sender) = state.sessions.event_sender(&session_id).await else {
390 return (StatusCode::NOT_FOUND, "session vanished").into_response();
391 };
392
393 let rx = sender.subscribe();
394 let stream = BroadcastStream::new(rx).filter_map(|msg| match msg {
395 Ok(ev) => Some(Ok::<_, Infallible>(
396 Event::default().data(serde_json::to_string(&ev).unwrap_or_else(|_| "{}".into())),
397 )),
398 Err(_) => None,
401 });
402
403 Sse::new(stream)
404 .keep_alive(KeepAlive::default())
405 .into_response()
406}
407
408fn auth_or_401(headers: &HeaderMap) -> Result<(String, String), Response> {
409 let bearer = headers
410 .get(header::AUTHORIZATION)
411 .and_then(|v| v.to_str().ok())
412 .and_then(|s| s.strip_prefix("Bearer "))
413 .map(str::trim);
414 let session_id = headers
415 .get("x-construct-session")
416 .and_then(|v| v.to_str().ok())
417 .map(str::trim);
418 match (bearer, session_id) {
419 (Some(t), Some(s)) if !t.is_empty() && !s.is_empty() => Ok((s.to_string(), t.to_string())),
420 _ => Err((
421 StatusCode::UNAUTHORIZED,
422 "missing Authorization: Bearer <token> or X-Construct-Session header",
423 )
424 .into_response()),
425 }
426}
427
428async fn mcp_handler(
429 State(state): State<AppState>,
430 headers: HeaderMap,
431 Json(req): Json<Value>,
432) -> Response {
433 let (session_id, token) = match auth_or_401(&headers) {
434 Ok(pair) => pair,
435 Err(resp) => return resp,
436 };
437 let Some(session) = state.sessions.authenticate(&session_id, &token).await else {
438 return (StatusCode::UNAUTHORIZED, "invalid session or token").into_response();
439 };
440
441 let id = req.get("id").cloned();
444 let method = req
445 .get("method")
446 .and_then(Value::as_str)
447 .unwrap_or("")
448 .to_string();
449 let params = req.get("params").cloned().unwrap_or(Value::Null);
450
451 match method.as_str() {
452 "initialize" => plain_ok(id, initialize_result()),
453 "tools/list" => plain_ok(id, tools_list_result(&state)),
454 "tools/call" => stream_tool_call(state, session.events.clone(), id, params),
455 "notifications/initialized" | "notifications/cancelled" => {
456 StatusCode::ACCEPTED.into_response()
458 }
459 "" => plain_err(id, INVALID_REQUEST, "missing method"),
460 other => plain_err(id, METHOD_NOT_FOUND, &format!("unknown method: {other}")),
461 }
462}
463
464fn initialize_result() -> Value {
465 json!({
466 "protocolVersion": MCP_PROTOCOL_VERSION,
467 "capabilities": {
468 "tools": { "listChanged": false }
469 },
470 "serverInfo": {
471 "name": "construct-mcp",
472 "version": env!("CARGO_PKG_VERSION"),
473 }
474 })
475}
476
477fn tools_list_result(state: &AppState) -> Value {
478 let mut tools: Vec<Value> = state
479 .tools
480 .values()
481 .map(|t| {
482 json!({
483 "name": t.name(),
484 "description": t.description(),
485 "inputSchema": t.parameters_schema(),
486 })
487 })
488 .collect();
489 tools.sort_by(|a, b| a["name"].as_str().cmp(&b["name"].as_str()));
490 json!({ "tools": tools })
491}
492
493fn plain_ok(id: Option<Value>, result: Value) -> Response {
494 let body = json!({
495 "jsonrpc": JSONRPC_VERSION,
496 "id": id.unwrap_or(Value::Null),
497 "result": result,
498 });
499 (StatusCode::OK, Json(body)).into_response()
500}
501
502fn plain_err(id: Option<Value>, code: i32, msg: &str) -> Response {
503 let err = JsonRpcError {
504 code,
505 message: msg.to_string(),
506 data: None,
507 };
508 let body = json!({
509 "jsonrpc": JSONRPC_VERSION,
510 "id": id.unwrap_or(Value::Null),
511 "error": err,
512 });
513 (StatusCode::OK, Json(body)).into_response()
514}
515
516fn stream_tool_call(
517 state: AppState,
518 session_events: broadcast::Sender<ProgressEvent>,
519 id: Option<Value>,
520 params: Value,
521) -> Response {
522 let name = params
523 .get("name")
524 .and_then(Value::as_str)
525 .unwrap_or("")
526 .to_string();
527 let args = params.get("arguments").cloned().unwrap_or(Value::Null);
528 let meta_token = params
529 .get("_meta")
530 .and_then(|m| m.get("progressToken"))
531 .and_then(Value::as_u64);
532
533 let Some(tool) = state.tools.get(&name).cloned() else {
534 return plain_err(id, INVALID_PARAMS, &format!("unknown tool: {name}"));
535 };
536
537 let (tx, rx) = mpsc::unbounded_channel::<Value>();
538 let sink = Arc::new(McpProgressSink::with_session(
539 tx.clone(),
540 meta_token,
541 session_events,
542 name.clone(),
543 ));
544
545 let tx_final = tx.clone();
548 let id_for_task = id.clone();
549 tokio::spawn(async move {
550 let result = tool.execute_with_progress(args, sink.as_ref()).await;
551 let final_msg = match result {
552 Ok(tool_result) => {
553 let content = tool_result_to_content(&tool_result);
554 let payload = json!({
555 "content": content,
556 "isError": !tool_result.success,
557 });
558 json!({
559 "jsonrpc": JSONRPC_VERSION,
560 "id": id_for_task.unwrap_or(Value::Null),
561 "result": payload,
562 })
563 }
564 Err(err) => {
565 json!({
566 "jsonrpc": JSONRPC_VERSION,
567 "id": id_for_task.unwrap_or(Value::Null),
568 "error": {
569 "code": INTERNAL_ERROR,
570 "message": err.to_string(),
571 }
572 })
573 }
574 };
575 let _ = tx_final.send(final_msg);
576 });
578
579 let event_stream = UnboundedReceiverStream::new(rx)
580 .map(|msg| Ok::<_, Infallible>(Event::default().data(msg.to_string())));
581
582 Sse::new(event_stream)
583 .keep_alive(KeepAlive::default())
584 .into_response()
585}
586
587fn tool_result_to_content(result: &crate::tools::traits::ToolResult) -> Value {
588 let text = if result.success {
589 result.output.clone()
590 } else {
591 result
592 .error
593 .clone()
594 .unwrap_or_else(|| result.output.clone())
595 };
596 json!([{ "type": "text", "text": text }])
597}
598
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AppState` from the default registry rooted at the OS temp dir.
    fn fresh_app_state() -> AppState {
        let workspace = std::env::temp_dir();
        default_state(&workspace).0
    }

    /// Returns the response's `Content-Type` header text, or `""` when it is
    /// absent or not valid header text.
    fn content_type(resp: &Response) -> &str {
        resp.headers()
            .get(header::CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("")
    }

    /// Builds a header map carrying only `Authorization: Bearer <token>`.
    fn bearer_headers(token: &str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(
            header::AUTHORIZATION,
            format!("Bearer {token}").parse().unwrap(),
        );
        headers
    }

    /// The health payload serializes with every expected field populated.
    #[test]
    fn health_response_shape() {
        let payload = build_health_response();
        let v = serde_json::to_value(&payload).unwrap();

        assert_eq!(v["status"], "ok");
        assert_eq!(v["protocol_version"], MCP_PROTOCOL_VERSION);
        assert!(v["pid"].as_u64().is_some());
        assert!(v["uptime_seconds"].as_u64().is_some());
        assert!(
            v["started_at"].as_str().is_some_and(|s| !s.is_empty()),
            "started_at should be a non-empty rfc3339 string"
        );
    }

    /// The health handler answers 200 with a JSON content type.
    #[tokio::test]
    async fn health_handler_returns_200_json() {
        let resp = health_handler().await;
        assert_eq!(resp.status(), StatusCode::OK);

        let ct = content_type(&resp);
        assert!(
            ct.contains("application/json"),
            "expected JSON content-type, got: {ct}"
        );
    }

    /// No `Authorization` header at all is rejected with 401.
    #[tokio::test]
    async fn session_events_rejects_missing_auth() {
        let state = fresh_app_state();
        let sess = state.sessions.create(std::env::temp_dir(), None).await;

        let resp =
            session_events_handler(State(state), Path(sess.id.clone()), HeaderMap::new()).await;

        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    /// A bearer token that does not belong to the session is rejected with 401.
    #[tokio::test]
    async fn session_events_rejects_wrong_token() {
        let state = fresh_app_state();
        let sess = state.sessions.create(std::env::temp_dir(), None).await;
        let headers = bearer_headers("not-the-token");

        let resp = session_events_handler(State(state), Path(sess.id.clone()), headers).await;

        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    /// The session's own token yields a 200 response with an SSE content type.
    #[tokio::test]
    async fn session_events_accepts_correct_token() {
        let state = fresh_app_state();
        let sess = state.sessions.create(std::env::temp_dir(), None).await;
        let headers = bearer_headers(&sess.token);

        let resp = session_events_handler(State(state), Path(sess.id.clone()), headers).await;

        assert_eq!(resp.status(), StatusCode::OK);
        let ct = content_type(&resp);
        assert!(
            ct.contains("text/event-stream"),
            "expected SSE content-type, got: {ct}"
        );
    }

    /// An event sent on a session's broadcast sender reaches a subscriber
    /// obtained from the same sender.
    #[tokio::test]
    async fn session_broadcast_round_trip_through_store() {
        let state = fresh_app_state();
        let sess = state.sessions.create(std::env::temp_dir(), None).await;

        let sender = state
            .sessions
            .event_sender(&sess.id)
            .await
            .expect("session present");
        // Subscribe before sending so the event is delivered to this receiver.
        let mut rx = sender.subscribe();

        let ev = ProgressEvent::new(
            5,
            2,
            Some(4),
            Some("half way".into()),
            Some("notion".into()),
        );
        sender.send(ev).expect("send ok");

        let got = rx.recv().await.expect("recv ok");
        assert_eq!(got.progress, 2);
        assert_eq!(got.tool.as_deref(), Some("notion"));
    }
}