use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::thought::protocol::ThoughtDeliveryState;
use crate::thought::runtime_config::ThoughtConfig;
use crate::types::{RestState, SessionState, ThoughtSource, ThoughtState};
/// One session's durable state, as stored in `session_registry.json`.
///
/// Fields tagged `#[serde(default)]` tolerate registry files written before
/// the field existed: missing keys deserialize to the type's default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedSession {
    // Stable identifier for the session.
    pub session_id: String,
    // Name of the backing tmux session — presumably used to reattach;
    // TODO(review): confirm against the tmux-management callers.
    pub tmux_name: String,
    // Lifecycle state at the time of persistence.
    pub state: SessionState,
    // Tool associated with the session, if one was recorded.
    pub tool: Option<String>,
    // Tokens consumed so far.
    pub token_count: u64,
    // Context-window limit the token count is measured against.
    pub context_limit: u64,
    // Latest thought text, if any.
    pub thought: Option<String>,
    #[serde(default)]
    pub thought_state: ThoughtState,
    #[serde(default)]
    pub thought_source: ThoughtSource,
    #[serde(default)]
    pub thought_updated_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub rest_state: RestState,
    #[serde(default)]
    pub commit_candidate: bool,
    // When the session's objective last changed, if known.
    #[serde(default)]
    pub objective_changed_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub last_skill: Option<String>,
    // Fingerprint used to detect objective changes — presumably a hash of
    // the objective text; TODO(review): confirm with the producer.
    #[serde(default)]
    pub objective_fingerprint: Option<String>,
    // Working directory of the session.
    pub cwd: String,
    // Timestamp of the last observed activity.
    pub last_activity_at: DateTime<Utc>,
}
/// Point-in-time snapshot of one session's thought, persisted in
/// `thoughts.json` keyed by session id (see `FileStore::save_thought`).
///
/// `#[serde(default)]` fields keep older snapshot files loadable after the
/// schema grew.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThoughtSnapshot {
    // Thought text at snapshot time, if any.
    pub thought: Option<String>,
    #[serde(default)]
    pub thought_state: ThoughtState,
    #[serde(default)]
    pub thought_source: ThoughtSource,
    #[serde(default)]
    pub rest_state: RestState,
    #[serde(default)]
    pub commit_candidate: bool,
    // When the objective last changed; preserved across saves when the
    // caller passes `None` (see `save_thought`).
    #[serde(default)]
    pub objective_changed_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub objective_fingerprint: Option<String>,
    // Token usage captured alongside the thought.
    pub token_count: u64,
    pub context_limit: u64,
    // When this snapshot was taken.
    pub updated_at: DateTime<Utc>,
    // Delivery bookkeeping, flattened into the snapshot's own JSON object
    // rather than nested under a "delivery" key.
    #[serde(flatten)]
    pub delivery: ThoughtDeliveryState,
}
/// JSON-file-backed persistence for sessions, thought snapshots, and the
/// thought runtime config.
///
/// Each collection is mirrored in an in-memory cache: reads are served from
/// the cache, writes update the cache and then rewrite the whole JSON file
/// atomically (temp file + rename). The `Mutex<()>` write locks serialize
/// concurrent disk writes per collection.
pub struct FileStore {
    // Directory holding all persistence files.
    base_dir: PathBuf,
    // In-memory mirror of `session_registry.json`.
    cache: RwLock<Vec<PersistedSession>>,
    // Serializes session-registry disk writes.
    session_write_lock: Mutex<()>,
    // In-memory mirror of `thoughts.json`, keyed by session id.
    thought_cache: RwLock<HashMap<String, ThoughtSnapshot>>,
    // In-memory mirror of `thought_config.json`.
    thought_config_cache: RwLock<ThoughtConfig>,
    // Serializes thought-snapshot disk writes.
    thought_write_lock: Mutex<()>,
}
impl FileStore {
    /// Creates the persistence directory if missing, loads any existing
    /// registry / thoughts / thought-config files from disk into the
    /// in-memory caches, and returns the initialized store.
    ///
    /// Directory creation runs on a blocking thread; corrupt or missing
    /// files are tolerated by the loaders (they fall back to empty/default).
    pub async fn new(base_dir: impl Into<PathBuf>) -> anyhow::Result<Arc<Self>> {
        let base_dir = base_dir.into();
        let dir = base_dir.clone();
        tokio::task::spawn_blocking(move || std::fs::create_dir_all(&dir))
            .await
            .map_err(|e| anyhow::anyhow!("spawn_blocking panicked: {e}"))?
            .map_err(|e| anyhow::anyhow!("failed to create persistence directory: {e}"))?;
        let store = Arc::new(Self {
            base_dir,
            cache: RwLock::new(Vec::new()),
            session_write_lock: Mutex::new(()),
            thought_cache: RwLock::new(HashMap::new()),
            thought_config_cache: RwLock::new(ThoughtConfig::default()),
            thought_write_lock: Mutex::new(()),
        });
        let loaded = store.load_sessions_from_disk().await;
        let loaded_thoughts = store.load_thoughts_from_disk().await;
        let loaded_thought_config = store.load_thought_config_from_disk().await;
        // Each cache is populated in its own scope so the write guard is
        // dropped before the next lock is taken.
        {
            let mut cache = store.cache.write().await;
            *cache = loaded;
        }
        {
            let mut thought_cache = store.thought_cache.write().await;
            *thought_cache = loaded_thoughts;
        }
        {
            let mut thought_config_cache = store.thought_config_cache.write().await;
            *thought_config_cache = loaded_thought_config;
        }
        info!(
            dir = %store.base_dir.display(),
            sessions = store.cache.read().await.len(),
            thoughts = store.thought_cache.read().await.len(),
            "persistence store initialized"
        );
        Ok(store)
    }

