post_cortex_daemon/daemon/
rmcp_server.rs1use crate::daemon::{DaemonConfig, PostCortexService};
9use axum::{
10 Json, Router,
11 extract::{Path, State},
12 http::StatusCode,
13 routing::{delete, get, post},
14};
15use post_cortex_memory::ConversationMemorySystem;
16use rmcp::transport::streamable_http_server::{
17 StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
18};
19use serde::{Deserialize, Serialize};
20use std::net::SocketAddr;
21use std::sync::Arc;
22use tokio_util::sync::CancellationToken;
23use tracing::{error, info};
24use uuid::Uuid;
25
26pub async fn start_rmcp_daemon(config: DaemonConfig) -> Result<(), String> {
28 let addr: SocketAddr = format!("{}:{}", config.host, config.port)
29 .parse()
30 .map_err(|e| format!("Invalid address: {}", e))?;
31
32 info!("Initializing Post-Cortex daemon with RMCP SSE transport");
33 info!(" Host: {}", config.host);
34 info!(" Port: {}", config.port);
35 info!(" Data Directory: {}", config.data_directory);
36
37 let mut system_config = post_cortex_memory::SystemConfig {
39 data_directory: config.data_directory.clone(),
40 ..Default::default()
41 };
42
43 #[cfg(feature = "embeddings")]
44 {
45 system_config.enable_embeddings = true;
52 system_config.auto_vectorize_on_update = true;
53 system_config.cross_session_search_enabled = true;
54 info!(
55 "Embeddings enabled in daemon config (model={}, dim={})",
56 system_config.embeddings_model_type, system_config.vector_dimension,
57 );
58 }
59
60 #[cfg(feature = "surrealdb-storage")]
62 {
63 use post_cortex_storage::traits::StorageBackendType;
64
65 system_config.storage_backend = match config.storage_backend.as_str() {
66 "surrealdb" => StorageBackendType::SurrealDB,
67 _ => StorageBackendType::RocksDB,
68 };
69 system_config.surrealdb_endpoint = config.surrealdb_endpoint.clone();
70 system_config.surrealdb_username = config.surrealdb_username.clone();
71 system_config.surrealdb_password = config.surrealdb_password.clone();
72 system_config.surrealdb_namespace = Some(config.surrealdb_namespace.clone());
73 system_config.surrealdb_database = Some(config.surrealdb_database.clone());
74
75 if system_config.storage_backend == StorageBackendType::SurrealDB {
76 info!(
77 "Using SurrealDB storage backend: {} (ns: {}, db: {})",
78 system_config
79 .surrealdb_endpoint
80 .as_deref()
81 .unwrap_or("not configured"),
82 config.surrealdb_namespace,
83 config.surrealdb_database
84 );
85 } else {
86 info!("Using RocksDB storage backend");
87 }
88 }
89
90 let memory_system = Arc::new(
91 ConversationMemorySystem::new(system_config)
92 .await
93 .map_err(|e| format!("Failed to initialize memory system: {}", e))?,
94 );
95
96 info!("Memory system initialized successfully");
97
98 post_cortex_mcp::inject_memory_system(memory_system.clone());
100 info!("Memory system injected into MCP tools");
101
102 if let Err(e) = memory_system.clear_query_cache().await {
104 error!("Failed to clear query cache on startup: {}", e);
105 } else {
106 info!("Query cache cleared successfully on daemon startup");
107 }
108
109 let ct = CancellationToken::new();
111
112 let mcp_service = StreamableHttpService::new(
114 {
115 let memory_system = memory_system.clone();
116 move || Ok(PostCortexService::new(memory_system.clone()))
117 },
118 LocalSessionManager::default().into(),
119 StreamableHttpServerConfig {
120 cancellation_token: ct.child_token(),
121 ..Default::default()
122 },
123 );
124
125 info!("MCP endpoint configured: http://{}/mcp", addr);
126
127 let api_state = Arc::new(ApiState {
129 memory_system: memory_system.clone(),
130 });
131
132 let api_router = Router::new()
134 .route("/health", get(api_health))
135 .route(
136 "/api/sessions",
137 get(api_list_sessions).post(api_create_session),
138 )
139 .route("/api/sessions/{id}", delete(api_delete_session))
140 .route(
141 "/api/workspaces",
142 get(api_list_workspaces).post(api_create_workspace),
143 )
144 .route("/api/workspaces/{id}", delete(api_delete_workspace))
145 .route(
146 "/api/workspaces/{workspace_id}/sessions/{session_id}",
147 post(api_attach_session),
148 )
149 .with_state(api_state);
150
151 let router = Router::new()
153 .nest_service("/mcp", mcp_service)
154 .merge(api_router);
155
156 let listener = tokio::net::TcpListener::bind(addr)
158 .await
159 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
160
161 info!("TCP listener bound to {}", addr);
162
163 let shutdown_ct = ct.clone();
165 let server = axum::serve(listener, router).with_graceful_shutdown(async move {
166 shutdown_ct.cancelled().await;
167 info!("HTTP server shutting down gracefully");
168 });
169
170 tokio::spawn(async move {
172 if let Err(e) = server.await {
173 error!("HTTP server error: {}", e);
174 }
175 });
176
177 if config.grpc_port > 0 {
179 let grpc_addr: SocketAddr = format!("{}:{}", config.host, config.grpc_port)
180 .parse()
181 .map_err(|e| format!("Invalid gRPC address: {}", e))?;
182 let grpc_memory = memory_system.clone();
183 tokio::spawn(async move {
184 if let Err(e) =
185 crate::daemon::grpc_service::start_grpc_server(grpc_memory, grpc_addr).await
186 {
187 error!("gRPC server error: {}", e);
188 }
189 });
190 info!("gRPC endpoint: {}:{}", config.host, config.grpc_port);
191 }
192
193 info!("Post-Cortex MCP service registered with SSE server");
194 info!("Daemon is ready to accept connections");
195 info!("Press Ctrl+C to shutdown");
196
197 tokio::select! {
199 _ = tokio::signal::ctrl_c() => {
200 info!("Received Ctrl+C, initiating shutdown");
201 }
202 _ = ct.cancelled() => {
203 info!("Service cancelled");
204 }
205 }
206
207 ct.cancel();
208 info!("Post-Cortex daemon stopped");
209 Ok(())
210}
211
/// State shared by the REST handlers in this file, which receive it as
/// `State<Arc<ApiState>>`.
struct ApiState {
    /// Handle to the memory system the handlers read from and write to.
    memory_system: Arc<ConversationMemorySystem>,
}
220
/// JSON representation of a session returned by the session endpoints.
#[derive(Serialize)]
struct SessionInfo {
    /// Session id rendered with `to_string()`.
    id: String,
    /// Session name; the handlers substitute `"Unnamed"` when no name is set and
    /// `"Error loading"` when the session could not be loaded.
    name: String,
    /// Name of a workspace that contains this session, if any.
    workspace: Option<String>,
}
227
/// JSON representation of a workspace returned by the workspace endpoints.
#[derive(Serialize)]
struct WorkspaceInfo {
    /// Workspace id rendered with `to_string()`.
    id: String,
    /// Workspace name.
    name: String,
    /// Workspace description (empty when none was supplied at creation).
    description: String,
    /// Number of sessions in the workspace; the list endpoint only counts sessions
    /// that are still reported by the memory system.
    session_count: usize,
}
235
/// Request body for `POST /api/sessions`; both fields are optional.
#[derive(Deserialize)]
struct CreateSessionRequest {
    /// Optional session name; the response reports `"Unnamed"` when absent.
    name: Option<String>,
    /// Optional session description, passed through to `create_session`.
    description: Option<String>,
}
241
/// Request body for `POST /api/workspaces`.
#[derive(Deserialize)]
struct CreateWorkspaceRequest {
    /// Workspace name (required).
    name: String,
    /// Optional description; the handler falls back to an empty string.
    description: Option<String>,
}
247
/// Request body for `POST /api/workspaces/{workspace_id}/sessions/{session_id}`.
#[derive(Deserialize)]
struct AttachSessionRequest {
    /// Role of the session within the workspace: `"primary"`, `"related"`,
    /// `"dependency"` or `"shared"`. The handler defaults to `"related"` when absent.
    role: Option<String>,
}
252
253async fn api_health() -> Json<serde_json::Value> {
254 Json(serde_json::json!({
255 "status": "ok",
256 "service": "post-cortex"
257 }))
258}
259
260async fn api_list_sessions(
261 State(state): State<Arc<ApiState>>,
262) -> Result<Json<Vec<SessionInfo>>, (StatusCode, String)> {
263 let ids = state
264 .memory_system
265 .list_sessions()
266 .await
267 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
268
269 let workspaces = state.memory_system.workspace_manager.list_workspaces();
270 let mut session_workspace_map = std::collections::HashMap::new();
271 for ws in workspaces {
272 for (session_id, _role) in ws.get_all_sessions() {
273 session_workspace_map.insert(session_id, ws.name.clone());
274 }
275 }
276
277 let mut sessions = Vec::new();
278 for id in ids {
279 let name = match state.memory_system.get_session(id).await {
280 Ok(session_arc) => {
281 let session = session_arc.load();
282 session.name().unwrap_or_else(|| "Unnamed".to_string())
283 }
284 Err(_) => "Error loading".to_string(),
285 };
286
287 sessions.push(SessionInfo {
288 id: id.to_string(),
289 name,
290 workspace: session_workspace_map.get(&id).cloned(),
291 });
292 }
293
294 Ok(Json(sessions))
295}
296
297async fn api_create_session(
298 State(state): State<Arc<ApiState>>,
299 Json(req): Json<CreateSessionRequest>,
300) -> Result<Json<SessionInfo>, (StatusCode, String)> {
301 let id = state
302 .memory_system
303 .create_session(req.name.clone(), req.description)
304 .await
305 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
306
307 Ok(Json(SessionInfo {
308 id: id.to_string(),
309 name: req.name.unwrap_or_else(|| "Unnamed".to_string()),
310 workspace: None,
311 }))
312}
313
314async fn api_delete_session(
315 State(state): State<Arc<ApiState>>,
316 Path(id): Path<String>,
317) -> Result<StatusCode, (StatusCode, String)> {
318 let uuid = Uuid::parse_str(&id)
319 .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid UUID: {}", e)))?;
320
321 state
322 .memory_system
323 .get_storage()
324 .delete_session(uuid)
325 .await
326 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
327
328 Ok(StatusCode::NO_CONTENT)
329}
330
331async fn api_list_workspaces(
332 State(state): State<Arc<ApiState>>,
333) -> Result<Json<Vec<WorkspaceInfo>>, (StatusCode, String)> {
334 let workspaces = state
335 .memory_system
336 .get_storage()
337 .list_all_workspaces()
338 .await
339 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
340
341 let existing_sessions: std::collections::HashSet<_> = state
343 .memory_system
344 .list_sessions()
345 .await
346 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?
347 .into_iter()
348 .collect();
349
350 let result: Vec<WorkspaceInfo> = workspaces
351 .into_iter()
352 .map(|ws| {
353 let actual_count = ws
355 .sessions
356 .iter()
357 .filter(|(id, _)| existing_sessions.contains(id))
358 .count();
359 WorkspaceInfo {
360 id: ws.id.to_string(),
361 name: ws.name,
362 description: ws.description,
363 session_count: actual_count,
364 }
365 })
366 .collect();
367
368 Ok(Json(result))
369}
370
371async fn api_create_workspace(
372 State(state): State<Arc<ApiState>>,
373 Json(req): Json<CreateWorkspaceRequest>,
374) -> Result<Json<WorkspaceInfo>, (StatusCode, String)> {
375 let id = Uuid::new_v4();
376 let description = req.description.unwrap_or_default();
377
378 state
379 .memory_system
380 .get_storage()
381 .save_workspace_metadata(id, &req.name, &description, &[])
382 .await
383 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
384
385 state.memory_system.workspace_manager.restore_workspace(
387 id,
388 req.name.clone(),
389 description.clone(),
390 vec![],
391 );
392
393 Ok(Json(WorkspaceInfo {
394 id: id.to_string(),
395 name: req.name,
396 description,
397 session_count: 0,
398 }))
399}
400
401async fn api_delete_workspace(
402 State(state): State<Arc<ApiState>>,
403 Path(id): Path<String>,
404) -> Result<StatusCode, (StatusCode, String)> {
405 let uuid = Uuid::parse_str(&id)
406 .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid UUID: {}", e)))?;
407
408 state
409 .memory_system
410 .get_storage()
411 .delete_workspace(uuid)
412 .await
413 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
414
415 state
417 .memory_system
418 .workspace_manager
419 .delete_workspace(&uuid);
420
421 Ok(StatusCode::NO_CONTENT)
422}
423
424async fn api_attach_session(
425 State(state): State<Arc<ApiState>>,
426 Path((workspace_id, session_id)): Path<(String, String)>,
427 Json(req): Json<AttachSessionRequest>,
428) -> Result<StatusCode, (StatusCode, String)> {
429 let ws_id = Uuid::parse_str(&workspace_id).map_err(|e| {
430 (
431 StatusCode::BAD_REQUEST,
432 format!("Invalid workspace UUID: {}", e),
433 )
434 })?;
435 let sess_id = Uuid::parse_str(&session_id).map_err(|e| {
436 (
437 StatusCode::BAD_REQUEST,
438 format!("Invalid session UUID: {}", e),
439 )
440 })?;
441
442 let role = match req.role.as_deref().unwrap_or("related") {
443 "primary" => post_cortex_core::workspace::SessionRole::Primary,
444 "related" => post_cortex_core::workspace::SessionRole::Related,
445 "dependency" => post_cortex_core::workspace::SessionRole::Dependency,
446 "shared" => post_cortex_core::workspace::SessionRole::Shared,
447 other => return Err((StatusCode::BAD_REQUEST, format!("Invalid role: {}", other))),
448 };
449
450 state
451 .memory_system
452 .get_storage()
453 .add_session_to_workspace(ws_id, sess_id, role)
454 .await
455 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
456
457 let _ = state
459 .memory_system
460 .workspace_manager
461 .add_session_to_workspace(&ws_id, sess_id, role);
462
463 Ok(StatusCode::OK)
464}