phi_core/session/
storage.rs1use super::model::*;
2use std::path::{Path, PathBuf};
3use std::time::Duration;
4
5const LOCK_RETRY_BUDGET: Duration = Duration::from_secs(5);
9const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(50);
11
12pub fn save_session(session: &Session, dir: &Path) -> Result<PathBuf, SessionError> {
21 std::fs::create_dir_all(dir)?;
22 let path = dir.join(format!("{}.json", session.session_id));
23 let file = std::fs::File::create(&path)?;
24 serde_json::to_writer_pretty(file, session)?;
25 Ok(path)
26}
27
28pub fn load_session(session_id: &str, dir: &Path) -> Result<Session, SessionError> {
30 let path = dir.join(format!("{}.json", session_id));
31 if !path.exists() {
32 return Err(SessionError::NotFound {
33 session_id: session_id.to_string(),
34 });
35 }
36 let file = std::fs::File::open(path)?;
37 let session: Session = serde_json::from_reader(file)?;
38 Ok(session)
39}
40
41pub fn list_session_ids(dir: &Path) -> Result<Vec<String>, SessionError> {
43 if !dir.exists() {
44 return Ok(Vec::new());
45 }
46 let mut entries: Vec<(std::time::SystemTime, String)> = std::fs::read_dir(dir)?
47 .filter_map(|e| e.ok())
48 .filter(|e| {
49 e.path()
50 .extension()
51 .and_then(|x| x.to_str())
52 .map(|x| x == "json")
53 .unwrap_or(false)
54 })
55 .filter_map(|e| {
56 let stem = e
57 .path()
58 .file_stem()
59 .and_then(|s| s.to_str())
60 .map(|s| s.to_string())?;
61 let mtime = e.metadata().ok()?.modified().ok()?;
62 Some((mtime, stem))
63 })
64 .collect();
65 entries.sort_by_key(|e| std::cmp::Reverse(e.0)); Ok(entries.into_iter().map(|(_, id)| id).collect())
67}
68
69pub fn load_sessions_for_agent(agent_id: &str, dir: &Path) -> Result<Vec<Session>, SessionError> {
71 let ids = list_session_ids(dir)?;
72 let mut sessions = Vec::new();
73 for id in ids {
74 match load_session(&id, dir) {
75 Ok(s) if s.agent_id == agent_id => sessions.push(s),
76 Ok(_) => {}
77 Err(SessionError::NotFound { .. }) => {}
78 Err(e) => return Err(e),
79 }
80 }
81 Ok(sessions)
82}
83
84pub fn delete_session(session_id: &str, dir: &Path) -> Result<(), SessionError> {
86 let path = dir.join(format!("{}.json", session_id));
87 if !path.exists() {
88 return Err(SessionError::NotFound {
89 session_id: session_id.to_string(),
90 });
91 }
92 std::fs::remove_file(path)?;
93 Ok(())
94}
95
96#[async_trait::async_trait]
110pub trait SessionStore: Send + Sync {
111 async fn save(&self, session: &Session) -> Result<(), SessionError>;
113
114 async fn load(&self, session_id: &str) -> Result<Session, SessionError>;
116
117 async fn list_ids(&self) -> Result<Vec<String>, SessionError>;
119
120 async fn delete(&self, session_id: &str) -> Result<(), SessionError>;
122
123 async fn list_for_agent(&self, agent_id: &str) -> Result<Vec<Session>, SessionError> {
126 let mut sessions = Vec::new();
127 for id in self.list_ids().await? {
128 match self.load(&id).await {
129 Ok(s) if s.agent_id == agent_id => sessions.push(s),
130 Ok(_) => {}
131 Err(SessionError::NotFound { .. }) => {}
132 Err(e) => return Err(e),
133 }
134 }
135 Ok(sessions)
136 }
137}
138
139pub struct FileSystemSessionStore {
147 dir: PathBuf,
148}
149
150impl FileSystemSessionStore {
151 pub fn new(dir: impl Into<PathBuf>) -> Self {
153 Self { dir: dir.into() }
154 }
155
156 pub fn dir(&self) -> &Path {
158 &self.dir
159 }
160}
161
162fn map_join_err(e: tokio::task::JoinError) -> SessionError {
164 SessionError::Task(e.to_string())
165}
166
167#[async_trait::async_trait]
168impl SessionStore for FileSystemSessionStore {
169 async fn save(&self, session: &Session) -> Result<(), SessionError> {
170 let dir = self.dir.clone();
171 let session = session.clone();
172 tokio::task::spawn_blocking(move || save_with_lock(&dir, &session))
173 .await
174 .map_err(map_join_err)?
175 }
176
177 async fn load(&self, session_id: &str) -> Result<Session, SessionError> {
178 let dir = self.dir.clone();
179 let id = session_id.to_string();
180 tokio::task::spawn_blocking(move || load_session(&id, &dir))
181 .await
182 .map_err(map_join_err)?
183 }
184
185 async fn list_ids(&self) -> Result<Vec<String>, SessionError> {
186 let dir = self.dir.clone();
187 tokio::task::spawn_blocking(move || list_session_ids(&dir))
188 .await
189 .map_err(map_join_err)?
190 }
191
192 async fn delete(&self, session_id: &str) -> Result<(), SessionError> {
193 let dir = self.dir.clone();
194 let id = session_id.to_string();
195 tokio::task::spawn_blocking(move || delete_session(&id, &dir))
196 .await
197 .map_err(map_join_err)?
198 }
199}
200
201fn save_with_lock(dir: &Path, session: &Session) -> Result<(), SessionError> {
210 use fs2::FileExt;
211 use std::time::Instant;
212
213 std::fs::create_dir_all(dir)?;
214 let session_id = &session.session_id;
215 let final_path = dir.join(format!("{}.json", session_id));
216 let lock_path = dir.join(format!("{}.json.lock", session_id));
217
218 let lock_file = std::fs::OpenOptions::new()
221 .read(true)
222 .write(true)
223 .create(true)
224 .truncate(false)
225 .open(&lock_path)?;
226
227 let start = Instant::now();
230 loop {
231 match lock_file.try_lock_exclusive() {
232 Ok(()) => break,
233 Err(_) if start.elapsed() < LOCK_RETRY_BUDGET => {
234 std::thread::sleep(LOCK_RETRY_INTERVAL);
235 }
236 Err(_) => {
237 return Err(SessionError::Locked {
238 session_id: session_id.clone(),
239 });
240 }
241 }
242 }
243
244 let nonce = std::time::SystemTime::now()
247 .duration_since(std::time::UNIX_EPOCH)
248 .map(|d| d.as_nanos())
249 .unwrap_or(0);
250 let tmp_path = dir.join(format!(
251 "{}.json.tmp.{}.{}",
252 session_id,
253 std::process::id(),
254 nonce
255 ));
256
257 {
258 let tmp_file = std::fs::File::create(&tmp_path)?;
259 serde_json::to_writer_pretty(&tmp_file, session)?;
260 tmp_file.sync_all()?;
263 }
264
265 if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
266 let _ = std::fs::remove_file(&tmp_path);
268 return Err(SessionError::Io(e));
269 }
270
271 drop(lock_file);
276 Ok(())
277}