    /// Path of the session registry JSON file.
    fn registry_path(&self) -> PathBuf {
        self.base_dir.join("session_registry.json")
    }

    /// Path of the thought-snapshots JSON file.
    fn thoughts_path(&self) -> PathBuf {
        self.base_dir.join("thoughts.json")
    }

    /// Path of the thought runtime-config JSON file.
    fn thought_config_path(&self) -> PathBuf {
        self.base_dir.join("thought_config.json")
    }

    /// Replaces the in-memory session cache with `sessions` and rewrites the
    /// registry file atomically.
    ///
    /// Serialization or write failures are logged, not returned: the cache
    /// is already updated, so disk may lag behind memory on error.
    pub async fn save_sessions(&self, sessions: &[PersistedSession]) {
        // Ensures concurrent saves do not interleave their cache/disk updates.
        let _write_guard = self.session_write_lock.lock().await;
        {
            let mut cache = self.cache.write().await;
            *cache = sessions.to_vec();
        }
        let path = self.registry_path();
        let data = match serde_json::to_string_pretty(sessions) {
            Ok(d) => d,
            Err(e) => {
                error!("failed to serialize session registry: {e}");
                return;
            }
        };
        if let Err(e) = atomic_write_blocking(path, data).await {
            error!("failed to write session registry: {e}");
        } else {
            debug!(count = sessions.len(), "persisted session registry");
        }
    }

    /// Reads the session registry from disk. Missing, unreadable, or
    /// corrupt files all yield an empty list (logged at debug/warn).
    async fn load_sessions_from_disk(&self) -> Vec<PersistedSession> {
        let path = self.registry_path();
        match read_file_blocking(path).await {
            Ok(Some(data)) => match serde_json::from_str::<Vec<PersistedSession>>(&data) {
                Ok(sessions) => {
                    info!(count = sessions.len(), "loaded persisted session registry");
                    sessions
                }
                Err(e) => {
                    warn!("corrupt session registry, starting fresh: {e}");
                    Vec::new()
                }
            },
            Ok(None) => {
                debug!("no persisted session registry found");
                Vec::new()
            }
            Err(e) => {
                warn!("failed to read session registry: {e}");
                Vec::new()
            }
        }
    }

    /// Returns a copy of the cached session list (no disk access).
    pub async fn load_sessions(&self) -> Vec<PersistedSession> {
        self.cache.read().await.clone()
    }

