use super::model::*;
use std::path::{Path, PathBuf};
use std::time::Duration;
const LOCK_RETRY_BUDGET: Duration = Duration::from_secs(5);
const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(50);
pub fn save_session(session: &Session, dir: &Path) -> Result<PathBuf, SessionError> {
std::fs::create_dir_all(dir)?;
let path = dir.join(format!("{}.json", session.session_id));
let file = std::fs::File::create(&path)?;
serde_json::to_writer_pretty(file, session)?;
Ok(path)
}
pub fn load_session(session_id: &str, dir: &Path) -> Result<Session, SessionError> {
let path = dir.join(format!("{}.json", session_id));
if !path.exists() {
return Err(SessionError::NotFound {
session_id: session_id.to_string(),
});
}
let file = std::fs::File::open(path)?;
let session: Session = serde_json::from_reader(file)?;
Ok(session)
}
pub fn list_session_ids(dir: &Path) -> Result<Vec<String>, SessionError> {
if !dir.exists() {
return Ok(Vec::new());
}
let mut entries: Vec<(std::time::SystemTime, String)> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.extension()
.and_then(|x| x.to_str())
.map(|x| x == "json")
.unwrap_or(false)
})
.filter_map(|e| {
let stem = e
.path()
.file_stem()
.and_then(|s| s.to_str())
.map(|s| s.to_string())?;
let mtime = e.metadata().ok()?.modified().ok()?;
Some((mtime, stem))
})
.collect();
entries.sort_by_key(|e| std::cmp::Reverse(e.0)); Ok(entries.into_iter().map(|(_, id)| id).collect())
}
pub fn load_sessions_for_agent(agent_id: &str, dir: &Path) -> Result<Vec<Session>, SessionError> {
let ids = list_session_ids(dir)?;
let mut sessions = Vec::new();
for id in ids {
match load_session(&id, dir) {
Ok(s) if s.agent_id == agent_id => sessions.push(s),
Ok(_) => {}
Err(SessionError::NotFound { .. }) => {}
Err(e) => return Err(e),
}
}
Ok(sessions)
}
pub fn delete_session(session_id: &str, dir: &Path) -> Result<(), SessionError> {
let path = dir.join(format!("{}.json", session_id));
if !path.exists() {
return Err(SessionError::NotFound {
session_id: session_id.to_string(),
});
}
std::fs::remove_file(path)?;
Ok(())
}
#[async_trait::async_trait]
pub trait SessionStore: Send + Sync {
async fn save(&self, session: &Session) -> Result<(), SessionError>;
async fn load(&self, session_id: &str) -> Result<Session, SessionError>;
async fn list_ids(&self) -> Result<Vec<String>, SessionError>;
async fn delete(&self, session_id: &str) -> Result<(), SessionError>;
async fn list_for_agent(&self, agent_id: &str) -> Result<Vec<Session>, SessionError> {
let mut sessions = Vec::new();
for id in self.list_ids().await? {
match self.load(&id).await {
Ok(s) if s.agent_id == agent_id => sessions.push(s),
Ok(_) => {}
Err(SessionError::NotFound { .. }) => {}
Err(e) => return Err(e),
}
}
Ok(sessions)
}
}
pub struct FileSystemSessionStore {
dir: PathBuf,
}
impl FileSystemSessionStore {
pub fn new(dir: impl Into<PathBuf>) -> Self {
Self { dir: dir.into() }
}
pub fn dir(&self) -> &Path {
&self.dir
}
}
fn map_join_err(e: tokio::task::JoinError) -> SessionError {
SessionError::Task(e.to_string())
}
#[async_trait::async_trait]
impl SessionStore for FileSystemSessionStore {
async fn save(&self, session: &Session) -> Result<(), SessionError> {
let dir = self.dir.clone();
let session = session.clone();
tokio::task::spawn_blocking(move || save_with_lock(&dir, &session))
.await
.map_err(map_join_err)?
}
async fn load(&self, session_id: &str) -> Result<Session, SessionError> {
let dir = self.dir.clone();
let id = session_id.to_string();
tokio::task::spawn_blocking(move || load_session(&id, &dir))
.await
.map_err(map_join_err)?
}
async fn list_ids(&self) -> Result<Vec<String>, SessionError> {
let dir = self.dir.clone();
tokio::task::spawn_blocking(move || list_session_ids(&dir))
.await
.map_err(map_join_err)?
}
async fn delete(&self, session_id: &str) -> Result<(), SessionError> {
let dir = self.dir.clone();
let id = session_id.to_string();
tokio::task::spawn_blocking(move || delete_session(&id, &dir))
.await
.map_err(map_join_err)?
}
}
fn save_with_lock(dir: &Path, session: &Session) -> Result<(), SessionError> {
use fs2::FileExt;
use std::time::Instant;
std::fs::create_dir_all(dir)?;
let session_id = &session.session_id;
let final_path = dir.join(format!("{}.json", session_id));
let lock_path = dir.join(format!("{}.json.lock", session_id));
let lock_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&lock_path)?;
let start = Instant::now();
loop {
match lock_file.try_lock_exclusive() {
Ok(()) => break,
Err(_) if start.elapsed() < LOCK_RETRY_BUDGET => {
std::thread::sleep(LOCK_RETRY_INTERVAL);
}
Err(_) => {
return Err(SessionError::Locked {
session_id: session_id.clone(),
});
}
}
}
let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let tmp_path = dir.join(format!(
"{}.json.tmp.{}.{}",
session_id,
std::process::id(),
nonce
));
{
let tmp_file = std::fs::File::create(&tmp_path)?;
serde_json::to_writer_pretty(&tmp_file, session)?;
tmp_file.sync_all()?;
}
if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
let _ = std::fs::remove_file(&tmp_path);
return Err(SessionError::Io(e));
}
drop(lock_file);
Ok(())
}