use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::fs;
use tracing::{debug, warn};
use super::record::{ManagedSessionId, SessionRecord};
/// Errors returned by [`SessionStore`] operations.
#[derive(Debug, Error)]
pub enum StoreError {
/// A filesystem operation on the store file failed; wraps the underlying
/// `std::io::Error` (converted automatically via `#[from]`).
#[error("session store I/O error: {0}")]
Io(#[from] std::io::Error),
/// JSON encoding or decoding of the store failed; carries the stringified
/// `serde_json` error message.
#[error("session store serialization error: {0}")]
Serialize(String),
/// No record exists for the requested session; carries the string form of
/// the id that was looked up.
#[error("session not found: {0}")]
NotFound(String),
}
/// Serialized shape of the store file: a single `sessions` map.
#[derive(Debug, Default, Serialize, Deserialize)]
struct StoredData {
// Keyed by the string form of the record's id (`record.id.to_string()`).
sessions: HashMap<String, SessionRecord>,
}
/// Cheap change signature for the store file, built from its metadata.
///
/// Two signatures compare equal when both the modification time and the
/// length match; the store uses that equality to decide whether a re-read
/// can be skipped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
struct FileSig {
// Modification time from the file metadata; `None` when it is unavailable.
mtime: Option<SystemTime>,
// File length in bytes as reported by the metadata.
len: u64,
}
/// Session records persisted as JSON in a `sessions.json` file, with an
/// in-memory copy that is refreshed when the file's signature changes.
#[derive(Debug)]
pub struct SessionStore {
// In-memory copy of the last successfully loaded or saved contents.
data: StoredData,
// Full path of the backing `sessions.json` file.
path: PathBuf,
// Signature recorded at the last load/save; `None` when no file was found
// or the file could not be stat'ed.
last_sig: Option<FileSig>,
}
impl SessionStore {
    /// Opens the store backed by `sessions.json` inside `data_dir`.
    ///
    /// A missing file is not an error: the store starts empty and the file is
    /// created by the first `save`.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Io`] when the file exists but cannot be read, and
    /// [`StoreError::Serialize`] when its contents do not parse as store JSON.
    pub async fn load(data_dir: &Path) -> Result<Self, StoreError> {
        let path = data_dir.join("sessions.json");
        let (data, last_sig) = Self::read_file(&path).await?;
        Ok(Self {
            data,
            path,
            last_sig,
        })
    }

    /// Stats `path` and returns its change signature, or `None` when the
    /// metadata cannot be read (for example because the file does not exist).
    async fn sig_of(path: &Path) -> Option<FileSig> {
        let meta = fs::metadata(path).await.ok()?;
        Some(FileSig {
            mtime: meta.modified().ok(),
            len: meta.len(),
        })
    }

    /// Reads and parses the store file, returning the data together with the
    /// signature to remember for change detection.
    ///
    /// Returns empty data and a `None` signature when the file does not exist.
    ///
    /// # Errors
    ///
    /// Any read failure other than "not found" is returned as
    /// [`StoreError::Io`]. Treating such failures as an empty store would let
    /// the next `save` overwrite every persisted session. Unparseable content
    /// is returned as [`StoreError::Serialize`].
    async fn read_file(path: &Path) -> Result<(StoredData, Option<FileSig>), StoreError> {
        // Stat *before* reading. If another writer replaces the file between
        // the stat and the read, the remembered signature is older than the
        // parsed bytes and the next `reload_if_changed` merely re-reads.
        // Stat-after-read would pair a new signature with old contents and the
        // external change would never be picked up.
        let sig = Self::sig_of(path).await;
        let raw = match fs::read_to_string(path).await {
            Ok(raw) => raw,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                debug!(path = %path.display(), "no session store file found; starting fresh");
                return Ok((StoredData::default(), None));
            }
            Err(err) => return Err(StoreError::Io(err)),
        };
        let data = serde_json::from_str::<StoredData>(&raw)
            .map_err(|e| StoreError::Serialize(e.to_string()))?;
        // A failed stat yields the default signature, which will not match a
        // real file and therefore forces a re-read on the next check.
        Ok((data, Some(sig.unwrap_or_default())))
    }

