use std::collections::BTreeMap;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::error::{atomic_write, Result, StoreError};
use crate::mailbox::Mailbox;
/// Identifies one project inside the store; a newtype over the key string.
///
/// The wrapped string is used verbatim as a directory name under `projects/`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectKey(String);
impl ProjectKey {
pub fn from_workspace(workspace: &Path) -> Self {
let canonical =
std::fs::canonicalize(workspace).unwrap_or_else(|_| workspace.to_path_buf());
let raw = canonical.to_string_lossy();
let folded: String = raw
.chars()
.map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
.collect();
ProjectKey(format!("{folded}-{:08x}", fnv1a64(raw.as_bytes()) as u32))
}
pub fn from_raw(key: impl Into<String>) -> Self {
ProjectKey(key.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
/// Address of a session within a project.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionLoc {
/// A top-level session, stored under `sessions/<session_id>`.
Root {
key: ProjectKey,
session_id: String,
},
/// A sub-session, stored under `sessions/<parent_id>/children/<child_id>`.
Child {
key: ProjectKey,
parent_id: String,
child_id: String,
},
}
/// Per-project index, persisted as `index.json` in the project directory.
///
/// Every field falls back to its default when absent from the stored JSON.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct ProjectIndex {
/// Not interpreted anywhere in this file; presumably a schema version — TODO confirm.
#[serde(default)]
pub version: u32,
/// One entry per root session; the writers in this file keep it sorted by `session_id`.
#[serde(default)]
pub roots: Vec<RootEntry>,
/// Maps a child id to the id of the parent session it is stored under.
#[serde(default)]
pub child_lookup: BTreeMap<String, String>,
}
/// Summary of one root session as recorded in [`ProjectIndex::roots`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RootEntry {
/// Id of the session; also the name of its directory under `sessions/`.
pub session_id: String,
pub title: String,
pub updated_at: DateTime<Utc>,
}
/// Per-parent index of child sessions, persisted as `children.json` in the parent's directory.
///
/// Every field falls back to its default when absent from the stored JSON.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct ChildrenIndex {
/// Not interpreted anywhere in this file; presumably a schema version — TODO confirm.
#[serde(default)]
pub version: u32,
/// One entry per child; the writers in this file keep it sorted by `child_id`.
#[serde(default)]
pub children: Vec<ChildEntry>,
}
/// Summary of one child session as recorded in [`ChildrenIndex::children`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ChildEntry {
/// Id of the child; also the name of its directory under the parent's `children/`.
pub child_id: String,
pub subagent_type: String,
pub status: ChildStatus,
pub title: String,
pub responsibility: String,
pub updated_at: DateTime<Utc>,
}
/// Status recorded for a child session.
///
/// Serialized in snake_case (`pending`, `running`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChildStatus {
Pending,
Running,
Idle,
Completed,
Error,
Cancelled,
}
/// Fields a [`MetaExtractor`] pulls from a root session payload.
///
/// Together with the session id they make up a [`RootEntry`].
#[derive(Debug, Clone, PartialEq)]
pub struct RootFields {
pub title: String,
pub updated_at: DateTime<Utc>,
}
/// Fields a [`MetaExtractor`] pulls from a child session payload.
///
/// Together with the child id they make up a [`ChildEntry`].
#[derive(Debug, Clone, PartialEq)]
pub struct ChildFields {
pub subagent_type: String,
pub status: ChildStatus,
pub title: String,
pub responsibility: String,
pub updated_at: DateTime<Utc>,
}
/// Turns stored session payloads into index fields.
///
/// `SubagentStore::rebuild_index` calls it once for each `session.json` it finds.
pub trait MetaExtractor: Sync {
/// Extracts index fields from the payload of the root session `session_id`.
fn root(&self, session_id: &str, payload: &serde_json::Value) -> RootFields;
/// Extracts index fields from the payload of the child session `child_id`.
fn child(&self, child_id: &str, payload: &serde_json::Value) -> ChildFields;
}
/// File-backed store for sessions, their child sessions, and the JSON indexes over them.
pub struct SubagentStore {
// Base directory; all project data lives under `<root>/projects/`.
root: PathBuf,
// Held by every index-mutating method (`upsert_*`, `remove_child`, `rebuild_index`)
// so their read-modify-write cycles do not interleave within this store instance.
write_lock: tokio::sync::Mutex<()>,
}
impl SubagentStore {
/// Creates a store rooted at `root`.
///
/// Only records the path and creates the writer lock; no file-system access happens here.
pub fn open(root: impl Into<PathBuf>) -> Self {
    let root = root.into();
    let write_lock = tokio::sync::Mutex::new(());
    Self { root, write_lock }
}
/// `<root>/projects/<key>`: the directory holding everything stored for one project.
fn project_dir(&self, key: &ProjectKey) -> PathBuf {
    let mut dir = self.root.join("projects");
    dir.push(key.as_str());
    dir
}
/// Path of the project-level index, `index.json` inside the project directory.
fn index_file(&self, key: &ProjectKey) -> PathBuf {
    let mut file = self.project_dir(key);
    file.push("index.json");
    file
}
/// Path of the `sessions` directory inside the project directory.
fn sessions_dir(&self, key: &ProjectKey) -> PathBuf {
    let mut dir = self.project_dir(key);
    dir.push("sessions");
    dir
}
/// Directory of the root session `parent_id`: `sessions/<parent_id>`.
fn parent_dir(&self, key: &ProjectKey, parent_id: &str) -> PathBuf {
    let mut dir = self.sessions_dir(key);
    dir.push(parent_id);
    dir
}
/// Path of `children.json`, the children index kept in the parent's directory.
fn children_index_file(&self, key: &ProjectKey, parent_id: &str) -> PathBuf {
    let mut file = self.parent_dir(key, parent_id);
    file.push("children.json");
    file
}
/// Directory of a child session: `<parent dir>/children/<child_id>`.
fn child_dir(&self, key: &ProjectKey, parent_id: &str, child_id: &str) -> PathBuf {
    let mut dir = self.parent_dir(key, parent_id);
    dir.push("children");
    dir.push(child_id);
    dir
}
/// Resolves a session location to its directory on disk.
fn session_dir(&self, loc: &SessionLoc) -> PathBuf {
    match loc {
        SessionLoc::Child {
            key,
            parent_id: parent,
            child_id: child,
        } => self.child_dir(key, parent, child),
        SessionLoc::Root {
            key,
            session_id: id,
        } => self.parent_dir(key, id),
    }
}
/// Path of the `session.json` payload file for `loc`.
fn session_file(&self, loc: &SessionLoc) -> PathBuf {
    let mut file = self.session_dir(loc);
    file.push("session.json");
    file
}
/// Returns the mailbox located in the `mailbox` directory of the session at `loc`.
pub fn mailbox(&self, loc: &SessionLoc) -> Mailbox {
    let mut dir = self.session_dir(loc);
    dir.push("mailbox");
    Mailbox::at(dir)
}
/// Serializes `payload` as pretty-printed JSON and writes it to the session file of `loc`.
///
/// # Errors
/// Returns the serialization error (reported through `StoreError::decode`) or
/// whatever `atomic_write` reports.
pub async fn save_session<T: Serialize>(&self, loc: &SessionLoc, payload: &T) -> Result<()> {
    let target = self.session_file(loc);
    match serde_json::to_vec_pretty(payload) {
        Ok(encoded) => atomic_write(&target, &encoded).await,
        Err(err) => Err(StoreError::decode(&target, err)),
    }
}
/// Reads the session file of `loc` and deserializes it into `T`.
///
/// # Errors
/// Any read failure, including a missing file, is an I/O error; invalid JSON
/// for `T` is a decode error.
pub async fn load_session<T: DeserializeOwned>(&self, loc: &SessionLoc) -> Result<T> {
    let source = self.session_file(loc);
    let raw = match tokio::fs::read(&source).await {
        Ok(raw) => raw,
        Err(err) => return Err(StoreError::io(&source, err)),
    };
    serde_json::from_slice(&raw).map_err(|err| StoreError::decode(&source, err))
}
/// Reports whether the session file for `loc` exists.
///
/// A failure of the existence check itself is reported as `false`.
pub async fn session_exists(&self, loc: &SessionLoc) -> bool {
    let file = self.session_file(loc);
    matches!(tokio::fs::try_exists(file).await, Ok(true))
}
/// Lists the root sessions recorded in the project index.
///
/// A missing index file yields an empty list.
pub async fn list_roots(&self, key: &ProjectKey) -> Result<Vec<RootEntry>> {
    let index_path = self.index_file(key);
    self.read_json::<ProjectIndex>(&index_path)
        .await
        .map(|index| index.roots)
}
/// Lists the children recorded in the children index of `parent_id`.
///
/// A missing children index yields an empty list.
pub async fn list_children(
    &self,
    key: &ProjectKey,
    parent_id: &str,
) -> Result<Vec<ChildEntry>> {
    let index_path = self.children_index_file(key, parent_id);
    self.read_json::<ChildrenIndex>(&index_path)
        .await
        .map(|index| index.children)
}
/// Looks up which parent `child_id` is stored under, using the project index.
///
/// Returns `Ok(None)` when the index has no lookup entry for the child (a
/// missing index file counts as an empty index).
pub async fn resolve_child(
    &self,
    key: &ProjectKey,
    child_id: &str,
) -> Result<Option<SessionLoc>> {
    let index_path = self.index_file(key);
    let index: ProjectIndex = self.read_json(&index_path).await?;
    let parent = index.child_lookup.get(child_id).cloned();
    Ok(parent.map(|parent_id| SessionLoc::Child {
        key: key.clone(),
        parent_id,
        child_id: child_id.to_string(),
    }))
}
/// Inserts `entry` into the project index, replacing any entry with the same
/// `session_id`, then rewrites the index with roots sorted by `session_id`.
///
/// Holds the store's write lock for the whole read-modify-write cycle.
pub async fn upsert_root(&self, key: &ProjectKey, entry: RootEntry) -> Result<()> {
    let _writer = self.write_lock.lock().await;
    let index_path = self.index_file(key);
    let mut index: ProjectIndex = self.read_json(&index_path).await?;
    let existing = index
        .roots
        .iter()
        .position(|root| root.session_id == entry.session_id);
    if let Some(at) = existing {
        index.roots[at] = entry;
    } else {
        index.roots.push(entry);
    }
    index
        .roots
        .sort_by(|lhs, rhs| lhs.session_id.cmp(&rhs.session_id));
    self.write_json(&index_path, &index).await
}
/// Records `entry` as a child of `parent_id`.
///
/// Two files are updated in order under the write lock: first the parent's
/// children index (entry replaced or appended, then sorted by `child_id`),
/// then the project index, whose `child_lookup` gains `child_id -> parent_id`.
/// If the second write fails the first has already been persisted.
pub async fn upsert_child(
    &self,
    key: &ProjectKey,
    parent_id: &str,
    entry: ChildEntry,
) -> Result<()> {
    let _writer = self.write_lock.lock().await;
    // Keep the id for the lookup update; the entry itself is moved into the index.
    let child_id = entry.child_id.clone();

    let children_path = self.children_index_file(key, parent_id);
    let mut children: ChildrenIndex = self.read_json(&children_path).await?;
    let existing = children
        .children
        .iter()
        .position(|child| child.child_id == child_id);
    if let Some(at) = existing {
        children.children[at] = entry;
    } else {
        children.children.push(entry);
    }
    children
        .children
        .sort_by(|lhs, rhs| lhs.child_id.cmp(&rhs.child_id));
    self.write_json(&children_path, &children).await?;

    let index_path = self.index_file(key);
    let mut index: ProjectIndex = self.read_json(&index_path).await?;
    index.child_lookup.insert(child_id, parent_id.to_string());
    self.write_json(&index_path, &index).await
}
/// Removes `child_id` from the children index of `parent_id` and from the
/// project index's `child_lookup`.
///
/// The lookup entry is dropped only when it actually maps the child to
/// `parent_id`. A call that names a former or wrong parent therefore cleans
/// that parent's listing without breaking resolution of a child that lives
/// under a different parent.
///
/// Both index files are rewritten under the write lock, children index first.
/// Session files on disk are not touched.
///
/// # Errors
/// Propagates read, decode and write failures of either index file.
pub async fn remove_child(
    &self,
    key: &ProjectKey,
    parent_id: &str,
    child_id: &str,
) -> Result<()> {
    let _guard = self.write_lock.lock().await;
    let cpath = self.children_index_file(key, parent_id);
    let mut cidx: ChildrenIndex = self.read_json(&cpath).await?;
    cidx.children.retain(|c| c.child_id != child_id);
    self.write_json(&cpath, &cidx).await?;
    let ipath = self.index_file(key);
    let mut idx: ProjectIndex = self.read_json(&ipath).await?;
    // Leave the lookup alone if it points at some other parent.
    let owned_by_parent =
        idx.child_lookup.get(child_id).map(String::as_str) == Some(parent_id);
    if owned_by_parent {
        idx.child_lookup.remove(child_id);
    }
    self.write_json(&ipath, &idx).await
}
/// Rebuilds the project index and every per-parent children index from the
/// session files on disk.
///
/// Each directory under `sessions/` is treated as a parent. Its
/// `session.json`, if present, yields a root entry via `extractor.root`. Each
/// directory under its `children/` that has a `session.json` yields a child
/// entry via `extractor.child` plus a `child_lookup` mapping. A
/// `children.json` is written for every parent directory, and finally the
/// project `index.json` is written. Entries are sorted by id. Non-directory
/// entries are skipped.
///
/// A missing `sessions/` directory produces an empty project index, and a
/// missing `children/` directory produces an empty children index.
///
/// # Errors
/// Any other failure to open or iterate a directory is returned as an I/O
/// error instead of being treated as "no children", so an unreadable
/// directory can no longer cause an existing children index to be silently
/// overwritten with an empty one. Read, decode and write failures of the
/// session and index files are propagated as well.
pub async fn rebuild_index(
    &self,
    key: &ProjectKey,
    extractor: &dyn MetaExtractor,
) -> Result<()> {
    let _guard = self.write_lock.lock().await;
    let sessions = self.sessions_dir(key);
    let mut idx = ProjectIndex::default();
    let mut parents = match tokio::fs::read_dir(&sessions).await {
        Ok(rd) => rd,
        // Nothing stored yet: persist an empty index.
        Err(e) if e.kind() == ErrorKind::NotFound => {
            return self.write_json(&self.index_file(key), &idx).await;
        }
        Err(e) => return Err(StoreError::io(&sessions, e)),
    };
    while let Some(p) = parents
        .next_entry()
        .await
        .map_err(|e| StoreError::io(&sessions, e))?
    {
        if !is_dir(&p).await {
            continue;
        }
        let parent_path = p.path();
        let parent_id = p.file_name().to_string_lossy().into_owned();
        if let Some(val) = self
            .try_read_value(&parent_path.join("session.json"))
            .await?
        {
            let rf = extractor.root(&parent_id, &val);
            idx.roots.push(RootEntry {
                session_id: parent_id.clone(),
                title: rf.title,
                updated_at: rf.updated_at,
            });
        }
        let mut cidx = ChildrenIndex::default();
        let cdir = parent_path.join("children");
        // Only a missing directory means "no children"; every other error is
        // surfaced, matching the handling of the sessions directory above.
        let kids = match tokio::fs::read_dir(&cdir).await {
            Ok(rd) => Some(rd),
            Err(e) if e.kind() == ErrorKind::NotFound => None,
            Err(e) => return Err(StoreError::io(&cdir, e)),
        };
        if let Some(mut kids) = kids {
            while let Some(c) = kids
                .next_entry()
                .await
                .map_err(|e| StoreError::io(&cdir, e))?
            {
                if !is_dir(&c).await {
                    continue;
                }
                let child_id = c.file_name().to_string_lossy().into_owned();
                if let Some(val) = self
                    .try_read_value(&c.path().join("session.json"))
                    .await?
                {
                    let cf = extractor.child(&child_id, &val);
                    cidx.children.push(ChildEntry {
                        child_id: child_id.clone(),
                        subagent_type: cf.subagent_type,
                        status: cf.status,
                        title: cf.title,
                        responsibility: cf.responsibility,
                        updated_at: cf.updated_at,
                    });
                    idx.child_lookup.insert(child_id, parent_id.clone());
                }
            }
        }
        cidx.children.sort_by(|a, b| a.child_id.cmp(&b.child_id));
        self.write_json(&self.children_index_file(key, &parent_id), &cidx)
            .await?;
    }
    idx.roots.sort_by(|a, b| a.session_id.cmp(&b.session_id));
    self.write_json(&self.index_file(key), &idx).await
}
/// Reads and decodes a JSON file, substituting `T::default()` when the file does not exist.
///
/// Other read failures become I/O errors; undecodable content becomes a decode error.
async fn read_json<T: DeserializeOwned + Default>(&self, path: &Path) -> Result<T> {
    let bytes = match tokio::fs::read(path).await {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == ErrorKind::NotFound => return Ok(T::default()),
        Err(err) => return Err(StoreError::io(path, err)),
    };
    serde_json::from_slice(&bytes).map_err(|err| StoreError::decode(path, err))
}
/// Reads a JSON file as an untyped value, returning `Ok(None)` when the file does not exist.
///
/// Other read failures become I/O errors; invalid JSON becomes a decode error.
async fn try_read_value(&self, path: &Path) -> Result<Option<serde_json::Value>> {
    match tokio::fs::read(path).await {
        Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
        Err(err) => Err(StoreError::io(path, err)),
        Ok(bytes) => serde_json::from_slice::<serde_json::Value>(&bytes)
            .map(Some)
            .map_err(|err| StoreError::decode(path, err)),
    }
}
/// Serializes `value` as pretty-printed JSON and hands the bytes to `atomic_write`.
///
/// A serialization failure is reported through `StoreError::decode`.
async fn write_json<T: Serialize>(&self, path: &Path, value: &T) -> Result<()> {
    match serde_json::to_vec_pretty(value) {
        Ok(encoded) => atomic_write(path, &encoded).await,
        Err(err) => Err(StoreError::decode(path, err)),
    }
}
}
/// Reports whether a directory entry is itself a directory.
///
/// If the file type cannot be determined the entry is treated as not a directory.
async fn is_dir(entry: &tokio::fs::DirEntry) -> bool {
    entry
        .file_type()
        .await
        .map(|kind| kind.is_dir())
        .unwrap_or(false)
}
/// 64-bit FNV-1a hash of `bytes`.
///
/// Starts from the offset basis and, for each byte, XORs it in and then
/// multiplies by the FNV prime with wrapping arithmetic.
fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.iter().fold(OFFSET_BASIS, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
use serde_json::json;
use tempfile::TempDir;
/// Fixed timestamp (2026-01-01T00:00:00Z) shared by the test fixtures.
fn ts() -> DateTime<Utc> {
    let midnight = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0);
    midnight.unwrap()
}
/// Project key used by the tests, built from a raw string.
fn key() -> ProjectKey {
    let raw = "proj";
    ProjectKey::from_raw(raw)
}
/// Opens a store in a fresh temporary directory.
///
/// The `TempDir` is returned so the caller keeps the directory alive for the
/// duration of the test.
fn store() -> (TempDir, SubagentStore) {
    let tmp = TempDir::new().unwrap();
    let opened = SubagentStore::open(tmp.path());
    (tmp, opened)
}
fn child_payload(title: &str, kind: &str, status: &str) -> serde_json::Value {
json!({
"title": title,
"subagent_type": kind,
"status": status,
"responsibility": format!("do {title}"),
"updated_at": ts().to_rfc3339(),
})
}
/// Test extractor that reads index fields straight from the payload's JSON keys.
struct Extract;