    /// Inserts or replaces one session's thought snapshot in the cache and
    /// rewrites `thoughts.json` atomically.
    ///
    /// If the caller passes `None` for `objective_changed_at`, any value
    /// already stored for this session is preserved. Serialization happens
    /// while the cache write lock is held so the written JSON matches the
    /// cache state; failures are logged, not returned.
    pub async fn save_thought(
        &self,
        session_id: &str,
        thought: Option<&str>,
        token_count: u64,
        context_limit: u64,
        thought_state: ThoughtState,
        thought_source: ThoughtSource,
        rest_state: RestState,
        commit_candidate: bool,
        updated_at: DateTime<Utc>,
        delivery: ThoughtDeliveryState,
        objective_changed_at: Option<DateTime<Utc>>,
        objective_fingerprint: Option<String>,
    ) {
        let _write_guard = self.thought_write_lock.lock().await;
        let data = {
            let mut thoughts = self.thought_cache.write().await;
            // Keep the previously recorded change time unless the caller
            // supplies a new one.
            let objective_changed_at = objective_changed_at.or_else(|| {
                thoughts
                    .get(session_id)
                    .and_then(|existing| existing.objective_changed_at)
            });
            thoughts.insert(
                session_id.to_string(),
                ThoughtSnapshot {
                    thought: thought.map(|value| value.to_string()),
                    thought_state,
                    thought_source,
                    rest_state,
                    commit_candidate,
                    objective_changed_at,
                    objective_fingerprint,
                    token_count,
                    context_limit,
                    updated_at,
                    delivery,
                },
            );
            match serde_json::to_string_pretty(&*thoughts) {
                Ok(d) => d,
                Err(e) => {
                    error!("failed to serialize thoughts: {e}");
                    return;
                }
            }
        };
        let path = self.thoughts_path();
        if let Err(e) = atomic_write_blocking(path, data).await {
            error!("failed to write thoughts: {e}");
        } else {
            debug!(session_id, "persisted thought snapshot");
        }
    }

    /// Returns a copy of the cached thought snapshots (no disk access).
    pub async fn load_thoughts(&self) -> HashMap<String, ThoughtSnapshot> {
        self.thought_cache.read().await.clone()
    }

    /// Reads `thoughts.json` from disk. Missing, unreadable, or corrupt
    /// files all yield an empty map.
    async fn load_thoughts_from_disk(&self) -> HashMap<String, ThoughtSnapshot> {
        let path = self.thoughts_path();
        match read_file_blocking(path).await {
            Ok(Some(data)) => {
                match serde_json::from_str::<HashMap<String, ThoughtSnapshot>>(&data) {
                    Ok(thoughts) => thoughts,
                    Err(e) => {
                        warn!("corrupt thoughts file, starting fresh: {e}");
                        HashMap::new()
                    }
                }
            }
            Ok(None) => HashMap::new(),
            Err(e) => {
                warn!("failed to read thoughts: {e}");
                HashMap::new()
            }
        }
    }

    /// Validates, normalizes, and persists the thought runtime config.
    ///
    /// Unlike the session/thought save paths, failures are returned to the
    /// caller, and the cache is updated only AFTER the disk write succeeds —
    /// so cache and disk cannot diverge on error.
    pub async fn save_thought_config(&self, config: &ThoughtConfig) -> anyhow::Result<()> {
        let normalized = config
            .clone()
            .normalize_and_validate()
            .map_err(|e| anyhow::anyhow!("invalid thought config: {e}"))?;
        let path = self.thought_config_path();
        let data = serde_json::to_string_pretty(&normalized)
            .map_err(|e| anyhow::anyhow!("failed to serialize thought config: {e}"))?;
        atomic_write_blocking(path, data).await?;
        {
            let mut thought_config_cache = self.thought_config_cache.write().await;
            *thought_config_cache = normalized;
        }
        debug!("persisted thought runtime config");
        Ok(())
    }

    /// Returns a copy of the cached thought config (no disk access).
    pub async fn load_thought_config(&self) -> ThoughtConfig {
        self.thought_config_cache.read().await.clone()
    }