    /// Re-reads the store file unless its signature matches the one recorded
    /// at the last load or save.
    ///
    /// The re-read is skipped only when both signatures are known and equal;
    /// an unknown signature on either side always triggers a read.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading or parsing the file. On error the
    /// in-memory data and recorded signature are left untouched, so the
    /// `cached_*` accessors keep serving the last successfully loaded state.
    pub async fn reload_if_changed(&mut self) -> Result<(), StoreError> {
        let current = Self::sig_of(&self.path).await;
        if current.is_some() && current == self.last_sig {
            return Ok(());
        }
        let (data, sig) = Self::read_file(&self.path).await?;
        self.data = data;
        self.last_sig = sig;
        debug!(path = %self.path.display(), "session store reloaded after external change");
        Ok(())
    }

    /// Writes the in-memory data to disk as pretty-printed JSON.
    ///
    /// The parent directory is created if needed. The JSON is first written to
    /// a sibling `sessions.json.tmp` file and then renamed over the target, so
    /// readers never observe a partially written store file. The signature is
    /// refreshed afterwards so this handle does not treat its own write as an
    /// external change.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Serialize`] if encoding fails and
    /// [`StoreError::Io`] if any filesystem step fails.
    pub async fn save(&mut self) -> Result<(), StoreError> {
        if let Some(parent) = self.path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let json = serde_json::to_string_pretty(&self.data)
            .map_err(|e| StoreError::Serialize(e.to_string()))?;
        let tmp = self.path.with_extension("json.tmp");
        fs::write(&tmp, json).await?;
        fs::rename(&tmp, &self.path).await?;
        self.last_sig = Self::sig_of(&self.path).await;
        debug!(path = %self.path.display(), "session store saved");
        Ok(())
    }

    /// Inserts `record`, replacing any existing record with the same id, and
    /// persists the store.
    ///
    /// The file is reloaded first (if it changed) so that records written
    /// through other handles are carried into the saved result.
    ///
    /// # Errors
    ///
    /// Propagates errors from the reload and from the save.
    pub async fn upsert(&mut self, record: SessionRecord) -> Result<(), StoreError> {
        self.reload_if_changed().await?;
        let key = record.id.to_string();
        self.data.sessions.insert(key, record);
        self.save().await
    }

    /// Returns a copy of the record for `id` after refreshing from disk.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::NotFound`] when no such record exists, or any
    /// error raised by the reload.
    pub async fn get(&mut self, id: &ManagedSessionId) -> Result<SessionRecord, StoreError> {
        self.reload_if_changed().await?;
        self.cached_get(id)
    }

    /// Returns copies of all records after refreshing from disk.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the reload.
    pub async fn all(&mut self) -> Result<Vec<SessionRecord>, StoreError> {
        self.reload_if_changed().await?;
        Ok(self.cached_all())
    }

    /// Returns copies of all records currently held in memory, without
    /// touching the disk.
    pub fn cached_all(&self) -> Vec<SessionRecord> {
        self.data.sessions.values().cloned().collect()
    }

    /// Returns a copy of the in-memory record for `id`, without touching the
    /// disk.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::NotFound`] when the id is not in memory.
    pub fn cached_get(&self, id: &ManagedSessionId) -> Result<SessionRecord, StoreError> {
        let key = id.to_string();
        self.data
            .sessions
            .get(&key)
            .cloned()
            .ok_or(StoreError::NotFound(key))
    }

