lean_ctx/core/context_os/
shared_sessions.rs1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use tokio::sync::RwLock;
7
8use crate::core::project_hash;
9use crate::core::session::SessionState;
10
/// Number of cached sessions at which the store starts evicting the least
/// recently accessed entry before loading another one.
const MAX_CACHED_SESSIONS: usize = 64;
12
/// Identifies one shared session: a project plus a workspace and a channel.
///
/// The key is used both for the in-memory session map and to derive the
/// on-disk directory of the session.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SharedSessionKey {
    /// Hash of the project root (filled by `SharedSessionKey::new`).
    pub project_hash: String,
    /// Workspace identifier; `SharedSessionKey::new` stores it normalized.
    pub workspace_id: String,
    /// Channel identifier; `SharedSessionKey::new` stores it normalized.
    pub channel_id: String,
}
19
20impl SharedSessionKey {
21 pub fn new(project_root: &str, workspace_id: &str, channel_id: &str) -> Self {
22 Self {
23 project_hash: project_hash::hash_project_root(project_root),
24 workspace_id: normalize_id(workspace_id, "default"),
25 channel_id: normalize_id(channel_id, "default"),
26 }
27 }
28}
29
30struct SessionEntry {
31 session: Arc<RwLock<SessionState>>,
32 project_root: String,
33 last_accessed: Instant,
34}
35
36pub struct SharedSessionStore {
37 sessions: Mutex<HashMap<SharedSessionKey, SessionEntry>>,
38}
39
40impl Default for SharedSessionStore {
41 fn default() -> Self {
42 Self {
43 sessions: Mutex::new(HashMap::new()),
44 }
45 }
46}
47
48impl SharedSessionStore {
49 pub fn new() -> Self {
50 Self::default()
51 }
52
53 pub fn get_or_load(
54 &self,
55 project_root: &str,
56 workspace_id: &str,
57 channel_id: &str,
58 ) -> Arc<RwLock<SessionState>> {
59 let key = SharedSessionKey::new(project_root, workspace_id, channel_id);
60 let disk_key = key.clone();
61 let root = project_root.to_string();
62 let mut map = self
63 .sessions
64 .lock()
65 .unwrap_or_else(std::sync::PoisonError::into_inner);
66
67 if let Some(entry) = map.get_mut(&key) {
68 entry.last_accessed = Instant::now();
69 return entry.session.clone();
70 }
71
72 Self::evict_lru_if_needed(&mut map);
73
74 let mut loaded = load_session_from_disk(&root, &disk_key)
75 .or_else(|| SessionState::load_latest_for_project_root(&root))
76 .unwrap_or_default();
77 loaded.project_root = Some(root.clone());
78 let session = Arc::new(RwLock::new(loaded));
79
80 map.insert(
81 key,
82 SessionEntry {
83 session: session.clone(),
84 project_root: root,
85 last_accessed: Instant::now(),
86 },
87 );
88
89 session
90 }
91
92 fn evict_lru_if_needed(map: &mut HashMap<SharedSessionKey, SessionEntry>) {
93 if map.len() < MAX_CACHED_SESSIONS {
94 return;
95 }
96
97 let lru_key = map
98 .iter()
99 .min_by_key(|(_, e)| e.last_accessed)
100 .map(|(k, _)| k.clone());
101
102 if let Some(key) = lru_key {
103 if let Some(entry) = map.remove(&key) {
104 if let Ok(session) = entry.session.try_read() {
105 persist_session_to_disk(&key, &entry.project_root, &session);
106 }
107 }
108 }
109 }
110
111 pub fn active_count(&self) -> usize {
112 self.sessions
113 .lock()
114 .unwrap_or_else(std::sync::PoisonError::into_inner)
115 .len()
116 }
117
118 pub fn persist_best_effort(
119 &self,
120 project_root: &str,
121 workspace_id: &str,
122 channel_id: &str,
123 session: &SessionState,
124 ) {
125 let key = SharedSessionKey::new(project_root, workspace_id, channel_id);
126 persist_session_to_disk(&key, project_root, session);
127 }
128}
129
130fn persist_session_to_disk(key: &SharedSessionKey, _project_root: &str, session: &SessionState) {
131 let Some(dir) = shared_session_dir(key) else {
132 return;
133 };
134 let _ = std::fs::create_dir_all(&dir);
135 let state_path = dir.join("session.json");
136 let tmp = dir.join("session.json.tmp");
137
138 if let Ok(json) = serde_json::to_string_pretty(session) {
139 let _ = std::fs::write(&tmp, json);
140 let _ = std::fs::rename(&tmp, &state_path);
141 }
142
143 if session.task.is_some() {
144 let snapshot = session.build_compaction_snapshot();
145 let _ = std::fs::write(dir.join("snapshot.txt"), snapshot);
146 }
147}
148
/// Reduces an identifier to characters that are safe in a single path component.
///
/// Keeps ASCII letters, digits, `-`, `_` and `.`, and drops everything else
/// (including whitespace). Returns `fallback` when nothing usable remains:
/// the input was blank, consisted only of dropped characters, or consists
/// only of dots. The result is used as a directory name, where an empty
/// string, `.` or `..` would collapse or escape the intended directory.
fn normalize_id(s: &str, fallback: &str) -> String {
    let cleaned: String = s
        .chars()
        .filter(|c| c.is_ascii_alphanumeric() || matches!(*c, '-' | '_' | '.'))
        .collect();

    if cleaned.is_empty() || cleaned.chars().all(|c| c == '.') {
        fallback.to_string()
    } else {
        cleaned
    }
}
160
161fn shared_session_dir(key: &SharedSessionKey) -> Option<PathBuf> {
162 let data = crate::core::data_dir::lean_ctx_data_dir().ok()?;
163 Some(
164 data.join("context-os")
165 .join("sessions")
166 .join(&key.project_hash)
167 .join(&key.workspace_id)
168 .join(&key.channel_id),
169 )
170}
171
172fn load_session_from_disk(project_root: &str, key: &SharedSessionKey) -> Option<SessionState> {
173 let dir = shared_session_dir(key)?;
174 let state_path = dir.join("session.json");
175 let json = std::fs::read_to_string(&state_path).ok()?;
176 let mut session: SessionState = serde_json::from_str(&json).ok()?;
177 session.project_root = Some(project_root.to_string());
179 Some(session)
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::session::FileTouched;

    /// Builds the minimal `FileTouched` record the tests push into a session.
    fn touched(path: impl Into<String>) -> FileTouched {
        FileTouched {
            path: path.into(),
            file_ref: None,
            read_count: 1,
            modified: false,
            last_mode: "full".to_string(),
            tokens: 0,
            stale: false,
            context_item_id: None,
        }
    }

    #[test]
    fn normalize_id_filters_weird_chars() {
        // (input, expected) pairs, all normalized with the fallback "x".
        let cases = [
            (" ", "x"),
            ("abc-123_DEF", "abc-123_DEF"),
            ("a b$c", "abc"),
        ];
        for &(input, expected) in &cases {
            assert_eq!(normalize_id(input, "x"), expected);
        }
    }

    #[test]
    fn key_is_stable() {
        let build = || SharedSessionKey::new("/tmp/proj", "ws", "ch");
        assert_eq!(build(), build());
    }

    #[tokio::test]
    async fn concurrent_session_access_no_data_race() {
        const ROOT: &str = "/tmp/test-concurrent";
        const WRITES_PER_TASK: usize = 10;
        let n_tasks: usize = 8;
        let store = Arc::new(SharedSessionStore::new());

        // Every task repeatedly fetches the same shared session and appends to it.
        let handles: Vec<_> = (0..n_tasks)
            .map(|task_idx| {
                let store = Arc::clone(&store);
                tokio::spawn(async move {
                    for i in 0..WRITES_PER_TASK {
                        let session = store.get_or_load(ROOT, "ws-shared", "ch-shared");
                        session
                            .write()
                            .await
                            .files_touched
                            .push(touched(format!("file-{task_idx}-{i}.rs")));
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.await.unwrap();
        }

        let final_arc = store.get_or_load(ROOT, "ws-shared", "ch-shared");
        let final_session = final_arc.read().await;
        assert_eq!(
            final_session.files_touched.len(),
            n_tasks * WRITES_PER_TASK,
            "all concurrent mutations must be persisted"
        );
    }

    #[tokio::test]
    async fn different_workspace_channels_are_isolated() {
        const ROOT: &str = "/tmp/proj-iso";
        // (workspace, file written into that workspace's session)
        let cases = [("ws-a", "fileA.rs"), ("ws-b", "fileB.rs")];
        let store = SharedSessionStore::new();

        for &(workspace, file) in &cases {
            let session = store.get_or_load(ROOT, workspace, "ch-1");
            session.write().await.files_touched.push(touched(file));
        }

        for &(workspace, file) in &cases {
            let session = store.get_or_load(ROOT, workspace, "ch-1");
            assert_eq!(session.read().await.files_touched.len(), 1);
            assert_eq!(session.read().await.files_touched[0].path, file);
        }
    }
}