Skip to main content

phi_core/session/
storage.rs

1use super::model::*;
2use std::path::{Path, PathBuf};
3use std::time::Duration;
4
/// How long a writer keeps retrying an exclusive advisory lock before it gives up and
/// reports [`SessionError::Locked`]. Five seconds absorbs short bursts of contention
/// from another writer while still failing promptly when a lock is stuck.
const LOCK_RETRY_BUDGET: Duration = Duration::from_millis(5_000);
/// Pause between successive exclusive-lock attempts while the retry budget lasts.
const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(50);
11
12/// Save a session to `{dir}/{session_id}.json`, creating `dir` if necessary.
13///
14/// Returns the path the file was written to.
15///
16/// This is the legacy sync entry point retained for backward compatibility. New code
17/// should prefer [`SessionStore`] + [`FileSystemSessionStore`] which adds advisory
18/// file locking and atomic rename semantics so concurrent writers cannot corrupt the
19/// session file.
20pub fn save_session(session: &Session, dir: &Path) -> Result<PathBuf, SessionError> {
21    std::fs::create_dir_all(dir)?;
22    let path = dir.join(format!("{}.json", session.session_id));
23    let file = std::fs::File::create(&path)?;
24    serde_json::to_writer_pretty(file, session)?;
25    Ok(path)
26}
27
28/// Load a session from `{dir}/{session_id}.json`.
29pub fn load_session(session_id: &str, dir: &Path) -> Result<Session, SessionError> {
30    let path = dir.join(format!("{}.json", session_id));
31    if !path.exists() {
32        return Err(SessionError::NotFound {
33            session_id: session_id.to_string(),
34        });
35    }
36    let file = std::fs::File::open(path)?;
37    let session: Session = serde_json::from_reader(file)?;
38    Ok(session)
39}
40
41/// List all session IDs in `dir`, sorted by file modification time (newest first).
42pub fn list_session_ids(dir: &Path) -> Result<Vec<String>, SessionError> {
43    if !dir.exists() {
44        return Ok(Vec::new());
45    }
46    let mut entries: Vec<(std::time::SystemTime, String)> = std::fs::read_dir(dir)?
47        .filter_map(|e| e.ok())
48        .filter(|e| {
49            e.path()
50                .extension()
51                .and_then(|x| x.to_str())
52                .map(|x| x == "json")
53                .unwrap_or(false)
54        })
55        .filter_map(|e| {
56            let stem = e
57                .path()
58                .file_stem()
59                .and_then(|s| s.to_str())
60                .map(|s| s.to_string())?;
61            let mtime = e.metadata().ok()?.modified().ok()?;
62            Some((mtime, stem))
63        })
64        .collect();
65    entries.sort_by_key(|e| std::cmp::Reverse(e.0)); // newest first
66    Ok(entries.into_iter().map(|(_, id)| id).collect())
67}
68
69/// Load all sessions in `dir` that belong to `agent_id`.
70pub fn load_sessions_for_agent(agent_id: &str, dir: &Path) -> Result<Vec<Session>, SessionError> {
71    let ids = list_session_ids(dir)?;
72    let mut sessions = Vec::new();
73    for id in ids {
74        match load_session(&id, dir) {
75            Ok(s) if s.agent_id == agent_id => sessions.push(s),
76            Ok(_) => {}
77            Err(SessionError::NotFound { .. }) => {}
78            Err(e) => return Err(e),
79        }
80    }
81    Ok(sessions)
82}
83
84/// Delete `{dir}/{session_id}.json`.
85pub fn delete_session(session_id: &str, dir: &Path) -> Result<(), SessionError> {
86    let path = dir.join(format!("{}.json", session_id));
87    if !path.exists() {
88        return Err(SessionError::NotFound {
89            session_id: session_id.to_string(),
90        });
91    }
92    std::fs::remove_file(path)?;
93    Ok(())
94}
95
96// ---------------------------------------------------------------------------
97// SessionStore — async, concurrency-safe persistence
98// ---------------------------------------------------------------------------
99
100/// Async pluggable persistence backend for sessions.
101///
102/// Implementors are free to back this with the filesystem, an object store, a database,
103/// etc. The default in-tree implementation is [`FileSystemSessionStore`] which guards
104/// every write with an exclusive advisory lock + atomic rename so concurrent processes
105/// cannot corrupt session files.
106///
107/// `list_for_agent` carries a default implementation in terms of the other methods,
108/// so most implementors only need to provide the four required methods.
109#[async_trait::async_trait]
110pub trait SessionStore: Send + Sync {
111    /// Persist a session, replacing any existing record with the same `session_id`.
112    async fn save(&self, session: &Session) -> Result<(), SessionError>;
113
114    /// Load a session by id. Returns `SessionError::NotFound` if absent.
115    async fn load(&self, session_id: &str) -> Result<Session, SessionError>;
116
117    /// List all session ids known to this store.
118    async fn list_ids(&self) -> Result<Vec<String>, SessionError>;
119
120    /// Delete a session by id. Returns `SessionError::NotFound` if absent.
121    async fn delete(&self, session_id: &str) -> Result<(), SessionError>;
122
123    /// Load every session belonging to `agent_id`. Default impl iterates `list_ids` +
124    /// `load`; override for stores that can serve this from an index.
125    async fn list_for_agent(&self, agent_id: &str) -> Result<Vec<Session>, SessionError> {
126        let mut sessions = Vec::new();
127        for id in self.list_ids().await? {
128            match self.load(&id).await {
129                Ok(s) if s.agent_id == agent_id => sessions.push(s),
130                Ok(_) => {}
131                Err(SessionError::NotFound { .. }) => {}
132                Err(e) => return Err(e),
133            }
134        }
135        Ok(sessions)
136    }
137}
138
/// Filesystem-backed [`SessionStore`] with advisory locking and atomic writes.
///
/// Each session lives at `{dir}/{session_id}.json`. Writes go through a tmp file in the
/// same directory followed by `std::fs::rename`, which is atomic on POSIX. An exclusive
/// advisory lock on a `{session_id}.json.lock` sidecar serialises writers. Readers are
/// unsynchronised because the atomic rename guarantees a complete file is always
/// visible. On contention, `save()` retries for up to `LOCK_RETRY_BUDGET` (5 s) before
/// returning `SessionError::Locked`.
///
/// The store holds nothing but its root path, so it is cheap to clone (e.g. to hand to
/// several tasks) and can be printed for diagnostics.
#[derive(Debug, Clone)]
pub struct FileSystemSessionStore {
    /// Root directory holding the `*.json` session files and their `.lock` sidecars.
    dir: PathBuf,
}