    /// Removes the record for `id` and persists the store.
    ///
    /// A missing id is not an error: it is logged at `warn` level and the
    /// store is still saved.
    ///
    /// # Errors
    ///
    /// Propagates errors from the reload and from the save.
    pub async fn remove(&mut self, id: &ManagedSessionId) -> Result<(), StoreError> {
        self.reload_if_changed().await?;
        let key = id.to_string();
        if self.data.sessions.remove(&key).is_none() {
            warn!(id = %key, "remove: session not found in store");
        }
        self.save().await
    }
}
// Tests for `SessionStore`. Each test works in its own temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::session_manager::record::{ManagedSessionState, SessionRecord};
use chrono::Utc;
use std::path::PathBuf;
use tempfile::TempDir;
/// Builds an `Active` record for `id` with fixed placeholder values
/// (`/tmp` cwd, "test task") and every optional field unset.
fn make_record(id: ManagedSessionId) -> SessionRecord {
SessionRecord {
id,
tmux_name: format!("tmpm-test-{id}"),
cwd: PathBuf::from("/tmp"),
task: "test task".into(),
state: ManagedSessionState::Active,
created_at: Utc::now(),
last_activity_at: None,
workspace_path: None,
repo_url: None,
branch: None,
pending_decision: None,
proposed_default: None,
correlation: Default::default(),
runtime: Default::default(),
}
}
/// Same as `make_record`, but with `state` overridden.
fn make_record_with_state(id: ManagedSessionId, state: ManagedSessionState) -> SessionRecord {
SessionRecord {
state,
..make_record(id)
}
}
/// Loading from an empty directory yields an empty store, and a record
/// upserted through one handle is visible to a handle loaded afterwards
/// from the same directory.
#[tokio::test]
async fn store_load_save_round_trip() {
let dir = TempDir::new().expect("tempdir");
let data_dir = dir.path();
let mut store = SessionStore::load(data_dir).await.expect("load empty");
assert!(store.all().await.expect("all").is_empty());
let id = ManagedSessionId::new();
store.upsert(make_record(id)).await.expect("upsert");
// A second, independent handle must see what the first one persisted.
let mut store2 = SessionStore::load(data_dir).await.expect("reload");
let record = store2.get(&id).await.expect("get after reload");
assert_eq!(record.id, id);
assert_eq!(record.state, ManagedSessionState::Active);
}
/// An upserted record can be fetched by id and is counted by `all`.
#[tokio::test]
async fn store_upsert_and_get() {
let dir = TempDir::new().expect("tempdir");
let mut store = SessionStore::load(dir.path()).await.expect("load");
let id = ManagedSessionId::new();
store.upsert(make_record(id)).await.expect("upsert");
let record = store.get(&id).await.expect("get");
assert_eq!(record.id, id);
assert_eq!(store.all().await.expect("all").len(), 1);
}
/// After `remove`, the record is gone from `all` and `get` reports
/// `NotFound`.
#[tokio::test]
async fn store_remove() {
let dir = TempDir::new().expect("tempdir");
let mut store = SessionStore::load(dir.path()).await.expect("load");
let id = ManagedSessionId::new();
store.upsert(make_record(id)).await.expect("upsert");
assert_eq!(store.all().await.expect("all").len(), 1);
store.remove(&id).await.expect("remove");
assert!(store.all().await.expect("all").is_empty());
assert!(matches!(store.get(&id).await, Err(StoreError::NotFound(_))));
}
/// A write made through a second handle (B) is observed by the first
/// handle (A) on its next `get`/`all`, without A being re-created.
#[tokio::test]
async fn store_reload_picks_up_external_write() {
let dir = TempDir::new().expect("tempdir");
let id = ManagedSessionId::new();
let mut store_a = SessionStore::load(dir.path()).await.expect("load A");
store_a
.upsert(make_record_with_state(id, ManagedSessionState::Active))
.await
.expect("upsert A");
let before = store_a.get(&id).await.expect("get A before");
assert_eq!(before.state, ManagedSessionState::Active);
// B overwrites the same id with a different state, behind A's back.
let mut store_b = SessionStore::load(dir.path()).await.expect("load B");
store_b
.upsert(make_record_with_state(id, ManagedSessionState::Stopped))
.await
.expect("upsert B");
let after = store_a.get(&id).await.expect("get A after");
assert_eq!(
after.state,
ManagedSessionState::Stopped,
"store A must reload the external Stopped write"
);
let all = store_a.all().await.expect("all A");
assert_eq!(all.len(), 1);
assert_eq!(all[0].state, ManagedSessionState::Stopped);
}
/// Calling `reload_if_changed` right after the handle's own write succeeds
/// and leaves the record retrievable.
#[tokio::test]
async fn store_reload_noop_when_unchanged() {
let dir = TempDir::new().expect("tempdir");
let id = ManagedSessionId::new();
let mut store = SessionStore::load(dir.path()).await.expect("load");
store.upsert(make_record(id)).await.expect("upsert");
store.reload_if_changed().await.expect("reload no-op");
let record = store.get(&id).await.expect("get");
assert_eq!(record.id, id);
}
/// A's upsert must not drop a record that B wrote after A last touched the
/// file: A seeds X, B adds Y, A updates X, and both X and Y must remain.
#[tokio::test]
async fn store_upsert_preserves_concurrent_write() {
let dir = TempDir::new().expect("tempdir");
let id_x = ManagedSessionId::new();
let id_y = ManagedSessionId::new();
let mut store_a = SessionStore::load(dir.path()).await.expect("load A");
store_a.upsert(make_record(id_x)).await.expect("seed X");
let mut store_b = SessionStore::load(dir.path()).await.expect("load B");
store_b.upsert(make_record(id_y)).await.expect("write Y");
store_a
.upsert(make_record_with_state(id_x, ManagedSessionState::Stopped))
.await
.expect("update X");
let all = store_a.all().await.expect("all A");
assert_eq!(all.len(), 2, "both X and Y must survive: {all:?}");
assert!(store_a.get(&id_x).await.is_ok(), "X present");
assert!(store_a.get(&id_y).await.is_ok(), "Y must survive A's write");
}
/// With the file on disk replaced by invalid JSON, the `cached_*`
/// accessors still serve the in-memory state while `all()` surfaces the
/// reload error.
#[tokio::test]
async fn store_cached_accessors_ignore_disk() {
let dir = TempDir::new().expect("tempdir");
let mut store = SessionStore::load(dir.path()).await.expect("load");
let id = ManagedSessionId::new();
store.upsert(make_record(id)).await.expect("upsert");
let path = dir.path().join("sessions.json");
// Overwrite the store file directly with content that cannot be parsed.
std::fs::write(&path, b"{ not json ]").expect("corrupt file");
assert_eq!(store.cached_all().len(), 1, "cached_all serves last-known");
assert_eq!(
store.cached_get(&id).expect("cached_get hit").id,
id,
"cached_get serves last-known record"
);
let missing = ManagedSessionId::new();
assert!(
matches!(store.cached_get(&missing), Err(StoreError::NotFound(_))),
"cached_get still reports genuinely-absent ids as NotFound"
);
assert!(store.all().await.is_err(), "all() propagates reload error");
}
/// The signature recorded by `load` carries the same length as the file on
/// disk, and a subsequent `reload_if_changed` keeps the record available.
#[tokio::test]
async fn store_read_file_sig_matches_post_read_bytes() {
let dir = TempDir::new().expect("tempdir");
let id = ManagedSessionId::new();
let mut writer = SessionStore::load(dir.path()).await.expect("load writer");
writer.upsert(make_record(id)).await.expect("seed");
let mut reader = SessionStore::load(dir.path()).await.expect("load reader");
let on_disk_len = std::fs::metadata(dir.path().join("sessions.json"))
.expect("stat")
.len();
let sig = reader.last_sig.expect("reader captured a signature");
assert_eq!(
sig.len, on_disk_len,
"recorded signature length must match the bytes that were parsed"
);
reader.reload_if_changed().await.expect("reload no-op");
assert!(
reader.get(&id).await.is_ok(),
"record still present after no-op reload"
);
}
}