lean_ctx/core/context_os/
shared_sessions.rs1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use tokio::sync::RwLock;
7
8use crate::core::project_hash;
9use crate::core::session::SessionState;
10
11const MAX_CACHED_SESSIONS: usize = 8;
12
13#[derive(Debug, Clone, PartialEq, Eq, Hash)]
14pub struct SharedSessionKey {
15 pub project_hash: String,
16 pub workspace_id: String,
17 pub channel_id: String,
18}
19
20impl SharedSessionKey {
21 pub fn new(project_root: &str, workspace_id: &str, channel_id: &str) -> Self {
22 Self {
23 project_hash: project_hash::hash_project_root(project_root),
24 workspace_id: normalize_id(workspace_id, "default"),
25 channel_id: normalize_id(channel_id, "default"),
26 }
27 }
28}
29
30struct SessionEntry {
31 session: Arc<RwLock<SessionState>>,
32 project_root: String,
33 last_accessed: Instant,
34}
35
36pub struct SharedSessionStore {
37 sessions: Mutex<HashMap<SharedSessionKey, SessionEntry>>,
38}
39
40impl Default for SharedSessionStore {
41 fn default() -> Self {
42 Self {
43 sessions: Mutex::new(HashMap::new()),
44 }
45 }
46}
47
48impl SharedSessionStore {
49 pub fn new() -> Self {
50 Self::default()
51 }
52
53 pub fn get_or_load(
54 &self,
55 project_root: &str,
56 workspace_id: &str,
57 channel_id: &str,
58 ) -> Arc<RwLock<SessionState>> {
59 let key = SharedSessionKey::new(project_root, workspace_id, channel_id);
60 let disk_key = key.clone();
61 let root = project_root.to_string();
62 let mut map = self
63 .sessions
64 .lock()
65 .unwrap_or_else(std::sync::PoisonError::into_inner);
66
67 if let Some(entry) = map.get_mut(&key) {
68 entry.last_accessed = Instant::now();
69 return entry.session.clone();
70 }
71
72 Self::evict_lru_if_needed(&mut map);
73
74 let mut loaded = load_session_from_disk(&root, &disk_key)
75 .or_else(|| SessionState::load_latest_for_project_root(&root))
76 .unwrap_or_default();
77 loaded.project_root = Some(root.clone());
78 let session = Arc::new(RwLock::new(loaded));
79
80 map.insert(
81 key,
82 SessionEntry {
83 session: session.clone(),
84 project_root: root,
85 last_accessed: Instant::now(),
86 },
87 );
88
89 session
90 }
91
92 fn evict_lru_if_needed(map: &mut HashMap<SharedSessionKey, SessionEntry>) {
93 if map.len() < MAX_CACHED_SESSIONS {
94 return;
95 }
96
97 let lru_key = map
98 .iter()
99 .min_by_key(|(_, e)| e.last_accessed)
100 .map(|(k, _)| k.clone());
101
102 if let Some(key) = lru_key {
103 if let Some(entry) = map.remove(&key) {
104 if let Ok(session) = entry.session.try_read() {
105 persist_session_to_disk(&key, &entry.project_root, &session);
106 }
107 }
108 }
109 }
110
111 pub fn active_count(&self) -> usize {
112 self.sessions
113 .lock()
114 .unwrap_or_else(std::sync::PoisonError::into_inner)
115 .len()
116 }
117
118 pub fn max_sessions() -> usize {
120 MAX_CACHED_SESSIONS
121 }
122
123 pub fn persist_best_effort(
124 &self,
125 project_root: &str,
126 workspace_id: &str,
127 channel_id: &str,
128 session: &SessionState,
129 ) {
130 let key = SharedSessionKey::new(project_root, workspace_id, channel_id);
131 persist_session_to_disk(&key, project_root, session);
132 }
133}
134
135fn persist_session_to_disk(key: &SharedSessionKey, _project_root: &str, session: &SessionState) {
136 let Some(dir) = shared_session_dir(key) else {
137 return;
138 };
139 let _ = std::fs::create_dir_all(&dir);
140 let state_path = dir.join("session.json");
141 let tmp = dir.join("session.json.tmp");
142
143 if let Ok(json) = serde_json::to_string_pretty(session) {
144 let _ = std::fs::write(&tmp, json);
145 let _ = std::fs::rename(&tmp, &state_path);
146 }
147
148 if session.task.is_some() {
149 let snapshot = session.build_compaction_snapshot();
150 let _ = std::fs::write(dir.join("snapshot.txt"), snapshot);
151 }
152}
153
154fn normalize_id(s: &str, fallback: &str) -> String {
155 let t = s.trim();
156 if t.is_empty() {
157 fallback.to_string()
158 } else {
159 t.chars()
161 .filter(|c| c.is_ascii_alphanumeric() || *c == '-' || *c == '_' || *c == '.')
162 .collect::<String>()
163 }
164}
165
166fn shared_session_dir(key: &SharedSessionKey) -> Option<PathBuf> {
167 let data = crate::core::data_dir::lean_ctx_data_dir().ok()?;
168 Some(
169 data.join("context-os")
170 .join("sessions")
171 .join(&key.project_hash)
172 .join(&key.workspace_id)
173 .join(&key.channel_id),
174 )
175}
176
177fn load_session_from_disk(project_root: &str, key: &SharedSessionKey) -> Option<SessionState> {
178 let dir = shared_session_dir(key)?;
179 let state_path = dir.join("session.json");
180 let json = std::fs::read_to_string(&state_path).ok()?;
181 let mut session: SessionState = serde_json::from_str(&json).ok()?;
182 session.project_root = Some(project_root.to_string());
184 Some(session)
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190
191 #[test]
192 fn normalize_id_filters_weird_chars() {
193 assert_eq!(normalize_id(" ", "x"), "x");
194 assert_eq!(normalize_id("abc-123_DEF", "x"), "abc-123_DEF");
195 assert_eq!(normalize_id("a b$c", "x"), "abc");
196 }
197
198 #[test]
199 fn key_is_stable() {
200 let k1 = SharedSessionKey::new("/tmp/proj", "ws", "ch");
201 let k2 = SharedSessionKey::new("/tmp/proj", "ws", "ch");
202 assert_eq!(k1, k2);
203 }
204
205 #[tokio::test]
206 async fn concurrent_session_access_no_data_race() {
207 let store = Arc::new(SharedSessionStore::new());
208 let n_tasks: usize = 8;
209
210 let mut handles = vec![];
211 for task_idx in 0..n_tasks {
212 let store = Arc::clone(&store);
213 handles.push(tokio::spawn(async move {
214 let project_root = "/tmp/test-concurrent";
215 for i in 0..10 {
216 let session_arc = store.get_or_load(project_root, "ws-shared", "ch-shared");
217 let mut s = session_arc.write().await;
218 s.files_touched.push(crate::core::session::FileTouched {
219 path: format!("file-{task_idx}-{i}.rs"),
220 file_ref: None,
221 read_count: 1,
222 modified: false,
223 last_mode: "full".to_string(),
224 tokens: 0,
225 stale: false,
226 context_item_id: None,
227 });
228 }
229 }));
230 }
231
232 for h in handles {
233 h.await.unwrap();
234 }
235
236 let final_arc = store.get_or_load("/tmp/test-concurrent", "ws-shared", "ch-shared");
237 let final_session = final_arc.read().await;
238 assert_eq!(
239 final_session.files_touched.len(),
240 n_tasks * 10,
241 "all concurrent mutations must be persisted"
242 );
243 }
244
245 #[tokio::test]
246 async fn different_workspace_channels_are_isolated() {
247 let store = SharedSessionStore::new();
248
249 {
250 let arc_a = store.get_or_load("/tmp/proj-iso", "ws-a", "ch-1");
251 arc_a
252 .write()
253 .await
254 .files_touched
255 .push(crate::core::session::FileTouched {
256 path: "fileA.rs".to_string(),
257 file_ref: None,
258 read_count: 1,
259 modified: false,
260 last_mode: "full".to_string(),
261 tokens: 0,
262 stale: false,
263 context_item_id: None,
264 });
265 }
266 {
267 let arc_b = store.get_or_load("/tmp/proj-iso", "ws-b", "ch-1");
268 arc_b
269 .write()
270 .await
271 .files_touched
272 .push(crate::core::session::FileTouched {
273 path: "fileB.rs".to_string(),
274 file_ref: None,
275 read_count: 1,
276 modified: false,
277 last_mode: "full".to_string(),
278 tokens: 0,
279 stale: false,
280 context_item_id: None,
281 });
282 }
283
284 let session_a = store.get_or_load("/tmp/proj-iso", "ws-a", "ch-1");
285 let session_b = store.get_or_load("/tmp/proj-iso", "ws-b", "ch-1");
286
287 assert_eq!(session_a.read().await.files_touched.len(), 1);
288 assert_eq!(session_a.read().await.files_touched[0].path, "fileA.rs");
289 assert_eq!(session_b.read().await.files_touched.len(), 1);
290 assert_eq!(session_b.read().await.files_touched[0].path, "fileB.rs");
291 }
292}