post_cortex_daemon/daemon/
rmcp_server.rs1use post_cortex_memory::ConversationMemorySystem;
9use crate::daemon::{DaemonConfig, PostCortexService};
10use axum::{
11 Json, Router,
12 extract::{Path, State},
13 http::StatusCode,
14 routing::{delete, get, post},
15};
16use rmcp::transport::streamable_http_server::{
17 StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
18};
19use serde::{Deserialize, Serialize};
20use std::net::SocketAddr;
21use std::sync::Arc;
22use tokio_util::sync::CancellationToken;
23use tracing::{error, info};
24use uuid::Uuid;
25
26pub async fn start_rmcp_daemon(config: DaemonConfig) -> Result<(), String> {
28 let addr: SocketAddr = format!("{}:{}", config.host, config.port)
29 .parse()
30 .map_err(|e| format!("Invalid address: {}", e))?;
31
32 info!("Initializing Post-Cortex daemon with RMCP SSE transport");
33 info!(" Host: {}", config.host);
34 info!(" Port: {}", config.port);
35 info!(" Data Directory: {}", config.data_directory);
36
37 let mut system_config = post_cortex_memory::SystemConfig {
39 data_directory: config.data_directory.clone(),
40 ..Default::default()
41 };
42
43 #[cfg(feature = "embeddings")]
44 {
45 system_config.enable_embeddings = true;
46 system_config.embeddings_model_type = "MultilingualMiniLM".to_string();
47 system_config.auto_vectorize_on_update = true;
48 system_config.cross_session_search_enabled = true;
49 info!("Embeddings enabled in daemon config");
50 }
51
52 #[cfg(feature = "surrealdb-storage")]
54 {
55 use post_cortex_storage::traits::StorageBackendType;
56
57 system_config.storage_backend = match config.storage_backend.as_str() {
58 "surrealdb" => StorageBackendType::SurrealDB,
59 _ => StorageBackendType::RocksDB,
60 };
61 system_config.surrealdb_endpoint = config.surrealdb_endpoint.clone();
62 system_config.surrealdb_username = config.surrealdb_username.clone();
63 system_config.surrealdb_password = config.surrealdb_password.clone();
64 system_config.surrealdb_namespace = Some(config.surrealdb_namespace.clone());
65 system_config.surrealdb_database = Some(config.surrealdb_database.clone());
66
67 if system_config.storage_backend == StorageBackendType::SurrealDB {
68 info!(
69 "Using SurrealDB storage backend: {} (ns: {}, db: {})",
70 system_config
71 .surrealdb_endpoint
72 .as_deref()
73 .unwrap_or("not configured"),
74 config.surrealdb_namespace,
75 config.surrealdb_database
76 );
77 } else {
78 info!("Using RocksDB storage backend");
79 }
80 }
81
82 let memory_system = Arc::new(
83 ConversationMemorySystem::new(system_config)
84 .await
85 .map_err(|e| format!("Failed to initialize memory system: {}", e))?,
86 );
87
88 info!("Memory system initialized successfully");
89
90 post_cortex_mcp::inject_memory_system(memory_system.clone());
92 info!("Memory system injected into MCP tools");
93
94 if let Err(e) = memory_system.clear_query_cache().await {
96 error!("Failed to clear query cache on startup: {}", e);
97 } else {
98 info!("Query cache cleared successfully on daemon startup");
99 }
100
101 let ct = CancellationToken::new();
103
104 let mcp_service = StreamableHttpService::new(
106 {
107 let memory_system = memory_system.clone();
108 move || Ok(PostCortexService::new(memory_system.clone()))
109 },
110 LocalSessionManager::default().into(),
111 StreamableHttpServerConfig {
112 cancellation_token: ct.child_token(),
113 ..Default::default()
114 },
115 );
116
117 info!("MCP endpoint configured: http://{}/mcp", addr);
118
119 let api_state = Arc::new(ApiState {
121 memory_system: memory_system.clone(),
122 });
123
124 let api_router = Router::new()
126 .route("/health", get(api_health))
127 .route(
128 "/api/sessions",
129 get(api_list_sessions).post(api_create_session),
130 )
131 .route("/api/sessions/{id}", delete(api_delete_session))
132 .route(
133 "/api/workspaces",
134 get(api_list_workspaces).post(api_create_workspace),
135 )
136 .route("/api/workspaces/{id}", delete(api_delete_workspace))
137 .route(
138 "/api/workspaces/{workspace_id}/sessions/{session_id}",
139 post(api_attach_session),
140 )
141 .with_state(api_state);
142
143 let router = Router::new()
145 .nest_service("/mcp", mcp_service)
146 .merge(api_router);
147
148 let listener = tokio::net::TcpListener::bind(addr)
150 .await
151 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
152
153 info!("TCP listener bound to {}", addr);
154
155 let shutdown_ct = ct.clone();
157 let server = axum::serve(listener, router).with_graceful_shutdown(async move {
158 shutdown_ct.cancelled().await;
159 info!("HTTP server shutting down gracefully");
160 });
161
162 tokio::spawn(async move {
164 if let Err(e) = server.await {
165 error!("HTTP server error: {}", e);
166 }
167 });
168
169 if config.grpc_port > 0 {
171 let grpc_addr: SocketAddr = format!("{}:{}", config.host, config.grpc_port)
172 .parse()
173 .map_err(|e| format!("Invalid gRPC address: {}", e))?;
174 let grpc_memory = memory_system.clone();
175 tokio::spawn(async move {
176 if let Err(e) = crate::daemon::grpc_service::start_grpc_server(grpc_memory, grpc_addr).await {
177 error!("gRPC server error: {}", e);
178 }
179 });
180 info!("gRPC endpoint: {}:{}", config.host, config.grpc_port);
181 }
182
183 info!("Post-Cortex MCP service registered with SSE server");
184 info!("Daemon is ready to accept connections");
185 info!("Press Ctrl+C to shutdown");
186
187 tokio::select! {
189 _ = tokio::signal::ctrl_c() => {
190 info!("Received Ctrl+C, initiating shutdown");
191 }
192 _ = ct.cancelled() => {
193 info!("Service cancelled");
194 }
195 }
196
197 ct.cancel();
198 info!("Post-Cortex daemon stopped");
199 Ok(())
200}
201
202struct ApiState {
208 memory_system: Arc<ConversationMemorySystem>,
209}
210
211#[derive(Serialize)]
212struct SessionInfo {
213 id: String,
214 name: String,
215 workspace: Option<String>,
216}
217
218#[derive(Serialize)]
219struct WorkspaceInfo {
220 id: String,
221 name: String,
222 description: String,
223 session_count: usize,
224}
225
226#[derive(Deserialize)]
227struct CreateSessionRequest {
228 name: Option<String>,
229 description: Option<String>,
230}
231
232#[derive(Deserialize)]
233struct CreateWorkspaceRequest {
234 name: String,
235 description: Option<String>,
236}
237
238#[derive(Deserialize)]
239struct AttachSessionRequest {
240 role: Option<String>,
241}
242
243async fn api_health() -> Json<serde_json::Value> {
244 Json(serde_json::json!({
245 "status": "ok",
246 "service": "post-cortex"
247 }))
248}
249
250async fn api_list_sessions(
251 State(state): State<Arc<ApiState>>,
252) -> Result<Json<Vec<SessionInfo>>, (StatusCode, String)> {
253 let ids = state
254 .memory_system
255 .list_sessions()
256 .await
257 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
258
259 let workspaces = state.memory_system.workspace_manager.list_workspaces();
260 let mut session_workspace_map = std::collections::HashMap::new();
261 for ws in workspaces {
262 for (session_id, _role) in ws.get_all_sessions() {
263 session_workspace_map.insert(session_id, ws.name.clone());
264 }
265 }
266
267 let mut sessions = Vec::new();
268 for id in ids {
269 let name = match state.memory_system.get_session(id).await {
270 Ok(session_arc) => {
271 let session = session_arc.load();
272 session.name().unwrap_or_else(|| "Unnamed".to_string())
273 }
274 Err(_) => "Error loading".to_string(),
275 };
276
277 sessions.push(SessionInfo {
278 id: id.to_string(),
279 name,
280 workspace: session_workspace_map.get(&id).cloned(),
281 });
282 }
283
284 Ok(Json(sessions))
285}
286
287async fn api_create_session(
288 State(state): State<Arc<ApiState>>,
289 Json(req): Json<CreateSessionRequest>,
290) -> Result<Json<SessionInfo>, (StatusCode, String)> {
291 let id = state
292 .memory_system
293 .create_session(req.name.clone(), req.description)
294 .await
295 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
296
297 Ok(Json(SessionInfo {
298 id: id.to_string(),
299 name: req.name.unwrap_or_else(|| "Unnamed".to_string()),
300 workspace: None,
301 }))
302}
303
304async fn api_delete_session(
305 State(state): State<Arc<ApiState>>,
306 Path(id): Path<String>,
307) -> Result<StatusCode, (StatusCode, String)> {
308 let uuid = Uuid::parse_str(&id)
309 .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid UUID: {}", e)))?;
310
311 state
312 .memory_system
313 .get_storage()
314 .delete_session(uuid)
315 .await
316 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
317
318 Ok(StatusCode::NO_CONTENT)
319}
320
321async fn api_list_workspaces(
322 State(state): State<Arc<ApiState>>,
323) -> Result<Json<Vec<WorkspaceInfo>>, (StatusCode, String)> {
324 let workspaces = state
325 .memory_system
326 .get_storage()
327 .list_all_workspaces()
328 .await
329 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
330
331 let existing_sessions: std::collections::HashSet<_> = state
333 .memory_system
334 .list_sessions()
335 .await
336 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?
337 .into_iter()
338 .collect();
339
340 let result: Vec<WorkspaceInfo> = workspaces
341 .into_iter()
342 .map(|ws| {
343 let actual_count = ws
345 .sessions
346 .iter()
347 .filter(|(id, _)| existing_sessions.contains(id))
348 .count();
349 WorkspaceInfo {
350 id: ws.id.to_string(),
351 name: ws.name,
352 description: ws.description,
353 session_count: actual_count,
354 }
355 })
356 .collect();
357
358 Ok(Json(result))
359}
360
361async fn api_create_workspace(
362 State(state): State<Arc<ApiState>>,
363 Json(req): Json<CreateWorkspaceRequest>,
364) -> Result<Json<WorkspaceInfo>, (StatusCode, String)> {
365 let id = Uuid::new_v4();
366 let description = req.description.unwrap_or_default();
367
368 state
369 .memory_system
370 .get_storage()
371 .save_workspace_metadata(id, &req.name, &description, &[])
372 .await
373 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
374
375 state.memory_system.workspace_manager.restore_workspace(
377 id,
378 req.name.clone(),
379 description.clone(),
380 vec![],
381 );
382
383 Ok(Json(WorkspaceInfo {
384 id: id.to_string(),
385 name: req.name,
386 description,
387 session_count: 0,
388 }))
389}
390
391async fn api_delete_workspace(
392 State(state): State<Arc<ApiState>>,
393 Path(id): Path<String>,
394) -> Result<StatusCode, (StatusCode, String)> {
395 let uuid = Uuid::parse_str(&id)
396 .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid UUID: {}", e)))?;
397
398 state
399 .memory_system
400 .get_storage()
401 .delete_workspace(uuid)
402 .await
403 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
404
405 state
407 .memory_system
408 .workspace_manager
409 .delete_workspace(&uuid);
410
411 Ok(StatusCode::NO_CONTENT)
412}
413
414async fn api_attach_session(
415 State(state): State<Arc<ApiState>>,
416 Path((workspace_id, session_id)): Path<(String, String)>,
417 Json(req): Json<AttachSessionRequest>,
418) -> Result<StatusCode, (StatusCode, String)> {
419 let ws_id = Uuid::parse_str(&workspace_id).map_err(|e| {
420 (
421 StatusCode::BAD_REQUEST,
422 format!("Invalid workspace UUID: {}", e),
423 )
424 })?;
425 let sess_id = Uuid::parse_str(&session_id).map_err(|e| {
426 (
427 StatusCode::BAD_REQUEST,
428 format!("Invalid session UUID: {}", e),
429 )
430 })?;
431
432 let role = match req.role.as_deref().unwrap_or("related") {
433 "primary" => post_cortex_core::workspace::SessionRole::Primary,
434 "related" => post_cortex_core::workspace::SessionRole::Related,
435 "dependency" => post_cortex_core::workspace::SessionRole::Dependency,
436 "shared" => post_cortex_core::workspace::SessionRole::Shared,
437 other => return Err((StatusCode::BAD_REQUEST, format!("Invalid role: {}", other))),
438 };
439
440 state
441 .memory_system
442 .get_storage()
443 .add_session_to_workspace(ws_id, sess_id, role)
444 .await
445 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
446
447 let _ = state
449 .memory_system
450 .workspace_manager
451 .add_session_to_workspace(&ws_id, sess_id, role);
452
453 Ok(StatusCode::OK)
454}