Skip to main content

post_cortex_daemon/daemon/server/
mod.rs

// Copyright (c) 2025, 2026 Julius ML
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

//! Lock-free HTTP daemon server.
//!
//! Provides:
//! - MCP JSON-RPC endpoint (`/message`) + Streamable-HTTP SSE (`/sse`)
//! - REST API (`/api/*`) for the local `pcx` CLI
//! - Health (`/health`) and stats (`/stats`)
//!
//! Tool-call business logic lives in `tools::mcp::*`; this module is a thin
//! transport layer that maps JSON payloads onto those calls.

23use post_cortex_memory::ConversationMemorySystem;
24use crate::daemon::sse::SSEBroadcaster;
25use axum::{
26    Json, Router,
27    extract::State,
28    response::{
29        IntoResponse,
30        sse::{Event, Sse},
31    },
32    routing::{delete, get, post},
33};
34use dashmap::DashMap;
35use futures::stream::{self};
36use serde::{Deserialize, Serialize};
37use std::net::SocketAddr;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicU64, Ordering};
40use std::time::SystemTime;
41use tower_http::cors::CorsLayer;
42use tracing::{debug, info};
43use uuid::Uuid;
44
45use super::config::DaemonConfig;
46
47mod handlers;
48mod rest;
49
50/// Connection information (lock-free)
51#[allow(dead_code)] // Will be used for connection tracking in future
52struct ConnectionInfo {
53    id: Uuid,
54    connected_at: SystemTime,
55    last_request: Arc<AtomicU64>,
56    request_count: Arc<AtomicU64>,
57}
58
59/// Lock-free daemon server
60pub struct DaemonServer {
61    /// Core memory system (already lock-free)
62    pub(super) memory_system: Arc<ConversationMemorySystem>,
63
64    /// Lock-free connection tracking
65    active_connections: Arc<DashMap<Uuid, ConnectionInfo>>,
66
67    /// Lock-free SSE broadcaster
68    sse_broadcaster: Arc<SSEBroadcaster>,
69
70    /// Map session IDs to SSE client IDs for routing responses
71    session_to_client: Arc<DashMap<String, Uuid>>,
72
73    /// Atomic metrics
74    connection_counter: Arc<AtomicU64>,
75    total_requests: Arc<AtomicU64>,
76    config: DaemonConfig,
77}
78
79impl DaemonServer {
80    /// Create a new daemon server, initialising the memory system from `config`.
81    pub async fn new(config: DaemonConfig) -> Result<Self, String> {
82        info!(
83            "Initializing lock-free daemon server on {}:{}",
84            config.host, config.port
85        );
86
87        // Create memory system with storage backend from config
88        #[allow(unused_mut)]
89        let mut system_config = post_cortex_memory::SystemConfig {
90            data_directory: config.data_directory.clone(),
91            ..Default::default()
92        };
93
94        // Configure storage backend if surrealdb-storage feature is enabled
95        #[cfg(feature = "surrealdb-storage")]
96        {
97            use post_cortex_storage::traits::StorageBackendType;
98
99            system_config.storage_backend = match config.storage_backend.as_str() {
100                "surrealdb" => StorageBackendType::SurrealDB,
101                _ => StorageBackendType::RocksDB,
102            };
103            system_config.surrealdb_endpoint = config.surrealdb_endpoint.clone();
104            system_config.surrealdb_username = config.surrealdb_username.clone();
105            system_config.surrealdb_password = config.surrealdb_password.clone();
106            system_config.surrealdb_namespace = Some(config.surrealdb_namespace.clone());
107            system_config.surrealdb_database = Some(config.surrealdb_database.clone());
108
109            if system_config.storage_backend == StorageBackendType::SurrealDB {
110                info!(
111                    "Using SurrealDB storage backend: {} (ns: {}, db: {})",
112                    system_config
113                        .surrealdb_endpoint
114                        .as_deref()
115                        .unwrap_or("not configured"),
116                    config.surrealdb_namespace,
117                    config.surrealdb_database
118                );
119            } else {
120                info!("Using RocksDB storage backend");
121            }
122        }
123
124        let memory_system = Arc::new(
125            ConversationMemorySystem::new(system_config)
126                .await
127                .map_err(|e| format!("Failed to initialize memory system: {}", e))?,
128        );
129
130        // Inject memory system into MCP tools global singleton
131        // This enables all MCP tool functions to use daemon's shared memory system
132        post_cortex_mcp::inject_memory_system(memory_system.clone());
133
134        info!("Memory system initialized and injected successfully");
135
136        Ok(Self {
137            memory_system,
138            active_connections: Arc::new(DashMap::new()),
139            sse_broadcaster: Arc::new(SSEBroadcaster::new()),
140            session_to_client: Arc::new(DashMap::new()),
141            connection_counter: Arc::new(AtomicU64::new(0)),
142            total_requests: Arc::new(AtomicU64::new(0)),
143            config,
144        })
145    }
146
147    /// Build Axum router for testing without starting TCP server.
148    ///
149    /// Exposes the router for in-memory HTTP testing, eliminating the need
150    /// for real TCP ports in tests.
151    pub fn build_router(self) -> Router {
152        let server = Arc::new(self);
153
154        Router::new()
155            .route("/health", get(health_check))
156            .route("/sse", get(handle_sse_stream))
157            .route("/message", post(handle_mcp_request))
158            .route("/stats", get(get_stats))
159            // REST API for CLI
160            .route(
161                "/api/sessions",
162                get(rest::api_list_sessions).post(rest::api_create_session),
163            )
164            .route("/api/sessions/{id}", delete(rest::api_delete_session))
165            .route(
166                "/api/workspaces",
167                get(rest::api_list_workspaces).post(rest::api_create_workspace),
168            )
169            .route("/api/workspaces/{id}", delete(rest::api_delete_workspace))
170            .route(
171                "/api/workspaces/{workspace_id}/sessions/{session_id}",
172                post(rest::api_attach_session),
173            )
174            .layer(CorsLayer::permissive())
175            .with_state(server)
176    }
177
178    /// Start HTTP server
179    pub async fn start(self) -> Result<(), String> {
180        let addr: SocketAddr = format!("{}:{}", self.config.host, self.config.port)
181            .parse()
182            .map_err(|e| format!("Invalid address: {}", e))?;
183
184        let app = self.build_router();
185
186        info!("Starting HTTP server on {}", addr);
187
188        let listener = tokio::net::TcpListener::bind(addr)
189            .await
190            .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?;
191
192        info!("HTTP server listening on {}", addr);
193
194        axum::serve(listener, app)
195            .await
196            .map_err(|e| format!("Server error: {}", e))?;
197
198        Ok(())
199    }
200
201    /// Get server statistics
202    fn get_statistics(&self) -> ServerStats {
203        ServerStats {
204            active_connections: self.active_connections.len() as u64,
205            total_connections: self.connection_counter.load(Ordering::Relaxed),
206            total_requests: self.total_requests.load(Ordering::Relaxed),
207            active_sse_clients: self.sse_broadcaster.active_clients(),
208            total_sse_events: self.sse_broadcaster.total_events(),
209            workspace_count: self.memory_system.workspace_manager.total_workspaces(),
210        }
211    }
212}
213
214// =============================================================================
215// MCP JSON-RPC types
216// =============================================================================
217
218#[derive(Debug, Deserialize)]
219struct MCPRequest {
220    #[allow(dead_code)] // Used for validation
221    jsonrpc: String,
222    id: serde_json::Value,
223    method: String,
224    #[allow(dead_code)] // Will be used for tool calls
225    params: Option<serde_json::Value>,
226}
227
228#[derive(Debug, Serialize)]
229struct MCPResponse {
230    jsonrpc: String,
231    id: serde_json::Value,
232    result: Option<serde_json::Value>,
233    error: Option<MCPError>,
234}
235
236#[derive(Debug, Serialize)]
237struct MCPError {
238    code: i32,
239    message: String,
240}
241
242/// Server statistics
243#[derive(Debug, Serialize)]
244struct ServerStats {
245    active_connections: u64,
246    total_connections: u64,
247    total_requests: u64,
248    active_sse_clients: u64,
249    total_sse_events: u64,
250    workspace_count: u64,
251}
252
253// =============================================================================
254// Health & stats endpoints
255// =============================================================================
256
257async fn health_check() -> impl IntoResponse {
258    Json(serde_json::json!({
259        "status": "ok",
260        "service": "post-cortex-daemon"
261    }))
262}
263
264async fn get_stats(State(server): State<Arc<DaemonServer>>) -> impl IntoResponse {
265    Json(server.get_statistics())
266}
267
268// =============================================================================
269// SSE stream endpoint for Streamable HTTP transport
270// =============================================================================
271
272async fn handle_sse_stream(State(server): State<Arc<DaemonServer>>) -> impl IntoResponse {
273    use axum::http::header::{HeaderMap, HeaderName, HeaderValue};
274
275    let client_id = Uuid::new_v4();
276    let session_id = Uuid::new_v4().to_string();
277    let rx = server.sse_broadcaster.register_client(client_id);
278
279    // Map session ID to client ID for routing POST responses
280    server
281        .session_to_client
282        .insert(session_id.clone(), client_id);
283
284    info!(
285        "SSE stream connected: {} (session: {})",
286        client_id, session_id
287    );
288
289    let stream = stream::unfold(
290        (rx, client_id, session_id.clone(), server.clone(), true),
291        |(mut rx, client_id, session_id, server, first)| async move {
292            // Send initial endpoint event
293            if first {
294                let endpoint_event = Event::default()
295                    .event("endpoint")
296                    .id("0")
297                    .json_data(serde_json::json!({"uri": "/message"}))
298                    .ok()?;
299                return Some((
300                    Ok::<_, std::convert::Infallible>(endpoint_event),
301                    (rx, client_id, session_id, server, false),
302                ));
303            }
304
305            // Wait for events from broadcaster
306            match rx.recv().await {
307                Some(event) => {
308                    let sse_event = Event::default()
309                        .event(&event.event_type)
310                        .id(event.id)
311                        .json_data(&event.data)
312                        .ok()?;
313                    Some((
314                        Ok::<_, std::convert::Infallible>(sse_event),
315                        (rx, client_id, session_id, server, false),
316                    ))
317                }
318                None => {
319                    // Cleanup on disconnect
320                    server.sse_broadcaster.unregister_client(&client_id);
321                    server.session_to_client.remove(&session_id);
322                    info!(
323                        "SSE stream disconnected: {} (session: {})",
324                        client_id, session_id
325                    );
326                    None
327                }
328            }
329        },
330    );
331
332    let mut headers = HeaderMap::new();
333    headers.insert(
334        HeaderName::from_static("mcp-session-id"),
335        HeaderValue::from_str(&session_id).unwrap(),
336    );
337
338    (headers, Sse::new(stream))
339}
340
341// =============================================================================
342// MCP JSON-RPC request handler
343// =============================================================================
344
345/// MCP request handler — Streamable HTTP transport (2025-03-26).
346/// Returns response directly as JSON (not via SSE).
347async fn handle_mcp_request(
348    State(server): State<Arc<DaemonServer>>,
349    Json(request): Json<MCPRequest>,
350) -> impl IntoResponse {
351    debug!("Handling MCP request: {}", request.method);
352    server.total_requests.fetch_add(1, Ordering::Relaxed);
353
354    // Route to appropriate handler
355    let result = match request.method.as_str() {
356        "initialize" => handle_initialize(),
357        "tools/list" => handle_tools_list(&server),
358        "tools/call" => handle_tool_call(&server, &request).await,
359        _ => Err(format!("Unknown method: {}", request.method)),
360    };
361
362    // Build and return JSON-RPC response
363    Json(match result {
364        Ok(result_data) => MCPResponse {
365            jsonrpc: "2.0".to_string(),
366            id: request.id,
367            result: Some(result_data),
368            error: None,
369        },
370        Err(error_msg) => MCPResponse {
371            jsonrpc: "2.0".to_string(),
372            id: request.id,
373            result: None,
374            error: Some(MCPError {
375                code: -32603,
376                message: error_msg,
377            }),
378        },
379    })
380}
381
382fn handle_initialize() -> Result<serde_json::Value, String> {
383    Ok(serde_json::json!({
384        "protocolVersion": "2025-03-26",
385        "capabilities": {
386            "tools": {}
387        },
388        "serverInfo": {
389            "name": "post-cortex-daemon",
390            "version": env!("CARGO_PKG_VERSION")
391        }
392    }))
393}
394
395fn handle_tools_list(_server: &Arc<DaemonServer>) -> Result<serde_json::Value, String> {
396    Ok(serde_json::json!({
397        "tools": [
398            {
399                "name": "create_session",
400                "description": "Create a new conversation session with optional name and description",
401                "inputSchema": {
402                    "type": "object",
403                    "properties": {
404                        "name": {"type": "string", "description": "Optional name for the session"},
405                        "description": {"type": "string", "description": "Optional description for the session"}
406                    }
407                }
408            },
409            {
410                "name": "update_conversation_context",
411                "description": "Add new interaction context to a session",
412                "inputSchema": {
413                    "type": "object",
414                    "properties": {
415                        "session_id": {"type": "string", "description": "UUID of the session"},
416                        "interaction_type": {"type": "string", "description": "Type: qa, code_change, problem_solved, decision_made, requirement_added, concept_defined"},
417                        "content": {"type": "object", "description": "Content object with interaction data"}
418                    },
419                    "required": ["session_id", "interaction_type", "content"]
420                }
421            }
422        ]
423    }))
424}
425
426async fn handle_tool_call(
427    server: &Arc<DaemonServer>,
428    request: &MCPRequest,
429) -> Result<serde_json::Value, String> {
430    let params = request
431        .params
432        .as_ref()
433        .ok_or_else(|| "Missing params in tool call".to_string())?;
434
435    let tool_name = params["name"]
436        .as_str()
437        .ok_or_else(|| "Missing tool name".to_string())?;
438
439    let arguments = &params["arguments"];
440
441    debug!("Tool call: {} with args: {:?}", tool_name, arguments);
442
443    use handlers::*;
444    match tool_name {
445        // Session Management
446        "create_session" => handle_create_session(server, arguments).await,
447        "load_session" => handle_load_session(server, arguments).await,
448        "list_sessions" => handle_list_sessions(server).await,
449        "search_sessions" => handle_search_sessions(server, arguments).await,
450        "update_session_metadata" => handle_update_session_metadata(server, arguments).await,
451
452        // Context Operations
453        "update_conversation_context" => handle_update_context(server, arguments).await,
454        "query_conversation_context" => handle_query_context(server, arguments).await,
455        "bulk_update_conversation_context" => handle_bulk_update_context(server, arguments).await,
456        "create_session_checkpoint" => handle_create_checkpoint(server, arguments).await,
457
458        // Semantic Search
459        "semantic_search_session" => handle_semantic_search(server, arguments).await,
460        "semantic_search_global" => handle_semantic_search_global(server, arguments).await,
461        "find_related_content" => handle_find_related_content(server, arguments).await,
462        "vectorize_session" => handle_vectorize_session(server, arguments).await,
463        "get_vectorization_stats" => handle_get_vectorization_stats(server).await,
464
465        // Analysis & Insights
466        "get_structured_summary" => handle_get_summary(server, arguments).await,
467        "get_key_decisions" => handle_get_key_decisions(server, arguments).await,
468        "get_key_insights" => handle_get_key_insights(server, arguments).await,
469        "get_entity_importance_analysis" => handle_get_entity_importance(server, arguments).await,
470        "get_entity_network_view" => handle_get_entity_network(server, arguments).await,
471        "get_session_statistics" => handle_get_session_statistics(server, arguments).await,
472        "get_tool_catalog" => handle_get_tool_catalog(server).await,
473
474        // Workspace Management
475        "create_workspace" => handle_create_workspace(server, arguments).await,
476        "get_workspace" => handle_get_workspace(server, arguments).await,
477        "list_workspaces" => handle_list_workspaces(server).await,
478        "delete_workspace" => handle_delete_workspace(server, arguments).await,
479        "add_session_to_workspace" => handle_add_session_to_workspace(server, arguments).await,
480        "remove_session_from_workspace" => {
481            handle_remove_session_from_workspace(server, arguments).await
482        }
483
484        _ => Err(format!("Unknown tool: {}", tool_name)),
485    }
486}
487
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a daemon config whose data directory is a fresh temporary directory.
    ///
    /// The [`tempfile::TempDir`] guard is returned alongside the config. The
    /// guard deletes the directory when dropped, so callers must keep it alive
    /// for as long as the server uses the directory. Bind it to a named
    /// variable such as `_data_dir`, not `_`, which drops immediately.
    fn test_config() -> (DaemonConfig, tempfile::TempDir) {
        let data_dir = tempfile::tempdir().expect("failed to create temporary data directory");
        let config = DaemonConfig {
            host: "127.0.0.1".to_string(),
            port: 0, // Random port
            grpc_port: 0,
            data_directory: data_dir
                .path()
                .to_str()
                .expect("temporary directory path is not valid UTF-8")
                .to_string(),
            storage_backend: "rocksdb".to_string(),
            surrealdb_endpoint: None,
            surrealdb_username: None,
            surrealdb_password: None,
            surrealdb_namespace: "post_cortex".to_string(),
            surrealdb_database: "main".to_string(),
        };
        (config, data_dir)
    }

    #[tokio::test]
    async fn test_daemon_server_creation() {
        // `_data_dir` is declared before `server`, so it is dropped after it.
        let (config, _data_dir) = test_config();
        let server = DaemonServer::new(config).await;
        assert!(server.is_ok(), "server creation failed: {:?}", server.as_ref().err());

        let server = server.unwrap();
        assert_eq!(server.active_connections.len(), 0);
        assert_eq!(server.total_requests.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn test_server_statistics() {
        let (config, _data_dir) = test_config();
        let server = DaemonServer::new(config).await.unwrap();

        let stats = server.get_statistics();
        assert_eq!(stats.active_connections, 0);
        assert_eq!(stats.total_requests, 0);
        assert_eq!(stats.workspace_count, 0);
    }
}