    /// Reads the thought config from disk, re-validating it. Any failure
    /// (missing, unreadable, corrupt, or invalid file) falls back to the
    /// default config with a warning.
    async fn load_thought_config_from_disk(&self) -> ThoughtConfig {
        let path = self.thought_config_path();
        match read_file_blocking(path).await {
            Ok(Some(data)) => match serde_json::from_str::<ThoughtConfig>(&data) {
                Ok(config) => match config.normalize_and_validate() {
                    Ok(config) => config,
                    Err(e) => {
                        warn!("invalid thought config file, using defaults: {e}");
                        ThoughtConfig::default()
                    }
                },
                Err(e) => {
                    warn!("corrupt thought config file, using defaults: {e}");
                    ThoughtConfig::default()
                }
            },
            Ok(None) => ThoughtConfig::default(),
            Err(e) => {
                warn!("failed to read thought config file, using defaults: {e}");
                ThoughtConfig::default()
            }
        }
    }
}
/// Atomically replaces `path` with `data`, doing the file I/O on a blocking
/// thread.
///
/// Writes to a uniquely named temp file beside the target, fsyncs the temp
/// file so its contents are durable, then renames it over `path` (rename is
/// atomic on POSIX filesystems). The temp file is removed on any failure.
///
/// # Errors
/// Returns an error if parent-directory creation, the temp write/sync, the
/// rename, or the blocking task itself fails.
async fn atomic_write_blocking(path: PathBuf, data: String) -> anyhow::Result<()> {
    tokio::task::spawn_blocking(move || {
        ensure_parent(&path).map_err(|e| anyhow::anyhow!("ensure parent failed: {e}"))?;
        // Unique suffix lets concurrent writers to the same target each use
        // their own temp file; the last rename wins.
        let tmp_path = path.with_extension(format!("json.tmp.{}", Uuid::new_v4()));
        let write_and_sync = || -> std::io::Result<()> {
            use std::io::Write;
            let mut file = std::fs::File::create(&tmp_path)?;
            file.write_all(data.as_bytes())?;
            // Without this fsync, a crash after the rename could leave a
            // truncated/empty file at `path`: the rename may become durable
            // before the data does, defeating the atomic-replace guarantee.
            file.sync_all()
        };
        if let Err(e) = write_and_sync() {
            let _ = std::fs::remove_file(&tmp_path);
            return Err(anyhow::anyhow!("write to tmp failed: {e}"));
        }
        if let Err(e) = std::fs::rename(&tmp_path, &path) {
            let _ = std::fs::remove_file(&tmp_path);
            return Err(anyhow::anyhow!("rename failed: {e}"));
        }
        Ok(())
    })
    .await
    .map_err(|e| anyhow::anyhow!("spawn_blocking panicked: {e}"))?
}
/// Reads `path` to a `String` on a blocking thread.
///
/// Returns `Ok(None)` when the file does not exist, `Ok(Some(contents))`
/// on success, and an error for any other I/O failure (or if the blocking
/// task panics).
async fn read_file_blocking(path: PathBuf) -> anyhow::Result<Option<String>> {
    tokio::task::spawn_blocking(move || {
        match std::fs::read_to_string(&path) {
            Ok(contents) => Ok(Some(contents)),
            Err(err) => {
                // A missing file is an expected, non-error outcome.
                if err.kind() == std::io::ErrorKind::NotFound {
                    Ok(None)
                } else {
                    Err(anyhow::anyhow!("read failed: {err}"))
                }
            }
        }
    })
    .await
    .map_err(|join_err| anyhow::anyhow!("spawn_blocking panicked: {join_err}"))?
}
/// Creates every missing ancestor directory of `path` so a subsequent write
/// of the file itself can succeed.
///
/// A bare filename has an empty parent, which `create_dir_all` treats as a
/// no-op, so this succeeds for relative filenames too.
///
/// # Errors
/// Propagates any I/O error from `std::fs::create_dir_all`.
// NOTE: the stale `#[allow(dead_code)]` was removed — this function is
// called by `atomic_write_blocking`, and the allow would have masked it
// becoming genuinely dead later.
fn ensure_parent(path: &Path) -> std::io::Result<()> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::atomic_write_blocking;

    /// Races many writers against one target path: every write must report
    /// success, and the surviving file must hold one complete payload (no
    /// torn/interleaved contents).
    #[tokio::test]
    async fn atomic_write_blocking_supports_concurrent_writes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let target = dir.path().join("session_registry.json");
        let handles: Vec<_> = (0..16)
            .map(|n| {
                let dest = target.clone();
                tokio::spawn(async move {
                    atomic_write_blocking(dest, format!("{{\"n\":{n}}}")).await
                })
            })
            .collect();
        for handle in handles {
            let result = handle.await.expect("join task");
            assert!(result.is_ok(), "concurrent write failed: {result:?}");
        }
        let contents = tokio::fs::read_to_string(&target)
            .await
            .expect("read persisted file");
        // Whichever writer won, the file is a complete single payload.
        assert!(contents.starts_with("{\"n\":"));
    }
}