impl FileSystemSessionStore {
    /// Construct a store rooted at `dir`.
    ///
    /// No I/O happens here; the directory is created lazily on first save.
    pub fn new(dir: impl Into<PathBuf>) -> Self {
        Self { dir: dir.into() }
    }

    /// Return the storage root directory.
    pub fn dir(&self) -> &Path {
        &self.dir
    }
}
161
162/// Map a `tokio::task::JoinError` into a `SessionError::Task` while preserving the message.
163fn map_join_err(e: tokio::task::JoinError) -> SessionError {
164    SessionError::Task(e.to_string())
165}
166
167#[async_trait::async_trait]
168impl SessionStore for FileSystemSessionStore {
169    async fn save(&self, session: &Session) -> Result<(), SessionError> {
170        let dir = self.dir.clone();
171        let session = session.clone();
172        tokio::task::spawn_blocking(move || save_with_lock(&dir, &session))
173            .await
174            .map_err(map_join_err)?
175    }
176
177    async fn load(&self, session_id: &str) -> Result<Session, SessionError> {
178        let dir = self.dir.clone();
179        let id = session_id.to_string();
180        tokio::task::spawn_blocking(move || load_session(&id, &dir))
181            .await
182            .map_err(map_join_err)?
183    }
184
185    async fn list_ids(&self) -> Result<Vec<String>, SessionError> {
186        let dir = self.dir.clone();
187        tokio::task::spawn_blocking(move || list_session_ids(&dir))
188            .await
189            .map_err(map_join_err)?
190    }
191
192    async fn delete(&self, session_id: &str) -> Result<(), SessionError> {
193        let dir = self.dir.clone();
194        let id = session_id.to_string();
195        tokio::task::spawn_blocking(move || delete_session(&id, &dir))
196            .await
197            .map_err(map_join_err)?
198    }
199}
200
201/// Internal: perform a concurrency-safe save inside a blocking thread.
202///
203/// 1. Create the directory if missing.
204/// 2. Open / create the `.lock` sidecar and acquire an exclusive advisory lock
205///    (retrying within `LOCK_RETRY_BUDGET`).
206/// 3. Write JSON to `{dir}/{id}.json.tmp.{pid}.{nonce}`, fsync.
207/// 4. Atomically rename onto the final `{dir}/{id}.json`.
208/// 5. Release the lock by dropping the file handle.
209fn save_with_lock(dir: &Path, session: &Session) -> Result<(), SessionError> {
210    use fs2::FileExt;
211    use std::time::Instant;
212
213    std::fs::create_dir_all(dir)?;
214    let session_id = &session.session_id;
215    let final_path = dir.join(format!("{}.json", session_id));
216    let lock_path = dir.join(format!("{}.json.lock", session_id));
217
218    // Open (or create) the lock sidecar. Truncation is irrelevant — we only use the
219    // file descriptor as the lock target.
220    let lock_file = std::fs::OpenOptions::new()
221        .read(true)
222        .write(true)
223        .create(true)
224        .truncate(false)
225        .open(&lock_path)?;
226
227    // Acquire exclusive lock with bounded retry. On Windows + POSIX, fs2 uses the
228    // platform-native primitive (LockFileEx / fcntl) so this is cross-platform.
229    let start = Instant::now();
230    loop {
231        match lock_file.try_lock_exclusive() {
232            Ok(()) => break,
233            Err(_) if start.elapsed() < LOCK_RETRY_BUDGET => {
234                std::thread::sleep(LOCK_RETRY_INTERVAL);
235            }
236            Err(_) => {
237                return Err(SessionError::Locked {
238                    session_id: session_id.clone(),
239                });
240            }
241        }
242    }
243
244    // Atomic write: tmp → rename. The tmp name carries pid + nanos to avoid collision
245    // between concurrent processes that might share a directory.
246    let nonce = std::time::SystemTime::now()
247        .duration_since(std::time::UNIX_EPOCH)
248        .map(|d| d.as_nanos())
249        .unwrap_or(0);
250    let tmp_path = dir.join(format!(
251        "{}.json.tmp.{}.{}",
252        session_id,
253        std::process::id(),
254        nonce
255    ));
256
257    {
258        let tmp_file = std::fs::File::create(&tmp_path)?;
259        serde_json::to_writer_pretty(&tmp_file, session)?;
260        // fsync to make the bytes durable before the rename, so a crash here cannot
261        // leave a half-written file behind a fresh rename.
262        tmp_file.sync_all()?;
263    }
264
265    if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
266        // Clean up the tmp file on rename failure; the lock will be released by drop.
267        let _ = std::fs::remove_file(&tmp_path);
268        return Err(SessionError::Io(e));
269    }
270
271    // Releasing the lock happens when `lock_file` goes out of scope (drop closes the fd
272    // which releases the OS-level advisory lock). The lock sidecar file is left behind
273    // intentionally — recreating it on every save would defeat the lock semantics under
274    // contention (a concurrent writer might recreate-and-lock between drop and unlink).
275    drop(lock_file);
276    Ok(())
277}