lean_ctx/core/context_os/
shared_sessions.rs1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::RwLock;
6
7use crate::core::project_hash;
8use crate::core::session::SessionState;
9
/// Identifies one shared session: a project, a workspace and a channel.
///
/// Serves as the lookup key of the in-memory session map and as the source of
/// the path components used for the session's on-disk directory.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct SharedSessionKey {
    /// Hash of the project root (see `project_hash::hash_project_root`).
    pub project_hash: String,
    /// Workspace identifier; normalized when the key is built through `new`.
    pub workspace_id: String,
    /// Channel identifier; normalized when the key is built through `new`.
    pub channel_id: String,
}
16
17impl SharedSessionKey {
18 pub fn new(project_root: &str, workspace_id: &str, channel_id: &str) -> Self {
19 Self {
20 project_hash: project_hash::hash_project_root(project_root),
21 workspace_id: normalize_id(workspace_id, "default"),
22 channel_id: normalize_id(channel_id, "default"),
23 }
24 }
25}
26
27pub struct SharedSessionStore {
28 sessions: Mutex<HashMap<SharedSessionKey, Arc<RwLock<SessionState>>>>,
29}
30
31impl Default for SharedSessionStore {
32 fn default() -> Self {
33 Self {
34 sessions: Mutex::new(HashMap::new()),
35 }
36 }
37}
38
39impl SharedSessionStore {
40 pub fn new() -> Self {
41 Self::default()
42 }
43
44 pub fn get_or_load(
45 &self,
46 project_root: &str,
47 workspace_id: &str,
48 channel_id: &str,
49 ) -> Arc<RwLock<SessionState>> {
50 let key = SharedSessionKey::new(project_root, workspace_id, channel_id);
51 let disk_key = key.clone();
52 let root = project_root.to_string();
53 let mut map = self
54 .sessions
55 .lock()
56 .unwrap_or_else(std::sync::PoisonError::into_inner);
57 map.entry(key)
58 .or_insert_with(|| {
59 let mut loaded = load_session_from_disk(&root, &disk_key)
60 .or_else(|| SessionState::load_latest_for_project_root(&root))
61 .unwrap_or_default();
62 loaded.project_root = Some(root);
63 Arc::new(RwLock::new(loaded))
64 })
65 .clone()
66 }
67
68 pub fn persist_best_effort(
69 &self,
70 project_root: &str,
71 workspace_id: &str,
72 channel_id: &str,
73 session: &SessionState,
74 ) {
75 let key = SharedSessionKey::new(project_root, workspace_id, channel_id);
76 let Some(dir) = shared_session_dir(&key) else {
77 return;
78 };
79 let _ = std::fs::create_dir_all(&dir);
80 let state_path = dir.join("session.json");
81 let tmp = dir.join("session.json.tmp");
82
83 if let Ok(json) = serde_json::to_string_pretty(session) {
84 let _ = std::fs::write(&tmp, json);
85 let _ = std::fs::rename(&tmp, &state_path);
86 }
87
88 let snap = if session.task.is_some() {
90 Some(session.build_compaction_snapshot())
91 } else {
92 None
93 };
94 if let Some(snapshot) = snap {
95 let _ = std::fs::write(dir.join("snapshot.txt"), snapshot);
96 }
97 }
98}
99
/// Reduces a caller-supplied identifier to a string that is safe to use as a
/// single directory name.
///
/// Every character other than an ASCII letter, ASCII digit, `-`, `_` or `.`
/// is dropped (this also removes all whitespace). `fallback` is returned when
/// nothing usable is left:
///
/// * the input was empty or consisted only of rejected characters, or
/// * the result is `"."` or `".."`, which would otherwise be joined into the
///   session path as a current/parent-directory component.
fn normalize_id(s: &str, fallback: &str) -> String {
    let cleaned: String = s
        .chars()
        .filter(|c| c.is_ascii_alphanumeric() || matches!(*c, '-' | '_' | '.'))
        .collect();

    match cleaned.as_str() {
        "" | "." | ".." => fallback.to_string(),
        _ => cleaned,
    }
}
111
112fn shared_session_dir(key: &SharedSessionKey) -> Option<PathBuf> {
113 let data = crate::core::data_dir::lean_ctx_data_dir().ok()?;
114 Some(
115 data.join("context-os")
116 .join("sessions")
117 .join(&key.project_hash)
118 .join(&key.workspace_id)
119 .join(&key.channel_id),
120 )
121}
122
123fn load_session_from_disk(project_root: &str, key: &SharedSessionKey) -> Option<SessionState> {
124 let dir = shared_session_dir(key)?;
125 let state_path = dir.join("session.json");
126 let json = std::fs::read_to_string(&state_path).ok()?;
127 let mut session: SessionState = serde_json::from_str(&json).ok()?;
128 session.project_root = Some(project_root.to_string());
130 Some(session)
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::session::FileTouched;

    /// Builds a minimal `FileTouched` record for `path`; every other field
    /// carries the same fixed value in all tests.
    fn touched(path: impl Into<String>) -> FileTouched {
        FileTouched {
            path: path.into(),
            file_ref: None,
            read_count: 1,
            modified: false,
            last_mode: "full".to_string(),
            tokens: 0,
            stale: false,
            context_item_id: None,
        }
    }

    #[test]
    fn normalize_id_filters_weird_chars() {
        assert_eq!(normalize_id(" ", "x"), "x");
        assert_eq!(normalize_id("abc-123_DEF", "x"), "abc-123_DEF");
        assert_eq!(normalize_id("a b$c", "x"), "abc");
    }

    #[test]
    fn key_is_stable() {
        let k1 = SharedSessionKey::new("/tmp/proj", "ws", "ch");
        let k2 = SharedSessionKey::new("/tmp/proj", "ws", "ch");
        assert_eq!(k1, k2);
    }

    /// Many tasks push into the same shared session; no push may be lost.
    #[tokio::test]
    async fn concurrent_session_access_no_data_race() {
        const PROJECT_ROOT: &str = "/tmp/test-concurrent";
        const WRITES_PER_TASK: usize = 10;

        let store = Arc::new(SharedSessionStore::new());
        let n_tasks: usize = 8;

        // `get_or_load` may hydrate the session from previously persisted
        // state, so measure growth against the initial length instead of
        // assuming the session starts empty.
        let baseline = store
            .get_or_load(PROJECT_ROOT, "ws-shared", "ch-shared")
            .read()
            .await
            .files_touched
            .len();

        let mut handles = Vec::with_capacity(n_tasks);
        for task_idx in 0..n_tasks {
            let store = Arc::clone(&store);
            handles.push(tokio::spawn(async move {
                for i in 0..WRITES_PER_TASK {
                    let session_arc = store.get_or_load(PROJECT_ROOT, "ws-shared", "ch-shared");
                    let mut s = session_arc.write().await;
                    s.files_touched
                        .push(touched(format!("file-{task_idx}-{i}.rs")));
                }
            }));
        }

        for h in handles {
            h.await.unwrap();
        }

        let final_arc = store.get_or_load(PROJECT_ROOT, "ws-shared", "ch-shared");
        let final_session = final_arc.read().await;
        assert_eq!(
            final_session.files_touched.len(),
            baseline + n_tasks * WRITES_PER_TASK,
            "all concurrent mutations must be persisted"
        );
    }

    /// Sessions that differ only in workspace id must not share state.
    #[tokio::test]
    async fn different_workspace_channels_are_isolated() {
        let store = SharedSessionStore::new();

        // Record the starting lengths: a session may be hydrated from disk.
        let baseline_a = store
            .get_or_load("/tmp/proj-iso", "ws-a", "ch-1")
            .read()
            .await
            .files_touched
            .len();
        let baseline_b = store
            .get_or_load("/tmp/proj-iso", "ws-b", "ch-1")
            .read()
            .await
            .files_touched
            .len();

        {
            let arc_a = store.get_or_load("/tmp/proj-iso", "ws-a", "ch-1");
            arc_a.write().await.files_touched.push(touched("fileA.rs"));
        }
        {
            let arc_b = store.get_or_load("/tmp/proj-iso", "ws-b", "ch-1");
            arc_b.write().await.files_touched.push(touched("fileB.rs"));
        }

        let session_a = store.get_or_load("/tmp/proj-iso", "ws-a", "ch-1");
        let session_b = store.get_or_load("/tmp/proj-iso", "ws-b", "ch-1");

        assert_eq!(session_a.read().await.files_touched.len(), baseline_a + 1);
        assert_eq!(
            session_a
                .read()
                .await
                .files_touched
                .last()
                .map(|f| f.path.clone()),
            Some("fileA.rs".to_string())
        );
        assert_eq!(session_b.read().await.files_touched.len(), baseline_b + 1);
        assert_eq!(
            session_b
                .read()
                .await
                .files_touched
                .last()
                .map(|f| f.path.clone()),
            Some("fileB.rs".to_string())
        );
    }
}