impl Extract {
    /// String stored under `field`, or an empty string when it is missing or not a string.
    fn text(payload: &serde_json::Value, field: &str) -> String {
        payload
            .get(field)
            .and_then(|value| value.as_str())
            .unwrap_or_default()
            .to_string()
    }
}

impl MetaExtractor for Extract {
    fn root(&self, _session_id: &str, payload: &serde_json::Value) -> RootFields {
        let title = Self::text(payload, "title");
        let updated_at = parse_ts(&payload["updated_at"]);
        RootFields { title, updated_at }
    }

    fn child(&self, _child_id: &str, payload: &serde_json::Value) -> ChildFields {
        // A missing or non-string status is parsed as "pending".
        let status = parse_status(payload["status"].as_str().unwrap_or("pending"));
        ChildFields {
            subagent_type: Self::text(payload, "subagent_type"),
            status,
            title: Self::text(payload, "title"),
            responsibility: Self::text(payload, "responsibility"),
            updated_at: parse_ts(&payload["updated_at"]),
        }
    }
}
fn parse_ts(v: &serde_json::Value) -> DateTime<Utc> {
v.as_str()
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|d| d.with_timezone(&Utc))
.unwrap_or_else(ts)
}
fn parse_status(s: &str) -> ChildStatus {
match s {
"running" => ChildStatus::Running,
"idle" => ChildStatus::Idle,
"completed" => ChildStatus::Completed,
"error" => ChildStatus::Error,
"cancelled" => ChildStatus::Cancelled,
_ => ChildStatus::Pending,
}
}
/// A saved root session exists afterwards and loads back unchanged.
#[tokio::test]
async fn session_round_trips() {
    let (_tmp, db) = store();
    let root = SessionLoc::Root {
        key: key(),
        session_id: "p1".into(),
    };
    let written = json!({"hello": "world", "n": 42});

    db.save_session(&root, &written).await.unwrap();

    assert!(db.session_exists(&root).await);
    let read_back: serde_json::Value = db.load_session(&root).await.unwrap();
    assert_eq!(read_back, written);
}
/// An upserted child is listed under its parent and resolvable by id; an
/// unknown id resolves to `None`.
#[tokio::test]
async fn upsert_list_and_resolve_child() {
    let (_tmp, db) = store();
    let project = key();
    let child = ChildEntry {
        child_id: "c1".into(),
        subagent_type: "researcher".into(),
        status: ChildStatus::Running,
        title: "t".into(),
        responsibility: "r".into(),
        updated_at: ts(),
    };
    db.upsert_child(&project, "p1", child.clone()).await.unwrap();

    let children = db.list_children(&project, "p1").await.unwrap();
    assert_eq!(children, vec![child]);

    let found = db.resolve_child(&project, "c1").await.unwrap();
    let expected = SessionLoc::Child {
        key: project.clone(),
        parent_id: "p1".into(),
        child_id: "c1".into(),
    };
    assert_eq!(found, Some(expected));

    let absent = db.resolve_child(&project, "missing").await.unwrap();
    assert_eq!(absent, None);
}
/// Upserting the same child id twice keeps one entry carrying the latest data.
#[tokio::test]
async fn upsert_replaces_in_place() {
    let (_tmp, db) = store();
    let project = key();
    let pending = ChildEntry {
        child_id: "c1".into(),
        subagent_type: "x".into(),
        status: ChildStatus::Pending,
        title: "t".into(),
        responsibility: "r".into(),
        updated_at: ts(),
    };
    let completed = ChildEntry {
        status: ChildStatus::Completed,
        ..pending.clone()
    };

    db.upsert_child(&project, "p1", pending).await.unwrap();
    db.upsert_child(&project, "p1", completed).await.unwrap();

    let children = db.list_children(&project, "p1").await.unwrap();
    assert_eq!(children.len(), 1);
    assert_eq!(children[0].status, ChildStatus::Completed);
}
/// Removing a child empties both the parent's listing and the lookup entry.
#[tokio::test]
async fn remove_child_clears_index_and_lookup() {
    let (_tmp, db) = store();
    let project = key();
    let child = ChildEntry {
        child_id: "c1".into(),
        subagent_type: "x".into(),
        status: ChildStatus::Pending,
        title: "t".into(),
        responsibility: "r".into(),
        updated_at: ts(),
    };
    db.upsert_child(&project, "p1", child).await.unwrap();

    db.remove_child(&project, "p1", "c1").await.unwrap();

    let remaining = db.list_children(&project, "p1").await.unwrap();
    assert!(remaining.is_empty());
    let resolved = db.resolve_child(&project, "c1").await.unwrap();
    assert_eq!(resolved, None);
}
/// Indexes rebuilt from the session files equal the ones maintained
/// incrementally through `upsert_root` / `upsert_child`.
#[tokio::test]
async fn rebuild_matches_incremental() {
    let (_tmp, db) = store();
    let project = key();

    // One parent: payload on disk plus its incremental index entry.
    let parent = SessionLoc::Root {
        key: project.clone(),
        session_id: "p1".into(),
    };
    let parent_payload = json!({"title": "Parent", "updated_at": ts().to_rfc3339()});
    db.save_session(&parent, &parent_payload).await.unwrap();
    let parent_entry = RootEntry {
        session_id: "p1".into(),
        title: "Parent".into(),
        updated_at: ts(),
    };
    db.upsert_root(&project, parent_entry).await.unwrap();

    // Two children, each saved and then indexed incrementally.
    for (cid, kind) in [("c1", "researcher"), ("c2", "coder")] {
        let child = SessionLoc::Child {
            key: project.clone(),
            parent_id: "p1".into(),
            child_id: cid.into(),
        };
        let payload = child_payload(cid, kind, "running");
        db.save_session(&child, &payload).await.unwrap();
        let entry = ChildEntry {
            child_id: cid.into(),
            subagent_type: kind.into(),
            status: ChildStatus::Running,
            title: cid.into(),
            responsibility: format!("do {cid}"),
            updated_at: ts(),
        };
        db.upsert_child(&project, "p1", entry).await.unwrap();
    }

    // Snapshot the incremental indexes, delete them, and rebuild from disk.
    let index_path = db.index_file(&project);
    let children_path = db.children_index_file(&project, "p1");
    let before_index: ProjectIndex = db.read_json(&index_path).await.unwrap();
    let before_children: ChildrenIndex = db.read_json(&children_path).await.unwrap();
    tokio::fs::remove_file(&index_path).await.unwrap();
    tokio::fs::remove_file(&children_path).await.unwrap();

    db.rebuild_index(&project, &Extract).await.unwrap();

    let after_index: ProjectIndex = db.read_json(&index_path).await.unwrap();
    let after_children: ChildrenIndex = db.read_json(&children_path).await.unwrap();
    assert_eq!(after_index, before_index);
    assert_eq!(after_children, before_children);
}
/// Two paths that fold to the same text get different keys, and the same path
/// always yields the same key.
#[test]
fn project_key_distinguishes_colliding_folds_and_is_deterministic() {
    let nested = Path::new("/nonexistent/a/b");
    let dashed = Path::new("/nonexistent/a-b");

    let first = ProjectKey::from_workspace(nested);
    let second = ProjectKey::from_workspace(dashed);
    assert_ne!(first, second);

    let again = ProjectKey::from_workspace(nested);
    assert_eq!(first, again);
}
/// Simulates an `upsert_child` interrupted after the children index was
/// written but before the project index was. The child is not resolvable until
/// `rebuild_index` restores the lookup.
#[tokio::test]
async fn rebuild_converges_after_partial_write() {
    let (_tmp, db) = store();
    let project = key();

    let child = SessionLoc::Child {
        key: project.clone(),
        parent_id: "p1".into(),
        child_id: "c1".into(),
    };
    let payload = child_payload("c1", "researcher", "running");
    db.save_session(&child, &payload).await.unwrap();

    // Write only the children index, leaving the project index absent.
    let half_written = ChildrenIndex {
        version: 0,
        children: vec![ChildEntry {
            child_id: "c1".into(),
            subagent_type: "researcher".into(),
            status: ChildStatus::Running,
            title: "c1".into(),
            responsibility: "do c1".into(),
            updated_at: ts(),
        }],
    };
    let children_path = db.children_index_file(&project, "p1");
    db.write_json(&children_path, &half_written).await.unwrap();

    let before = db.resolve_child(&project, "c1").await.unwrap();
    assert_eq!(before, None);

    db.rebuild_index(&project, &Extract).await.unwrap();

    let after = db.resolve_child(&project, "c1").await.unwrap();
    assert!(after.is_some());
}
#[tokio::test(flavor = "multi_thread")]
async fn concurrent_upserts_do_not_lose_children() {
use std::sync::Arc;
let dir = TempDir::new().unwrap();
let s = Arc::new(SubagentStore::open(dir.path()));
let k = key();
let mut handles = Vec::new();
for i in 0..16 {
let s = s.clone();
let k = k.clone();
handles.push(tokio::spawn(async move {
let child_id = format!("c{i}");
let entry = ChildEntry {
child_id: child_id.clone(),
subagent_type: "coder".into(),
status: ChildStatus::Pending,
title: child_id.clone(),
responsibility: format!("do {child_id}"),
updated_at: ts(),
};
s.upsert_child(&k, "p1", entry).await.unwrap();
}));
}
for h in handles {
h.await.unwrap();
}
let listed = s.list_children(&k, "p1").await.unwrap();
assert_eq!(
listed.len(),
16,
"all 16 children must survive concurrent upserts"
);
let resolved = s.resolve_child(&k, "c7").await.unwrap();
assert!(
resolved.is_some(),
"resolve_child(\"c7\") must hit after concurrent upserts"
);
}
}