Skip to main content

lean_ctx/core/context_os/
shared_sessions.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use tokio::sync::RwLock;
7
8use crate::core::project_hash;
9use crate::core::session::SessionState;
10
/// Number of sessions the in-memory store holds before it starts evicting
/// the least-recently-used entry.
const MAX_CACHED_SESSIONS: usize = 8;
12
/// Identifies one shared session: a project, a workspace within it, and a
/// channel within that workspace.
///
/// The three components are also used, in this order, as nested directory
/// names when a session is stored on disk.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SharedSessionKey {
    /// Hash of the project root path.
    pub project_hash: String,
    /// Workspace identifier.
    pub workspace_id: String,
    /// Channel identifier.
    pub channel_id: String,
}
19
20impl SharedSessionKey {
21    pub fn new(project_root: &str, workspace_id: &str, channel_id: &str) -> Self {
22        Self {
23            project_hash: project_hash::hash_project_root(project_root),
24            workspace_id: normalize_id(workspace_id, "default"),
25            channel_id: normalize_id(channel_id, "default"),
26        }
27    }
28}
29
30struct SessionEntry {
31    session: Arc<RwLock<SessionState>>,
32    project_root: String,
33    last_accessed: Instant,
34}
35
36pub struct SharedSessionStore {
37    sessions: Mutex<HashMap<SharedSessionKey, SessionEntry>>,
38}
39
40impl Default for SharedSessionStore {
41    fn default() -> Self {
42        Self {
43            sessions: Mutex::new(HashMap::new()),
44        }
45    }
46}
47
48impl SharedSessionStore {
49    pub fn new() -> Self {
50        Self::default()
51    }
52
53    pub fn get_or_load(
54        &self,
55        project_root: &str,
56        workspace_id: &str,
57        channel_id: &str,
58    ) -> Arc<RwLock<SessionState>> {
59        let key = SharedSessionKey::new(project_root, workspace_id, channel_id);
60        let disk_key = key.clone();
61        let root = project_root.to_string();
62        let mut map = self
63            .sessions
64            .lock()
65            .unwrap_or_else(std::sync::PoisonError::into_inner);
66
67        if let Some(entry) = map.get_mut(&key) {
68            entry.last_accessed = Instant::now();
69            return entry.session.clone();
70        }
71
72        Self::evict_lru_if_needed(&mut map);
73
74        let mut loaded = load_session_from_disk(&root, &disk_key)
75            .or_else(|| SessionState::load_latest_for_project_root(&root))
76            .unwrap_or_default();
77        loaded.project_root = Some(root.clone());
78        let session = Arc::new(RwLock::new(loaded));
79
80        map.insert(
81            key,
82            SessionEntry {
83                session: session.clone(),
84                project_root: root,
85                last_accessed: Instant::now(),
86            },
87        );
88
89        session
90    }
91
92    fn evict_lru_if_needed(map: &mut HashMap<SharedSessionKey, SessionEntry>) {
93        if map.len() < MAX_CACHED_SESSIONS {
94            return;
95        }
96
97        let lru_key = map
98            .iter()
99            .min_by_key(|(_, e)| e.last_accessed)
100            .map(|(k, _)| k.clone());
101
102        if let Some(key) = lru_key {
103            if let Some(entry) = map.remove(&key) {
104                if let Ok(session) = entry.session.try_read() {
105                    persist_session_to_disk(&key, &entry.project_root, &session);
106                }
107            }
108        }
109    }
110
111    pub fn active_count(&self) -> usize {
112        self.sessions
113            .lock()
114            .unwrap_or_else(std::sync::PoisonError::into_inner)
115            .len()
116    }
117
118    /// Returns the max sessions limit for diagnostics.
119    pub fn max_sessions() -> usize {
120        MAX_CACHED_SESSIONS
121    }
122
123    pub fn persist_best_effort(
124        &self,
125        project_root: &str,
126        workspace_id: &str,
127        channel_id: &str,
128        session: &SessionState,
129    ) {
130        let key = SharedSessionKey::new(project_root, workspace_id, channel_id);
131        persist_session_to_disk(&key, project_root, session);
132    }
133}
134
135fn persist_session_to_disk(key: &SharedSessionKey, _project_root: &str, session: &SessionState) {
136    let Some(dir) = shared_session_dir(key) else {
137        return;
138    };
139    let _ = std::fs::create_dir_all(&dir);
140    let state_path = dir.join("session.json");
141    let tmp = dir.join("session.json.tmp");
142
143    if let Ok(json) = serde_json::to_string_pretty(session) {
144        let _ = std::fs::write(&tmp, json);
145        let _ = std::fs::rename(&tmp, &state_path);
146    }
147
148    if session.task.is_some() {
149        let snapshot = session.build_compaction_snapshot();
150        let _ = std::fs::write(dir.join("snapshot.txt"), snapshot);
151    }
152}
153
/// Reduces a caller-supplied identifier to a URL/header/path-safe form.
///
/// Only ASCII alphanumerics, `-`, `_` and `.` are kept; every other character
/// (including whitespace) is dropped. `fallback` is returned when nothing
/// usable remains, i.e. when the filtered id is empty or consists solely of
/// dots. The all-dots case matters because the result is used as a directory
/// name: `"."` and `".."` would otherwise address the current or parent
/// directory instead of a dedicated one.
///
/// `fallback` itself is returned verbatim and is not filtered.
fn normalize_id(s: &str, fallback: &str) -> String {
    let cleaned: String = s
        .chars()
        .filter(|c| c.is_ascii_alphanumeric() || matches!(*c, '-' | '_' | '.'))
        .collect();

    // `all` is true for the empty string as well, so this covers blank input,
    // input made only of rejected characters, and dot-only ids.
    if cleaned.chars().all(|c| c == '.') {
        fallback.to_string()
    } else {
        cleaned
    }
}
165
166fn shared_session_dir(key: &SharedSessionKey) -> Option<PathBuf> {
167    let data = crate::core::data_dir::lean_ctx_data_dir().ok()?;
168    Some(
169        data.join("context-os")
170            .join("sessions")
171            .join(&key.project_hash)
172            .join(&key.workspace_id)
173            .join(&key.channel_id),
174    )
175}
176
177fn load_session_from_disk(project_root: &str, key: &SharedSessionKey) -> Option<SessionState> {
178    let dir = shared_session_dir(key)?;
179    let state_path = dir.join("session.json");
180    let json = std::fs::read_to_string(&state_path).ok()?;
181    let mut session: SessionState = serde_json::from_str(&json).ok()?;
182    // Safety: enforce project_root from the current server.
183    session.project_root = Some(project_root.to_string());
184    Some(session)
185}
186
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::session::FileTouched;

    /// Minimal `FileTouched` record for `path`, shared by the session tests.
    fn touched(path: impl Into<String>) -> FileTouched {
        FileTouched {
            path: path.into(),
            file_ref: None,
            read_count: 1,
            modified: false,
            last_mode: "full".to_string(),
            tokens: 0,
            stale: false,
            context_item_id: None,
            summary: None,
        }
    }

    /// Blank ids fall back; unsupported characters are stripped.
    #[test]
    fn normalize_id_filters_weird_chars() {
        assert_eq!(normalize_id("  ", "x"), "x");
        assert_eq!(normalize_id("abc-123_DEF", "x"), "abc-123_DEF");
        assert_eq!(normalize_id("a b$c", "x"), "abc");
    }

    /// Building a key twice from the same inputs yields equal keys.
    #[test]
    fn key_is_stable() {
        let first = SharedSessionKey::new("/tmp/proj", "ws", "ch");
        let second = SharedSessionKey::new("/tmp/proj", "ws", "ch");
        assert_eq!(first, second);
    }

    /// Many tasks appending to the same shared session must all land in the
    /// single cached instance.
    #[tokio::test]
    async fn concurrent_session_access_no_data_race() {
        let store = Arc::new(SharedSessionStore::new());
        let n_tasks: usize = 8;

        let handles: Vec<_> = (0..n_tasks)
            .map(|task_idx| {
                let store = Arc::clone(&store);
                tokio::spawn(async move {
                    let project_root = "/tmp/test-concurrent";
                    for i in 0..10 {
                        let session_arc = store.get_or_load(project_root, "ws-shared", "ch-shared");
                        // The write guard is released at the end of each iteration.
                        let mut guard = session_arc.write().await;
                        guard
                            .files_touched
                            .push(touched(format!("file-{task_idx}-{i}.rs")));
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.await.unwrap();
        }

        let final_arc = store.get_or_load("/tmp/test-concurrent", "ws-shared", "ch-shared");
        let final_session = final_arc.read().await;
        assert_eq!(
            final_session.files_touched.len(),
            n_tasks * 10,
            "all concurrent mutations must be persisted"
        );
    }

    /// Sessions that differ only in workspace id must not share state.
    #[tokio::test]
    async fn different_workspace_channels_are_isolated() {
        let store = SharedSessionStore::new();

        {
            let arc_a = store.get_or_load("/tmp/proj-iso", "ws-a", "ch-1");
            let mut guard = arc_a.write().await;
            guard.files_touched.push(touched("fileA.rs"));
        }
        {
            let arc_b = store.get_or_load("/tmp/proj-iso", "ws-b", "ch-1");
            let mut guard = arc_b.write().await;
            guard.files_touched.push(touched("fileB.rs"));
        }

        let session_a = store.get_or_load("/tmp/proj-iso", "ws-a", "ch-1");
        let session_b = store.get_or_load("/tmp/proj-iso", "ws-b", "ch-1");

        assert_eq!(session_a.read().await.files_touched.len(), 1);
        assert_eq!(session_a.read().await.files_touched[0].path, "fileA.rs");
        assert_eq!(session_b.read().await.files_touched.len(), 1);
        assert_eq!(session_b.read().await.files_touched[0].path, "fileB.rs");
    }
}