Skip to main content

post_cortex_daemon/daemon/
rmcp_server.rs

1// Copyright (c) 2025 Julius ML
2// MIT License
3
4//! RMCP-based SSE Server for Post-Cortex daemon
5//!
6//! Provides SSE transport using rmcp library following official shuttle patterns.
7
8use crate::daemon::{DaemonConfig, PostCortexService};
9use axum::{
10    Json, Router,
11    extract::{Path, State},
12    http::StatusCode,
13    routing::{delete, get, post},
14};
15use post_cortex_memory::ConversationMemorySystem;
16use rmcp::transport::streamable_http_server::{
17    StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
18};
19use serde::{Deserialize, Serialize};
20use std::net::SocketAddr;
21use std::sync::Arc;
22use tokio_util::sync::CancellationToken;
23use tracing::{error, info};
24use uuid::Uuid;
25
26/// Start RMCP-based SSE server
27pub async fn start_rmcp_daemon(config: DaemonConfig) -> Result<(), String> {
28    let addr: SocketAddr = format!("{}:{}", config.host, config.port)
29        .parse()
30        .map_err(|e| format!("Invalid address: {}", e))?;
31
32    info!("Initializing Post-Cortex daemon with RMCP SSE transport");
33    info!("  Host: {}", config.host);
34    info!("  Port: {}", config.port);
35    info!("  Data Directory: {}", config.data_directory);
36
37    // Create memory system with embeddings enabled
38    let mut system_config = post_cortex_memory::SystemConfig {
39        data_directory: config.data_directory.clone(),
40        ..Default::default()
41    };
42
43    #[cfg(feature = "embeddings")]
44    {
45        // Inherit `embeddings_model_type` + `vector_dimension` from
46        // `SystemConfig::default()` (PotionMultilingual / 256-dim as of
47        // 0.3.0) — hard-coding `"MultilingualMiniLM"` here used to
48        // silently downgrade the embedding model regardless of the
49        // workspace default and pinned the HNSW index to 384-dim,
50        // mismatching the runtime vectoriser.
51        system_config.enable_embeddings = true;
52        system_config.auto_vectorize_on_update = true;
53        system_config.cross_session_search_enabled = true;
54        info!(
55            "Embeddings enabled in daemon config (model={}, dim={})",
56            system_config.embeddings_model_type, system_config.vector_dimension,
57        );
58    }
59
60    // Configure storage backend if surrealdb-storage feature is enabled
61    #[cfg(feature = "surrealdb-storage")]
62    {
63        use post_cortex_storage::traits::StorageBackendType;
64
65        system_config.storage_backend = match config.storage_backend.as_str() {
66            "surrealdb" => StorageBackendType::SurrealDB,
67            _ => StorageBackendType::RocksDB,
68        };
69        system_config.surrealdb_endpoint = config.surrealdb_endpoint.clone();
70        system_config.surrealdb_username = config.surrealdb_username.clone();
71        system_config.surrealdb_password = config.surrealdb_password.clone();
72        system_config.surrealdb_namespace = Some(config.surrealdb_namespace.clone());
73        system_config.surrealdb_database = Some(config.surrealdb_database.clone());
74
75        if system_config.storage_backend == StorageBackendType::SurrealDB {
76            info!(
77                "Using SurrealDB storage backend: {} (ns: {}, db: {})",
78                system_config
79                    .surrealdb_endpoint
80                    .as_deref()
81                    .unwrap_or("not configured"),
82                config.surrealdb_namespace,
83                config.surrealdb_database
84            );
85        } else {
86            info!("Using RocksDB storage backend");
87        }
88    }
89
90    let memory_system = Arc::new(
91        ConversationMemorySystem::new(system_config)
92            .await
93            .map_err(|e| format!("Failed to initialize memory system: {}", e))?,
94    );
95
96    info!("Memory system initialized successfully");
97
98    // Inject memory system into MCP tools so they use shared instance
99    post_cortex_mcp::inject_memory_system(memory_system.clone());
100    info!("Memory system injected into MCP tools");
101
102    // Clear query cache to prevent stale vector IDs from previous runs
103    if let Err(e) = memory_system.clear_query_cache().await {
104        error!("Failed to clear query cache on startup: {}", e);
105    } else {
106        info!("Query cache cleared successfully on daemon startup");
107    }
108
109    // Create cancellation token for graceful shutdown
110    let ct = CancellationToken::new();
111
112    // Create MCP service using streamable HTTP transport
113    let mcp_service = StreamableHttpService::new(
114        {
115            let memory_system = memory_system.clone();
116            move || Ok(PostCortexService::new(memory_system.clone()))
117        },
118        LocalSessionManager::default().into(),
119        StreamableHttpServerConfig {
120            cancellation_token: ct.child_token(),
121            ..Default::default()
122        },
123    );
124
125    info!("MCP endpoint configured: http://{}/mcp", addr);
126
127    // Create API state
128    let api_state = Arc::new(ApiState {
129        memory_system: memory_system.clone(),
130    });
131
132    // Create API router with its own state
133    let api_router = Router::new()
134        .route("/health", get(api_health))
135        .route(
136            "/api/sessions",
137            get(api_list_sessions).post(api_create_session),
138        )
139        .route("/api/sessions/{id}", delete(api_delete_session))
140        .route(
141            "/api/workspaces",
142            get(api_list_workspaces).post(api_create_workspace),
143        )
144        .route("/api/workspaces/{id}", delete(api_delete_workspace))
145        .route(
146            "/api/workspaces/{workspace_id}/sessions/{session_id}",
147            post(api_attach_session),
148        )
149        .with_state(api_state);
150
151    // Create router with MCP service and API routes
152    let router = Router::new()
153        .nest_service("/mcp", mcp_service)
154        .merge(api_router);
155
156    // Create TCP listener
157    let listener = tokio::net::TcpListener::bind(addr)
158        .await
159        .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
160
161    info!("TCP listener bound to {}", addr);
162
163    // Setup graceful shutdown
164    let shutdown_ct = ct.clone();
165    let server = axum::serve(listener, router).with_graceful_shutdown(async move {
166        shutdown_ct.cancelled().await;
167        info!("HTTP server shutting down gracefully");
168    });
169
170    // Start HTTP server in background
171    tokio::spawn(async move {
172        if let Err(e) = server.await {
173            error!("HTTP server error: {}", e);
174        }
175    });
176
177    // Start gRPC server alongside HTTP if configured
178    if config.grpc_port > 0 {
179        let grpc_addr: SocketAddr = format!("{}:{}", config.host, config.grpc_port)
180            .parse()
181            .map_err(|e| format!("Invalid gRPC address: {}", e))?;
182        let grpc_memory = memory_system.clone();
183        tokio::spawn(async move {
184            if let Err(e) =
185                crate::daemon::grpc_service::start_grpc_server(grpc_memory, grpc_addr).await
186            {
187                error!("gRPC server error: {}", e);
188            }
189        });
190        info!("gRPC endpoint: {}:{}", config.host, config.grpc_port);
191    }
192
193    info!("Post-Cortex MCP service registered with SSE server");
194    info!("Daemon is ready to accept connections");
195    info!("Press Ctrl+C to shutdown");
196
197    // Wait for shutdown signal
198    tokio::select! {
199        _ = tokio::signal::ctrl_c() => {
200            info!("Received Ctrl+C, initiating shutdown");
201        }
202        _ = ct.cancelled() => {
203            info!("Service cancelled");
204        }
205    }
206
207    ct.cancel();
208    info!("Post-Cortex daemon stopped");
209    Ok(())
210}
211
212// ============================================================================
213// REST API for CLI
214// ============================================================================
215
216/// Shared state for API handlers
217struct ApiState {
218    memory_system: Arc<ConversationMemorySystem>,
219}
220
221#[derive(Serialize)]
222struct SessionInfo {
223    id: String,
224    name: String,
225    workspace: Option<String>,
226}
227
228#[derive(Serialize)]
229struct WorkspaceInfo {
230    id: String,
231    name: String,
232    description: String,
233    session_count: usize,
234}
235
236#[derive(Deserialize)]
237struct CreateSessionRequest {
238    name: Option<String>,
239    description: Option<String>,
240}
241
242#[derive(Deserialize)]
243struct CreateWorkspaceRequest {
244    name: String,
245    description: Option<String>,
246}
247
248#[derive(Deserialize)]
249struct AttachSessionRequest {
250    role: Option<String>,
251}
252
253async fn api_health() -> Json<serde_json::Value> {
254    Json(serde_json::json!({
255        "status": "ok",
256        "service": "post-cortex"
257    }))
258}
259
260async fn api_list_sessions(
261    State(state): State<Arc<ApiState>>,
262) -> Result<Json<Vec<SessionInfo>>, (StatusCode, String)> {
263    let ids = state
264        .memory_system
265        .list_sessions()
266        .await
267        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
268
269    let workspaces = state.memory_system.workspace_manager.list_workspaces();
270    let mut session_workspace_map = std::collections::HashMap::new();
271    for ws in workspaces {
272        for (session_id, _role) in ws.get_all_sessions() {
273            session_workspace_map.insert(session_id, ws.name.clone());
274        }
275    }
276
277    let mut sessions = Vec::new();
278    for id in ids {
279        let name = match state.memory_system.get_session(id).await {
280            Ok(session_arc) => {
281                let session = session_arc.load();
282                session.name().unwrap_or_else(|| "Unnamed".to_string())
283            }
284            Err(_) => "Error loading".to_string(),
285        };
286
287        sessions.push(SessionInfo {
288            id: id.to_string(),
289            name,
290            workspace: session_workspace_map.get(&id).cloned(),
291        });
292    }
293
294    Ok(Json(sessions))
295}
296
297async fn api_create_session(
298    State(state): State<Arc<ApiState>>,
299    Json(req): Json<CreateSessionRequest>,
300) -> Result<Json<SessionInfo>, (StatusCode, String)> {
301    let id = state
302        .memory_system
303        .create_session(req.name.clone(), req.description)
304        .await
305        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
306
307    Ok(Json(SessionInfo {
308        id: id.to_string(),
309        name: req.name.unwrap_or_else(|| "Unnamed".to_string()),
310        workspace: None,
311    }))
312}
313
314async fn api_delete_session(
315    State(state): State<Arc<ApiState>>,
316    Path(id): Path<String>,
317) -> Result<StatusCode, (StatusCode, String)> {
318    let uuid = Uuid::parse_str(&id)
319        .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid UUID: {}", e)))?;
320
321    state
322        .memory_system
323        .get_storage()
324        .delete_session(uuid)
325        .await
326        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
327
328    Ok(StatusCode::NO_CONTENT)
329}
330
331async fn api_list_workspaces(
332    State(state): State<Arc<ApiState>>,
333) -> Result<Json<Vec<WorkspaceInfo>>, (StatusCode, String)> {
334    let workspaces = state
335        .memory_system
336        .get_storage()
337        .list_all_workspaces()
338        .await
339        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
340
341    // Get list of existing sessions to filter out deleted ones
342    let existing_sessions: std::collections::HashSet<_> = state
343        .memory_system
344        .list_sessions()
345        .await
346        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?
347        .into_iter()
348        .collect();
349
350    let result: Vec<WorkspaceInfo> = workspaces
351        .into_iter()
352        .map(|ws| {
353            // Count only sessions that still exist
354            let actual_count = ws
355                .sessions
356                .iter()
357                .filter(|(id, _)| existing_sessions.contains(id))
358                .count();
359            WorkspaceInfo {
360                id: ws.id.to_string(),
361                name: ws.name,
362                description: ws.description,
363                session_count: actual_count,
364            }
365        })
366        .collect();
367
368    Ok(Json(result))
369}
370
371async fn api_create_workspace(
372    State(state): State<Arc<ApiState>>,
373    Json(req): Json<CreateWorkspaceRequest>,
374) -> Result<Json<WorkspaceInfo>, (StatusCode, String)> {
375    let id = Uuid::new_v4();
376    let description = req.description.unwrap_or_default();
377
378    state
379        .memory_system
380        .get_storage()
381        .save_workspace_metadata(id, &req.name, &description, &[])
382        .await
383        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
384
385    // Update in-memory workspace manager
386    state.memory_system.workspace_manager.restore_workspace(
387        id,
388        req.name.clone(),
389        description.clone(),
390        vec![],
391    );
392
393    Ok(Json(WorkspaceInfo {
394        id: id.to_string(),
395        name: req.name,
396        description,
397        session_count: 0,
398    }))
399}
400
401async fn api_delete_workspace(
402    State(state): State<Arc<ApiState>>,
403    Path(id): Path<String>,
404) -> Result<StatusCode, (StatusCode, String)> {
405    let uuid = Uuid::parse_str(&id)
406        .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid UUID: {}", e)))?;
407
408    state
409        .memory_system
410        .get_storage()
411        .delete_workspace(uuid)
412        .await
413        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
414
415    // Update in-memory workspace manager
416    state
417        .memory_system
418        .workspace_manager
419        .delete_workspace(&uuid);
420
421    Ok(StatusCode::NO_CONTENT)
422}
423
424async fn api_attach_session(
425    State(state): State<Arc<ApiState>>,
426    Path((workspace_id, session_id)): Path<(String, String)>,
427    Json(req): Json<AttachSessionRequest>,
428) -> Result<StatusCode, (StatusCode, String)> {
429    let ws_id = Uuid::parse_str(&workspace_id).map_err(|e| {
430        (
431            StatusCode::BAD_REQUEST,
432            format!("Invalid workspace UUID: {}", e),
433        )
434    })?;
435    let sess_id = Uuid::parse_str(&session_id).map_err(|e| {
436        (
437            StatusCode::BAD_REQUEST,
438            format!("Invalid session UUID: {}", e),
439        )
440    })?;
441
442    let role = match req.role.as_deref().unwrap_or("related") {
443        "primary" => post_cortex_core::workspace::SessionRole::Primary,
444        "related" => post_cortex_core::workspace::SessionRole::Related,
445        "dependency" => post_cortex_core::workspace::SessionRole::Dependency,
446        "shared" => post_cortex_core::workspace::SessionRole::Shared,
447        other => return Err((StatusCode::BAD_REQUEST, format!("Invalid role: {}", other))),
448    };
449
450    state
451        .memory_system
452        .get_storage()
453        .add_session_to_workspace(ws_id, sess_id, role)
454        .await
455        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
456
457    // Update in-memory workspace manager to keep it in sync
458    let _ = state
459        .memory_system
460        .workspace_manager
461        .add_session_to_workspace(&ws_id, sess_id, role);
462
463    Ok(StatusCode::OK)
464}