Skip to main content

post_cortex_mcp/
session.rs

1//! Session lifecycle: create, load, checkpoint, list, search, and metadata.
2
3use crate::{MCPToolResult, get_memory_system, string_to_anyhow};
4use anyhow::Result;
5use arc_swap::ArcSwap;
6use post_cortex_core::core::timeout_utils::with_storage_timeout;
7use post_cortex_core::session::active_session::ActiveSession;
8use post_cortex_memory::ConversationMemorySystem;
9use post_cortex_storage::rocksdb_storage::SessionCheckpoint;
10use std::sync::Arc;
11use tracing::{error, info, instrument};
12use uuid::Uuid;
13
14/// Create a session checkpoint using an explicit memory system reference.
15pub async fn create_session_checkpoint_with_system(
16    session_id: Uuid,
17    system: &ConversationMemorySystem,
18) -> Result<MCPToolResult> {
19    let session_arc = system
20        .get_session(session_id)
21        .await
22        .map_err(|e| anyhow::anyhow!("Failed to load session: {}", e))?;
23    let session = session_arc.load();
24
25    let checkpoint = create_comprehensive_checkpoint(&session).await?;
26
27    system
28        .storage_actor
29        .save_checkpoint(&checkpoint)
30        .await
31        .map_err(string_to_anyhow)?;
32
33    Ok(MCPToolResult::success(
34        "Checkpoint created successfully".to_string(),
35        Some(serde_json::json!({ "checkpoint_id": checkpoint.id.to_string() })),
36    ))
37}
38
39/// Load a session from a previously saved checkpoint.
40pub async fn load_session_checkpoint_with_system(
41    checkpoint_id: String,
42    session_id: Uuid,
43    system: &ConversationMemorySystem,
44) -> Result<MCPToolResult> {
45    eprintln!("Loading checkpoint - step 1: Parsing checkpoint ID");
46    let checkpoint_id = Uuid::parse_str(&checkpoint_id)?;
47
48    eprintln!("Loading checkpoint - step 2: Loading checkpoint from storage");
49    let checkpoint = system
50        .storage_actor
51        .load_checkpoint(checkpoint_id)
52        .await
53        .map_err(string_to_anyhow)?;
54
55    eprintln!("Loading checkpoint - step 3: Checkpoint loaded successfully");
56
57    eprintln!("Loading checkpoint - step 4: Creating session from checkpoint");
58    let mut session = ActiveSession::new(session_id, None, None);
59
60    eprintln!("Loading checkpoint - step 5: Restoring current state");
61    session.current_state = Arc::new(checkpoint.structured_context);
62
63    eprintln!("Loading checkpoint - step 6: Restoring incremental updates");
64    session.incremental_updates = Arc::new(checkpoint.recent_updates);
65
66    eprintln!("Loading checkpoint - step 10: Restoring code references");
67    session.code_references = Arc::new(checkpoint.code_references);
68
69    eprintln!("Loading checkpoint - step 11: Restoring change history");
70    session.change_history = Arc::new(checkpoint.change_history);
71
72    eprintln!("Loading checkpoint - step 12: Entity graph restored");
73
74    eprintln!("Loading checkpoint - step 13: Adding session to session manager");
75    system
76        .session_manager
77        .sessions
78        .put(session_id, Arc::new(ArcSwap::new(Arc::new(session))));
79
80    eprintln!("Loading checkpoint - step 14: Updated session manager");
81
82    Ok(MCPToolResult::success(
83        "Session loaded from checkpoint successfully".to_string(),
84        None,
85    ))
86}
87
88/// Create a session checkpoint using the global memory system.
89pub async fn create_session_checkpoint(session_id: Uuid) -> Result<MCPToolResult> {
90    let result = with_storage_timeout(async {
91        let system = get_memory_system().await?;
92        let session_arc = system
93            .get_session(session_id)
94            .await
95            .map_err(|e| anyhow::anyhow!("Failed to load session: {}", e))?;
96        let session = session_arc.load();
97
98        let checkpoint = create_comprehensive_checkpoint(&session).await?;
99
100        system
101            .storage_actor
102            .save_checkpoint(&checkpoint)
103            .await
104            .map_err(string_to_anyhow)?;
105
106        Ok(MCPToolResult::success(
107            "Checkpoint created successfully".to_string(),
108            Some(serde_json::json!({ "checkpoint_id": checkpoint.id.to_string() })),
109        ))
110    })
111    .await;
112
113    match result {
114        Ok(success_result) => success_result,
115        Err(timeout_error) => {
116            error!(
117                "TIMEOUT: create_session_checkpoint - session: {}, error: {}",
118                session_id, timeout_error
119            );
120            Ok(MCPToolResult::error(format!(
121                "Checkpoint creation timed out: {}",
122                timeout_error
123            )))
124        }
125    }
126}
127
128/// Load a session checkpoint using the global memory system.
129pub async fn load_session_checkpoint(
130    checkpoint_id: String,
131    session_id: Uuid,
132) -> Result<MCPToolResult> {
133    let result = with_storage_timeout(async {
134        let system = get_memory_system().await?;
135        let checkpoint_id = Uuid::parse_str(&checkpoint_id)?;
136
137        let checkpoint = system
138            .storage_actor
139            .load_checkpoint(checkpoint_id)
140            .await
141            .map_err(string_to_anyhow)?;
142
143        let mut session = ActiveSession::new(session_id, None, None);
144        session.current_state = Arc::new(checkpoint.structured_context);
145        session.incremental_updates = Arc::new(checkpoint.recent_updates);
146        session.code_references = Arc::new(checkpoint.code_references);
147        session.change_history = Arc::new(checkpoint.change_history);
148
149        system
150            .session_manager
151            .sessions
152            .put(session_id, Arc::new(ArcSwap::new(Arc::new(session))));
153
154        Ok(MCPToolResult::success(
155            "Session loaded from checkpoint successfully".to_string(),
156            None,
157        ))
158    })
159    .await;
160
161    match result {
162        Ok(success_result) => success_result,
163        Err(timeout_error) => {
164            error!(
165                "TIMEOUT: load_session_checkpoint - session: {}, error: {}",
166                session_id, timeout_error
167            );
168            Ok(MCPToolResult::error(format!(
169                "Checkpoint loading timed out: {}",
170                timeout_error
171            )))
172        }
173    }
174}
175
176/// Mark a specific context update as important within a session.
177pub async fn mark_important(session_id: Uuid, update_id: String) -> Result<MCPToolResult> {
178    let update_id = Uuid::parse_str(&update_id)?;
179    let system = get_memory_system().await?;
180    let session_arc = system
181        .get_session(session_id)
182        .await
183        .map_err(|e| anyhow::anyhow!("Failed to load session: {}", e))?;
184
185    let mut found = false;
186    session_arc.rcu(|current| {
187        let mut updated = (**current).clone();
188        let updates = Arc::make_mut(&mut updated.incremental_updates);
189        for update in updates.iter_mut() {
190            if update.id == update_id {
191                update.user_marked_important = true;
192                found = true;
193                break;
194            }
195        }
196        Arc::new(updated)
197    });
198
199    if found {
200        Ok(MCPToolResult::success(
201            "Update marked as important".to_string(),
202            None,
203        ))
204    } else {
205        Ok(MCPToolResult::error("Update not found".to_string()))
206    }
207}
208
209/// List all persisted sessions using a direct storage reference.
210pub async fn list_sessions_with_storage(
211    storage: &post_cortex_storage::rocksdb_storage::RealRocksDBStorage,
212) -> Result<MCPToolResult> {
213    match storage.list_sessions().await {
214        Ok(session_ids) => {
215            let mut sessions_info = Vec::new();
216
217            for session_id in session_ids {
218                match storage.load_session(session_id).await {
219                    Ok(session) => {
220                        sessions_info.push(serde_json::json!({
221                            "id": session_id.to_string(),
222                            "name": session.name(),
223                            "description": session.description(),
224                            "created_at": session.created_at().to_rfc3339(),
225                            "last_updated": session.last_updated.to_rfc3339(),
226                            "update_count": session.incremental_updates.len(),
227                            "entity_count": session.entity_graph.entities.len()
228                        }));
229                    }
230                    Err(_) => {
231                        sessions_info.push(serde_json::json!({
232                            "id": session_id.to_string(),
233                            "name": null,
234                            "description": null,
235                            "created_at": "unknown",
236                            "last_updated": "unknown",
237                            "update_count": 0,
238                            "entity_count": 0
239                        }));
240                    }
241                }
242            }
243            Ok(MCPToolResult::success(
244                format!("Found {} sessions", sessions_info.len()),
245                Some(serde_json::json!({
246                    "sessions": sessions_info
247                })),
248            ))
249        }
250        Err(e) => Ok(MCPToolResult::error(format!(
251            "Failed to load sessions: {e}"
252        ))),
253    }
254}
255
256/// List all sessions via the global memory system.
257pub async fn list_sessions() -> Result<MCPToolResult> {
258    info!("MCP-TOOLS: list_sessions() called");
259    let result = with_storage_timeout(async {
260        info!("MCP-TOOLS: Getting memory system for list_sessions");
261        let system = get_memory_system().await?;
262        info!("MCP-TOOLS: Got memory system, listing sessions");
263        let session_ids = system.list_sessions().await.map_err(string_to_anyhow)?;
264
265        let mut sessions_info = Vec::new();
266        for session_id in session_ids {
267            match system.get_session(session_id).await {
268                Ok(session_arc) => {
269                    let session = session_arc.load();
270                    sessions_info.push(serde_json::json!({
271                        "id": session_id.to_string(),
272                        "name": session.name(),
273                        "description": session.description(),
274                        "created_at": session.created_at().to_rfc3339(),
275                        "last_updated": session.last_updated.to_rfc3339(),
276                        "update_count": session.incremental_updates.len(),
277                        "entity_count": session.entity_graph.entities.len()
278                    }));
279                }
280                Err(_) => {
281                    sessions_info.push(serde_json::json!({
282                        "id": session_id.to_string(),
283                        "name": null,
284                        "description": null,
285                        "created_at": "unknown",
286                        "last_updated": "unknown",
287                        "update_count": 0,
288                        "entity_count": 0
289                    }));
290                }
291            }
292        }
293
294        Ok(MCPToolResult::success(
295            format!("Found {} sessions", sessions_info.len()),
296            Some(serde_json::json!({
297                "sessions": sessions_info
298            })),
299        ))
300    })
301    .await;
302
303    match result {
304        Ok(success_result) => success_result,
305        Err(timeout_error) => {
306            error!("TIMEOUT: list_sessions - error: {timeout_error}");
307            Ok(MCPToolResult::error(format!(
308                "Session listing timed out: {timeout_error}"
309            )))
310        }
311    }
312}
313
314/// Load a session by ID using an explicit memory system reference.
315pub async fn load_session_with_system(
316    session_id: Uuid,
317    system: &ConversationMemorySystem,
318) -> Result<MCPToolResult> {
319    match system.get_session(session_id).await {
320        Ok(session_arc) => {
321            let session = session_arc.load();
322            Ok(MCPToolResult::success(
323                "Session loaded successfully".to_string(),
324                Some(serde_json::json!({
325                    "session": {
326                        "id": session.id().to_string(),
327                        "created_at": session.created_at().to_rfc3339(),
328                        "last_updated": session.last_updated.to_rfc3339(),
329                        "update_count": session.incremental_updates.len(),
330                        "entity_count": session.entity_graph.entities.len(),
331                        "hot_context_size": session.hot_context.len(),
332                        "warm_context_size": session.warm_context.len(),
333                        "cold_context_size": session.cold_context.len(),
334                        "code_references": session.code_references.keys().collect::<Vec<_>>(),
335                        "change_history_count": session.change_history.len()
336                    }
337                })),
338            ))
339        }
340        Err(e) => Ok(MCPToolResult::error(
341            format!("Failed to load session: {e}",),
342        )),
343    }
344}
345
346/// Load a session by ID via the global memory system.
347pub async fn load_session(session_id: Uuid) -> Result<MCPToolResult> {
348    let result = with_storage_timeout(async {
349        let system = get_memory_system().await?;
350        load_session_with_system(session_id, &system).await
351    })
352    .await;
353
354    match result {
355        Ok(success_result) => success_result,
356        Err(timeout_error) => {
357            error!(
358                "TIMEOUT: load_session - session: {}, error: {}",
359                session_id, timeout_error
360            );
361            Ok(MCPToolResult::error(format!(
362                "Session loading timed out: {}",
363                timeout_error
364            )))
365        }
366    }
367}
368
369/// Search sessions by name or description.
370pub async fn search_sessions(query: String) -> Result<MCPToolResult> {
371    let result = with_storage_timeout(async {
372        let system = get_memory_system().await?;
373        let session_ids = system
374            .find_sessions_by_name_or_description(&query)
375            .await
376            .map_err(string_to_anyhow)?;
377
378        let mut sessions = Vec::new();
379        for session_id in session_ids {
380            if let Ok(session_arc) = system.get_session(session_id).await {
381                let session = session_arc.load();
382                sessions.push(serde_json::json!({
383                    "id": session_id.to_string(),
384                    "name": session.name(),
385                    "description": session.description()
386                }));
387            }
388        }
389
390        Ok(MCPToolResult::success(
391            format!("Found {} sessions matching '{}'", sessions.len(), query),
392            Some(serde_json::json!({
393                "sessions": sessions
394            })),
395        ))
396    })
397    .await;
398
399    match result {
400        Ok(success_result) => success_result,
401        Err(timeout_error) => {
402            error!(
403                "TIMEOUT: search_sessions - query: {}, error: {}",
404                query, timeout_error
405            );
406            Ok(MCPToolResult::error(format!(
407                "Session search timed out: {}",
408                timeout_error
409            )))
410        }
411    }
412}
413
414/// Update the name and/or description of a session.
415#[instrument(skip(session_id), fields(session_id = %session_id))]
416pub async fn update_session_metadata(
417    session_id: Uuid,
418    name: Option<String>,
419    description: Option<String>,
420) -> Result<MCPToolResult> {
421    let result = with_storage_timeout(async {
422        let system = get_memory_system().await?;
423        system
424            .update_session_metadata(session_id, name, description)
425            .await
426            .map_err(string_to_anyhow)?;
427
428        let session_arc = system
429            .get_session(session_id)
430            .await
431            .map_err(|e| anyhow::anyhow!("Failed to load session: {}", e))?;
432        let session = session_arc.load();
433        let (final_name, final_description) = session.get_metadata();
434
435        Ok(MCPToolResult::success(
436            "Session metadata updated successfully".to_string(),
437            Some(serde_json::json!({
438                "session_id": session_id.to_string(),
439                "name": final_name,
440                "description": final_description
441            })),
442        ))
443    })
444    .await;
445
446    match result {
447        Ok(success_result) => success_result,
448        Err(timeout_error) => {
449            error!(
450                "TIMEOUT: update_session_metadata - session_id: {}, error: {}",
451                session_id, timeout_error
452            );
453            Ok(MCPToolResult::error(format!(
454                "TIMEOUT: Failed to update session metadata: {}",
455                timeout_error
456            )))
457        }
458    }
459}
460
461/// Build a comprehensive checkpoint snapshot from an active session.
462async fn create_comprehensive_checkpoint(session: &ActiveSession) -> Result<SessionCheckpoint> {
463    Ok(SessionCheckpoint {
464        id: Uuid::new_v4(),
465        session_id: session.id(),
466        created_at: chrono::Utc::now(),
467        structured_context: (*session.current_state).clone(),
468        recent_updates: (*session.incremental_updates).clone(),
469        code_references: (*session.code_references).clone(),
470        change_history: (*session.change_history).clone(),
471        total_updates: session.incremental_updates.len(),
472        context_quality_score: 1.0,
473        compression_ratio: 1.0,
474    })
475}