1use crate::daemon::sse::SSEBroadcaster;
24use axum::{
25 Json, Router,
26 extract::State,
27 response::{
28 IntoResponse,
29 sse::{Event, Sse},
30 },
31 routing::{delete, get, post},
32};
33use dashmap::DashMap;
34use futures::stream::{self};
35use post_cortex_memory::ConversationMemorySystem;
36use serde::{Deserialize, Serialize};
37use std::net::SocketAddr;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicU64, Ordering};
40use std::time::SystemTime;
41use tower_http::cors::CorsLayer;
42use tracing::{debug, info};
43use uuid::Uuid;
44
45use super::config::DaemonConfig;
46
47mod handlers;
48mod rest;
49
50#[allow(dead_code)] struct ConnectionInfo {
53 id: Uuid,
54 connected_at: SystemTime,
55 last_request: Arc<AtomicU64>,
56 request_count: Arc<AtomicU64>,
57}
58
59pub struct DaemonServer {
61 pub(super) memory_system: Arc<ConversationMemorySystem>,
63
64 active_connections: Arc<DashMap<Uuid, ConnectionInfo>>,
66
67 sse_broadcaster: Arc<SSEBroadcaster>,
69
70 session_to_client: Arc<DashMap<String, Uuid>>,
72
73 connection_counter: Arc<AtomicU64>,
75 total_requests: Arc<AtomicU64>,
76 config: DaemonConfig,
77}
78
79impl DaemonServer {
80 pub async fn new(config: DaemonConfig) -> Result<Self, String> {
82 info!(
83 "Initializing lock-free daemon server on {}:{}",
84 config.host, config.port
85 );
86
87 #[allow(unused_mut)]
89 let mut system_config = post_cortex_memory::SystemConfig {
90 data_directory: config.data_directory.clone(),
91 ..Default::default()
92 };
93
94 #[cfg(feature = "surrealdb-storage")]
96 {
97 use post_cortex_storage::traits::StorageBackendType;
98
99 system_config.storage_backend = match config.storage_backend.as_str() {
100 "surrealdb" => StorageBackendType::SurrealDB,
101 _ => StorageBackendType::RocksDB,
102 };
103 system_config.surrealdb_endpoint = config.surrealdb_endpoint.clone();
104 system_config.surrealdb_username = config.surrealdb_username.clone();
105 system_config.surrealdb_password = config.surrealdb_password.clone();
106 system_config.surrealdb_namespace = Some(config.surrealdb_namespace.clone());
107 system_config.surrealdb_database = Some(config.surrealdb_database.clone());
108
109 if system_config.storage_backend == StorageBackendType::SurrealDB {
110 info!(
111 "Using SurrealDB storage backend: {} (ns: {}, db: {})",
112 system_config
113 .surrealdb_endpoint
114 .as_deref()
115 .unwrap_or("not configured"),
116 config.surrealdb_namespace,
117 config.surrealdb_database
118 );
119 } else {
120 info!("Using RocksDB storage backend");
121 }
122 }
123
124 let memory_system = Arc::new(
125 ConversationMemorySystem::new(system_config)
126 .await
127 .map_err(|e| format!("Failed to initialize memory system: {}", e))?,
128 );
129
130 post_cortex_mcp::inject_memory_system(memory_system.clone());
133
134 info!("Memory system initialized and injected successfully");
135
136 Ok(Self {
137 memory_system,
138 active_connections: Arc::new(DashMap::new()),
139 sse_broadcaster: Arc::new(SSEBroadcaster::new()),
140 session_to_client: Arc::new(DashMap::new()),
141 connection_counter: Arc::new(AtomicU64::new(0)),
142 total_requests: Arc::new(AtomicU64::new(0)),
143 config,
144 })
145 }
146
147 pub fn build_router(self) -> Router {
152 let server = Arc::new(self);
153
154 Router::new()
155 .route("/health", get(health_check))
156 .route("/sse", get(handle_sse_stream))
157 .route("/message", post(handle_mcp_request))
158 .route("/stats", get(get_stats))
159 .route(
161 "/api/sessions",
162 get(rest::api_list_sessions).post(rest::api_create_session),
163 )
164 .route("/api/sessions/{id}", delete(rest::api_delete_session))
165 .route(
166 "/api/workspaces",
167 get(rest::api_list_workspaces).post(rest::api_create_workspace),
168 )
169 .route("/api/workspaces/{id}", delete(rest::api_delete_workspace))
170 .route(
171 "/api/workspaces/{workspace_id}/sessions/{session_id}",
172 post(rest::api_attach_session),
173 )
174 .layer(CorsLayer::permissive())
175 .with_state(server)
176 }
177
178 pub async fn start(self) -> Result<(), String> {
180 let addr: SocketAddr = format!("{}:{}", self.config.host, self.config.port)
181 .parse()
182 .map_err(|e| format!("Invalid address: {}", e))?;
183
184 let app = self.build_router();
185
186 info!("Starting HTTP server on {}", addr);
187
188 let listener = tokio::net::TcpListener::bind(addr)
189 .await
190 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
191
192 info!("HTTP server listening on {}", addr);
193
194 axum::serve(listener, app)
195 .await
196 .map_err(|e| format!("Server error: {}", e))?;
197
198 Ok(())
199 }
200
201 fn get_statistics(&self) -> ServerStats {
203 ServerStats {
204 active_connections: self.active_connections.len() as u64,
205 total_connections: self.connection_counter.load(Ordering::Relaxed),
206 total_requests: self.total_requests.load(Ordering::Relaxed),
207 active_sse_clients: self.sse_broadcaster.active_clients(),
208 total_sse_events: self.sse_broadcaster.total_events(),
209 workspace_count: self.memory_system.workspace_manager.total_workspaces(),
210 }
211 }
212}
213
214#[derive(Debug, Deserialize)]
219struct MCPRequest {
220 #[allow(dead_code)] jsonrpc: String,
222 id: serde_json::Value,
223 method: String,
224 #[allow(dead_code)] params: Option<serde_json::Value>,
226}
227
228#[derive(Debug, Serialize)]
229struct MCPResponse {
230 jsonrpc: String,
231 id: serde_json::Value,
232 result: Option<serde_json::Value>,
233 error: Option<MCPError>,
234}
235
236#[derive(Debug, Serialize)]
237struct MCPError {
238 code: i32,
239 message: String,
240}
241
242#[derive(Debug, Serialize)]
244struct ServerStats {
245 active_connections: u64,
246 total_connections: u64,
247 total_requests: u64,
248 active_sse_clients: u64,
249 total_sse_events: u64,
250 workspace_count: u64,
251}
252
253async fn health_check() -> impl IntoResponse {
258 Json(serde_json::json!({
259 "status": "ok",
260 "service": "post-cortex-daemon"
261 }))
262}
263
264async fn get_stats(State(server): State<Arc<DaemonServer>>) -> impl IntoResponse {
265 Json(server.get_statistics())
266}
267
268async fn handle_sse_stream(State(server): State<Arc<DaemonServer>>) -> impl IntoResponse {
273 use axum::http::header::{HeaderMap, HeaderName, HeaderValue};
274
275 let client_id = Uuid::new_v4();
276 let session_id = Uuid::new_v4().to_string();
277 let rx = server.sse_broadcaster.register_client(client_id);
278
279 server
281 .session_to_client
282 .insert(session_id.clone(), client_id);
283
284 info!(
285 "SSE stream connected: {} (session: {})",
286 client_id, session_id
287 );
288
289 let stream = stream::unfold(
290 (rx, client_id, session_id.clone(), server.clone(), true),
291 |(mut rx, client_id, session_id, server, first)| async move {
292 if first {
294 let endpoint_event = Event::default()
295 .event("endpoint")
296 .id("0")
297 .json_data(serde_json::json!({"uri": "/message"}))
298 .ok()?;
299 return Some((
300 Ok::<_, std::convert::Infallible>(endpoint_event),
301 (rx, client_id, session_id, server, false),
302 ));
303 }
304
305 match rx.recv().await {
307 Some(event) => {
308 let sse_event = Event::default()
309 .event(&event.event_type)
310 .id(event.id)
311 .json_data(&event.data)
312 .ok()?;
313 Some((
314 Ok::<_, std::convert::Infallible>(sse_event),
315 (rx, client_id, session_id, server, false),
316 ))
317 }
318 None => {
319 server.sse_broadcaster.unregister_client(&client_id);
321 server.session_to_client.remove(&session_id);
322 info!(
323 "SSE stream disconnected: {} (session: {})",
324 client_id, session_id
325 );
326 None
327 }
328 }
329 },
330 );
331
332 let mut headers = HeaderMap::new();
333 headers.insert(
334 HeaderName::from_static("mcp-session-id"),
335 HeaderValue::from_str(&session_id).unwrap(),
336 );
337
338 (headers, Sse::new(stream))
339}
340
341async fn handle_mcp_request(
348 State(server): State<Arc<DaemonServer>>,
349 Json(request): Json<MCPRequest>,
350) -> impl IntoResponse {
351 debug!("Handling MCP request: {}", request.method);
352 server.total_requests.fetch_add(1, Ordering::Relaxed);
353
354 let result = match request.method.as_str() {
356 "initialize" => handle_initialize(),
357 "tools/list" => handle_tools_list(&server),
358 "tools/call" => handle_tool_call(&server, &request).await,
359 _ => Err(format!("Unknown method: {}", request.method)),
360 };
361
362 Json(match result {
364 Ok(result_data) => MCPResponse {
365 jsonrpc: "2.0".to_string(),
366 id: request.id,
367 result: Some(result_data),
368 error: None,
369 },
370 Err(error_msg) => MCPResponse {
371 jsonrpc: "2.0".to_string(),
372 id: request.id,
373 result: None,
374 error: Some(MCPError {
375 code: -32603,
376 message: error_msg,
377 }),
378 },
379 })
380}
381
382fn handle_initialize() -> Result<serde_json::Value, String> {
383 Ok(serde_json::json!({
384 "protocolVersion": "2025-03-26",
385 "capabilities": {
386 "tools": {}
387 },
388 "serverInfo": {
389 "name": "post-cortex-daemon",
390 "version": env!("CARGO_PKG_VERSION")
391 }
392 }))
393}
394
395fn handle_tools_list(_server: &Arc<DaemonServer>) -> Result<serde_json::Value, String> {
396 Ok(serde_json::json!({
397 "tools": [
398 {
399 "name": "create_session",
400 "description": "Create a new conversation session with optional name and description",
401 "inputSchema": {
402 "type": "object",
403 "properties": {
404 "name": {"type": "string", "description": "Optional name for the session"},
405 "description": {"type": "string", "description": "Optional description for the session"}
406 }
407 }
408 },
409 {
410 "name": "update_conversation_context",
411 "description": "Add new interaction context to a session",
412 "inputSchema": {
413 "type": "object",
414 "properties": {
415 "session_id": {"type": "string", "description": "UUID of the session"},
416 "interaction_type": {"type": "string", "description": "Type: qa, code_change, problem_solved, decision_made, requirement_added, concept_defined"},
417 "content": {"type": "object", "description": "Content object with interaction data"}
418 },
419 "required": ["session_id", "interaction_type", "content"]
420 }
421 }
422 ]
423 }))
424}
425
426async fn handle_tool_call(
427 server: &Arc<DaemonServer>,
428 request: &MCPRequest,
429) -> Result<serde_json::Value, String> {
430 let params = request
431 .params
432 .as_ref()
433 .ok_or_else(|| "Missing params in tool call".to_string())?;
434
435 let tool_name = params["name"]
436 .as_str()
437 .ok_or_else(|| "Missing tool name".to_string())?;
438
439 let arguments = ¶ms["arguments"];
440
441 debug!("Tool call: {} with args: {:?}", tool_name, arguments);
442
443 use handlers::*;
444 match tool_name {
445 "create_session" => handle_create_session(server, arguments).await,
447 "load_session" => handle_load_session(server, arguments).await,
448 "list_sessions" => handle_list_sessions(server).await,
449 "search_sessions" => handle_search_sessions(server, arguments).await,
450 "update_session_metadata" => handle_update_session_metadata(server, arguments).await,
451
452 "update_conversation_context" => handle_update_context(server, arguments).await,
454 "query_conversation_context" => handle_query_context(server, arguments).await,
455 "bulk_update_conversation_context" => handle_bulk_update_context(server, arguments).await,
456 "create_session_checkpoint" => handle_create_checkpoint(server, arguments).await,
457
458 "semantic_search_session" => handle_semantic_search(server, arguments).await,
460 "semantic_search_global" => handle_semantic_search_global(server, arguments).await,
461 "find_related_content" => handle_find_related_content(server, arguments).await,
462 "vectorize_session" => handle_vectorize_session(server, arguments).await,
463 "get_vectorization_stats" => handle_get_vectorization_stats(server).await,
464
465 "get_structured_summary" => handle_get_summary(server, arguments).await,
467 "get_key_decisions" => handle_get_key_decisions(server, arguments).await,
468 "get_key_insights" => handle_get_key_insights(server, arguments).await,
469 "get_entity_importance_analysis" => handle_get_entity_importance(server, arguments).await,
470 "get_entity_network_view" => handle_get_entity_network(server, arguments).await,
471 "get_session_statistics" => handle_get_session_statistics(server, arguments).await,
472 "get_tool_catalog" => handle_get_tool_catalog(server).await,
473
474 "create_workspace" => handle_create_workspace(server, arguments).await,
476 "get_workspace" => handle_get_workspace(server, arguments).await,
477 "list_workspaces" => handle_list_workspaces(server).await,
478 "delete_workspace" => handle_delete_workspace(server, arguments).await,
479 "add_session_to_workspace" => handle_add_session_to_workspace(server, arguments).await,
480 "remove_session_from_workspace" => {
481 handle_remove_session_from_workspace(server, arguments).await
482 }
483
484 _ => Err(format!("Unknown tool: {}", tool_name)),
485 }
486}
487
488#[cfg(test)]
489mod tests {
490 use super::*;
491
492 fn test_config() -> DaemonConfig {
493 DaemonConfig {
494 host: "127.0.0.1".to_string(),
495 port: 0, grpc_port: 0,
497 data_directory: tempfile::tempdir()
498 .unwrap()
499 .path()
500 .to_str()
501 .unwrap()
502 .to_string(),
503 storage_backend: "rocksdb".to_string(),
504 surrealdb_endpoint: None,
505 surrealdb_username: None,
506 surrealdb_password: None,
507 surrealdb_namespace: "post_cortex".to_string(),
508 surrealdb_database: "main".to_string(),
509 }
510 }
511
512 #[tokio::test]
513 async fn test_daemon_server_creation() {
514 let server = DaemonServer::new(test_config()).await;
515 assert!(server.is_ok());
516
517 let server = server.unwrap();
518 assert_eq!(server.active_connections.len(), 0);
519 assert_eq!(server.total_requests.load(Ordering::Relaxed), 0);
520 }
521
522 #[tokio::test]
523 async fn test_server_statistics() {
524 let server = DaemonServer::new(test_config()).await.unwrap();
525
526 let stats = server.get_statistics();
527 assert_eq!(stats.active_connections, 0);
528 assert_eq!(stats.total_requests, 0);
529 assert_eq!(stats.workspace_count, 0);
530 }
531}