Skip to main content

post_cortex_daemon/daemon/
rmcp_server.rs

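// Copyright (c) 2025 Julius ML
// MIT License

//! RMCP-based HTTP server for the Post-Cortex daemon.
//!
//! Exposes the MCP service through rmcp's streamable-HTTP transport (the successor to the
//! standalone SSE transport), following the official rmcp shuttle example patterns, together
//! with a small REST API used by the CLI.
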
8use post_cortex_memory::ConversationMemorySystem;
9use crate::daemon::{DaemonConfig, PostCortexService};
10use axum::{
11    Json, Router,
12    extract::{Path, State},
13    http::StatusCode,
14    routing::{delete, get, post},
15};
16use rmcp::transport::streamable_http_server::{
17    StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
18};
19use serde::{Deserialize, Serialize};
20use std::net::SocketAddr;
21use std::sync::Arc;
22use tokio_util::sync::CancellationToken;
23use tracing::{error, info};
24use uuid::Uuid;
25
26/// Start RMCP-based SSE server
27pub async fn start_rmcp_daemon(config: DaemonConfig) -> Result<(), String> {
28    let addr: SocketAddr = format!("{}:{}", config.host, config.port)
29        .parse()
30        .map_err(|e| format!("Invalid address: {}", e))?;
31
32    info!("Initializing Post-Cortex daemon with RMCP SSE transport");
33    info!("  Host: {}", config.host);
34    info!("  Port: {}", config.port);
35    info!("  Data Directory: {}", config.data_directory);
36
37    // Create memory system with embeddings enabled
38    let mut system_config = post_cortex_memory::SystemConfig {
39        data_directory: config.data_directory.clone(),
40        ..Default::default()
41    };
42
43    #[cfg(feature = "embeddings")]
44    {
45        system_config.enable_embeddings = true;
46        system_config.embeddings_model_type = "MultilingualMiniLM".to_string();
47        system_config.auto_vectorize_on_update = true;
48        system_config.cross_session_search_enabled = true;
49        info!("Embeddings enabled in daemon config");
50    }
51
52    // Configure storage backend if surrealdb-storage feature is enabled
53    #[cfg(feature = "surrealdb-storage")]
54    {
55        use post_cortex_storage::traits::StorageBackendType;
56
57        system_config.storage_backend = match config.storage_backend.as_str() {
58            "surrealdb" => StorageBackendType::SurrealDB,
59            _ => StorageBackendType::RocksDB,
60        };
61        system_config.surrealdb_endpoint = config.surrealdb_endpoint.clone();
62        system_config.surrealdb_username = config.surrealdb_username.clone();
63        system_config.surrealdb_password = config.surrealdb_password.clone();
64        system_config.surrealdb_namespace = Some(config.surrealdb_namespace.clone());
65        system_config.surrealdb_database = Some(config.surrealdb_database.clone());
66
67        if system_config.storage_backend == StorageBackendType::SurrealDB {
68            info!(
69                "Using SurrealDB storage backend: {} (ns: {}, db: {})",
70                system_config
71                    .surrealdb_endpoint
72                    .as_deref()
73                    .unwrap_or("not configured"),
74                config.surrealdb_namespace,
75                config.surrealdb_database
76            );
77        } else {
78            info!("Using RocksDB storage backend");
79        }
80    }
81
82    let memory_system = Arc::new(
83        ConversationMemorySystem::new(system_config)
84            .await
85            .map_err(|e| format!("Failed to initialize memory system: {}", e))?,
86    );
87
88    info!("Memory system initialized successfully");
89
90    // Inject memory system into MCP tools so they use shared instance
91    post_cortex_mcp::inject_memory_system(memory_system.clone());
92    info!("Memory system injected into MCP tools");
93
94    // Clear query cache to prevent stale vector IDs from previous runs
95    if let Err(e) = memory_system.clear_query_cache().await {
96        error!("Failed to clear query cache on startup: {}", e);
97    } else {
98        info!("Query cache cleared successfully on daemon startup");
99    }
100
101    // Create cancellation token for graceful shutdown
102    let ct = CancellationToken::new();
103
104    // Create MCP service using streamable HTTP transport
105    let mcp_service = StreamableHttpService::new(
106        {
107            let memory_system = memory_system.clone();
108            move || Ok(PostCortexService::new(memory_system.clone()))
109        },
110        LocalSessionManager::default().into(),
111        StreamableHttpServerConfig {
112            cancellation_token: ct.child_token(),
113            ..Default::default()
114        },
115    );
116
117    info!("MCP endpoint configured: http://{}/mcp", addr);
118
119    // Create API state
120    let api_state = Arc::new(ApiState {
121        memory_system: memory_system.clone(),
122    });
123
124    // Create API router with its own state
125    let api_router = Router::new()
126        .route("/health", get(api_health))
127        .route(
128            "/api/sessions",
129            get(api_list_sessions).post(api_create_session),
130        )
131        .route("/api/sessions/{id}", delete(api_delete_session))
132        .route(
133            "/api/workspaces",
134            get(api_list_workspaces).post(api_create_workspace),
135        )
136        .route("/api/workspaces/{id}", delete(api_delete_workspace))
137        .route(
138            "/api/workspaces/{workspace_id}/sessions/{session_id}",
139            post(api_attach_session),
140        )
141        .with_state(api_state);
142
143    // Create router with MCP service and API routes
144    let router = Router::new()
145        .nest_service("/mcp", mcp_service)
146        .merge(api_router);
147
148    // Create TCP listener
149    let listener = tokio::net::TcpListener::bind(addr)
150        .await
151        .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
152
153    info!("TCP listener bound to {}", addr);
154
155    // Setup graceful shutdown
156    let shutdown_ct = ct.clone();
157    let server = axum::serve(listener, router).with_graceful_shutdown(async move {
158        shutdown_ct.cancelled().await;
159        info!("HTTP server shutting down gracefully");
160    });
161
162    // Start HTTP server in background
163    tokio::spawn(async move {
164        if let Err(e) = server.await {
165            error!("HTTP server error: {}", e);
166        }
167    });
168
169    // Start gRPC server alongside HTTP if configured
170    if config.grpc_port > 0 {
171        let grpc_addr: SocketAddr = format!("{}:{}", config.host, config.grpc_port)
172            .parse()
173            .map_err(|e| format!("Invalid gRPC address: {}", e))?;
174        let grpc_memory = memory_system.clone();
175        tokio::spawn(async move {
176            if let Err(e) = crate::daemon::grpc_service::start_grpc_server(grpc_memory, grpc_addr).await {
177                error!("gRPC server error: {}", e);
178            }
179        });
180        info!("gRPC endpoint: {}:{}", config.host, config.grpc_port);
181    }
182
183    info!("Post-Cortex MCP service registered with SSE server");
184    info!("Daemon is ready to accept connections");
185    info!("Press Ctrl+C to shutdown");
186
187    // Wait for shutdown signal
188    tokio::select! {
189        _ = tokio::signal::ctrl_c() => {
190            info!("Received Ctrl+C, initiating shutdown");
191        }
192        _ = ct.cancelled() => {
193            info!("Service cancelled");
194        }
195    }
196
197    ct.cancel();
198    info!("Post-Cortex daemon stopped");
199    Ok(())
200}

// ============================================================================
// REST API for CLI
// ============================================================================

206/// Shared state for API handlers
207struct ApiState {
208    memory_system: Arc<ConversationMemorySystem>,
209}
210
211#[derive(Serialize)]
212struct SessionInfo {
213    id: String,
214    name: String,
215    workspace: Option<String>,
216}
217
218#[derive(Serialize)]
219struct WorkspaceInfo {
220    id: String,
221    name: String,
222    description: String,
223    session_count: usize,
224}
225
226#[derive(Deserialize)]
227struct CreateSessionRequest {
228    name: Option<String>,
229    description: Option<String>,
230}
231
232#[derive(Deserialize)]
233struct CreateWorkspaceRequest {
234    name: String,
235    description: Option<String>,
236}
237
238#[derive(Deserialize)]
239struct AttachSessionRequest {
240    role: Option<String>,
241}
242
243async fn api_health() -> Json<serde_json::Value> {
244    Json(serde_json::json!({
245        "status": "ok",
246        "service": "post-cortex"
247    }))
248}
249
250async fn api_list_sessions(
251    State(state): State<Arc<ApiState>>,
252) -> Result<Json<Vec<SessionInfo>>, (StatusCode, String)> {
253    let ids = state
254        .memory_system
255        .list_sessions()
256        .await
257        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
258
259    let workspaces = state.memory_system.workspace_manager.list_workspaces();
260    let mut session_workspace_map = std::collections::HashMap::new();
261    for ws in workspaces {
262        for (session_id, _role) in ws.get_all_sessions() {
263            session_workspace_map.insert(session_id, ws.name.clone());
264        }
265    }
266
267    let mut sessions = Vec::new();
268    for id in ids {
269        let name = match state.memory_system.get_session(id).await {
270            Ok(session_arc) => {
271                let session = session_arc.load();
272                session.name().unwrap_or_else(|| "Unnamed".to_string())
273            }
274            Err(_) => "Error loading".to_string(),
275        };
276
277        sessions.push(SessionInfo {
278            id: id.to_string(),
279            name,
280            workspace: session_workspace_map.get(&id).cloned(),
281        });
282    }
283
284    Ok(Json(sessions))
285}
286
287async fn api_create_session(
288    State(state): State<Arc<ApiState>>,
289    Json(req): Json<CreateSessionRequest>,
290) -> Result<Json<SessionInfo>, (StatusCode, String)> {
291    let id = state
292        .memory_system
293        .create_session(req.name.clone(), req.description)
294        .await
295        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
296
297    Ok(Json(SessionInfo {
298        id: id.to_string(),
299        name: req.name.unwrap_or_else(|| "Unnamed".to_string()),
300        workspace: None,
301    }))
302}
303
304async fn api_delete_session(
305    State(state): State<Arc<ApiState>>,
306    Path(id): Path<String>,
307) -> Result<StatusCode, (StatusCode, String)> {
308    let uuid = Uuid::parse_str(&id)
309        .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid UUID: {}", e)))?;
310
311    state
312        .memory_system
313        .get_storage()
314        .delete_session(uuid)
315        .await
316        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
317
318    Ok(StatusCode::NO_CONTENT)
319}
320
321async fn api_list_workspaces(
322    State(state): State<Arc<ApiState>>,
323) -> Result<Json<Vec<WorkspaceInfo>>, (StatusCode, String)> {
324    let workspaces = state
325        .memory_system
326        .get_storage()
327        .list_all_workspaces()
328        .await
329        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
330
331    // Get list of existing sessions to filter out deleted ones
332    let existing_sessions: std::collections::HashSet<_> = state
333        .memory_system
334        .list_sessions()
335        .await
336        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?
337        .into_iter()
338        .collect();
339
340    let result: Vec<WorkspaceInfo> = workspaces
341        .into_iter()
342        .map(|ws| {
343            // Count only sessions that still exist
344            let actual_count = ws
345                .sessions
346                .iter()
347                .filter(|(id, _)| existing_sessions.contains(id))
348                .count();
349            WorkspaceInfo {
350                id: ws.id.to_string(),
351                name: ws.name,
352                description: ws.description,
353                session_count: actual_count,
354            }
355        })
356        .collect();
357
358    Ok(Json(result))
359}
360
361async fn api_create_workspace(
362    State(state): State<Arc<ApiState>>,
363    Json(req): Json<CreateWorkspaceRequest>,
364) -> Result<Json<WorkspaceInfo>, (StatusCode, String)> {
365    let id = Uuid::new_v4();
366    let description = req.description.unwrap_or_default();
367
368    state
369        .memory_system
370        .get_storage()
371        .save_workspace_metadata(id, &req.name, &description, &[])
372        .await
373        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
374
375    // Update in-memory workspace manager
376    state.memory_system.workspace_manager.restore_workspace(
377        id,
378        req.name.clone(),
379        description.clone(),
380        vec![],
381    );
382
383    Ok(Json(WorkspaceInfo {
384        id: id.to_string(),
385        name: req.name,
386        description,
387        session_count: 0,
388    }))
389}
390
391async fn api_delete_workspace(
392    State(state): State<Arc<ApiState>>,
393    Path(id): Path<String>,
394) -> Result<StatusCode, (StatusCode, String)> {
395    let uuid = Uuid::parse_str(&id)
396        .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid UUID: {}", e)))?;
397
398    state
399        .memory_system
400        .get_storage()
401        .delete_workspace(uuid)
402        .await
403        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
404
405    // Update in-memory workspace manager
406    state
407        .memory_system
408        .workspace_manager
409        .delete_workspace(&uuid);
410
411    Ok(StatusCode::NO_CONTENT)
412}
413
414async fn api_attach_session(
415    State(state): State<Arc<ApiState>>,
416    Path((workspace_id, session_id)): Path<(String, String)>,
417    Json(req): Json<AttachSessionRequest>,
418) -> Result<StatusCode, (StatusCode, String)> {
419    let ws_id = Uuid::parse_str(&workspace_id).map_err(|e| {
420        (
421            StatusCode::BAD_REQUEST,
422            format!("Invalid workspace UUID: {}", e),
423        )
424    })?;
425    let sess_id = Uuid::parse_str(&session_id).map_err(|e| {
426        (
427            StatusCode::BAD_REQUEST,
428            format!("Invalid session UUID: {}", e),
429        )
430    })?;
431
432    let role = match req.role.as_deref().unwrap_or("related") {
433        "primary" => post_cortex_core::workspace::SessionRole::Primary,
434        "related" => post_cortex_core::workspace::SessionRole::Related,
435        "dependency" => post_cortex_core::workspace::SessionRole::Dependency,
436        "shared" => post_cortex_core::workspace::SessionRole::Shared,
437        other => return Err((StatusCode::BAD_REQUEST, format!("Invalid role: {}", other))),
438    };
439
440    state
441        .memory_system
442        .get_storage()
443        .add_session_to_workspace(ws_id, sess_id, role)
444        .await
445        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
446
447    // Update in-memory workspace manager to keep it in sync
448    let _ = state
449        .memory_system
450        .workspace_manager
451        .add_session_to_workspace(&ws_id, sess_id, role);
452
453    Ok(StatusCode::OK)
454}