1use std::collections::BTreeMap;
26use std::io::ErrorKind;
27use std::path::{Path, PathBuf};
28
29use chrono::{DateTime, Utc};
30use serde::de::DeserializeOwned;
31use serde::{Deserialize, Serialize};
32
33use crate::error::{atomic_write, Result, StoreError};
34use crate::mailbox::Mailbox;
35
36#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct ProjectKey(String);
39
40impl ProjectKey {
41 pub fn from_workspace(workspace: &Path) -> Self {
48 let canonical =
49 std::fs::canonicalize(workspace).unwrap_or_else(|_| workspace.to_path_buf());
50 let raw = canonical.to_string_lossy();
51 let folded: String = raw
52 .chars()
53 .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
54 .collect();
55 ProjectKey(format!("{folded}-{:08x}", fnv1a64(raw.as_bytes()) as u32))
56 }
57
58 pub fn from_raw(key: impl Into<String>) -> Self {
60 ProjectKey(key.into())
61 }
62
63 pub fn as_str(&self) -> &str {
64 &self.0
65 }
66}
67
/// Location of a session inside the store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionLoc {
    /// A top-level session, stored under `sessions/<session_id>/` of the project.
    Root {
        key: ProjectKey,
        session_id: String,
    },
    /// A child session, stored under `sessions/<parent_id>/children/<child_id>/`.
    Child {
        key: ProjectKey,
        parent_id: String,
        child_id: String,
    },
}
81
/// Contents of a project's `index.json`.
///
/// Every field falls back to its default when absent from the file.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct ProjectIndex {
    // NOTE(review): presumably a schema version; nothing in this file reads or bumps it.
    #[serde(default)]
    pub version: u32,
    /// Summaries of the project's root sessions.
    #[serde(default)]
    pub roots: Vec<RootEntry>,
    /// Maps a child id to the id of the parent session it is stored under.
    #[serde(default)]
    pub child_lookup: BTreeMap<String, String>,
}
92
/// Summary of one root session as recorded in [`ProjectIndex::roots`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RootEntry {
    pub session_id: String,
    pub title: String,
    pub updated_at: DateTime<Utc>,
}
99
/// Contents of a parent session's `children.json`.
///
/// Every field falls back to its default when absent from the file.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct ChildrenIndex {
    // NOTE(review): presumably a schema version; nothing in this file reads or bumps it.
    #[serde(default)]
    pub version: u32,
    /// Summaries of the children stored under the parent.
    #[serde(default)]
    pub children: Vec<ChildEntry>,
}
108
/// Summary of one child session as recorded in [`ChildrenIndex::children`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ChildEntry {
    pub child_id: String,
    pub subagent_type: String,
    pub status: ChildStatus,
    pub title: String,
    pub responsibility: String,
    pub updated_at: DateTime<Utc>,
}
118
/// Status recorded for a child session.
///
/// Serialized using snake_case variant names (e.g. `"running"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChildStatus {
    Pending,
    Running,
    Idle,
    Completed,
    Error,
    Cancelled,
}
129
/// Index fields a [`MetaExtractor`] derives from a root session payload.
#[derive(Debug, Clone, PartialEq)]
pub struct RootFields {
    pub title: String,
    pub updated_at: DateTime<Utc>,
}
136
/// Index fields a [`MetaExtractor`] derives from a child session payload.
#[derive(Debug, Clone, PartialEq)]
pub struct ChildFields {
    pub subagent_type: String,
    pub status: ChildStatus,
    pub title: String,
    pub responsibility: String,
    pub updated_at: DateTime<Utc>,
}
146
/// Turns stored session payloads into the fields kept in the indexes.
///
/// [`SubagentStore::rebuild_index`] calls this for every `session.json` it reads, passing
/// the payload as an untyped JSON value.
pub trait MetaExtractor: Sync {
    /// Produces the root-index fields for session `session_id` from its payload.
    fn root(&self, session_id: &str, payload: &serde_json::Value) -> RootFields;
    /// Produces the children-index fields for child `child_id` from its payload.
    fn child(&self, child_id: &str, payload: &serde_json::Value) -> ChildFields;
}
153
/// File-backed store for sessions, their children, and the indexes describing them.
pub struct SubagentStore {
    /// Directory under which all project data is kept.
    root: PathBuf,
    /// Held by the index-mutating methods (`upsert_root`, `upsert_child`, `remove_child`,
    /// `rebuild_index`) for the duration of their read-modify-write cycle.
    write_lock: tokio::sync::Mutex<()>,
}
160
161impl SubagentStore {
162 pub fn open(root: impl Into<PathBuf>) -> Self {
163 Self {
164 root: root.into(),
165 write_lock: tokio::sync::Mutex::new(()),
166 }
167 }
168
169 fn project_dir(&self, key: &ProjectKey) -> PathBuf {
172 self.root.join("projects").join(key.as_str())
173 }
174 fn index_file(&self, key: &ProjectKey) -> PathBuf {
175 self.project_dir(key).join("index.json")
176 }
177 fn sessions_dir(&self, key: &ProjectKey) -> PathBuf {
178 self.project_dir(key).join("sessions")
179 }
180 fn parent_dir(&self, key: &ProjectKey, parent_id: &str) -> PathBuf {
181 self.sessions_dir(key).join(parent_id)
182 }
183 fn children_index_file(&self, key: &ProjectKey, parent_id: &str) -> PathBuf {
184 self.parent_dir(key, parent_id).join("children.json")
185 }
186 fn child_dir(&self, key: &ProjectKey, parent_id: &str, child_id: &str) -> PathBuf {
187 self.parent_dir(key, parent_id)
188 .join("children")
189 .join(child_id)
190 }
191 fn session_dir(&self, loc: &SessionLoc) -> PathBuf {
192 match loc {
193 SessionLoc::Root { key, session_id } => self.parent_dir(key, session_id),
194 SessionLoc::Child {
195 key,
196 parent_id,
197 child_id,
198 } => self.child_dir(key, parent_id, child_id),
199 }
200 }
201 fn session_file(&self, loc: &SessionLoc) -> PathBuf {
202 self.session_dir(loc).join("session.json")
203 }
204
205 pub fn mailbox(&self, loc: &SessionLoc) -> Mailbox {
207 Mailbox::at(self.session_dir(loc).join("mailbox"))
208 }
209
210 pub async fn save_session<T: Serialize>(&self, loc: &SessionLoc, payload: &T) -> Result<()> {
213 let path = self.session_file(loc);
214 let bytes = serde_json::to_vec_pretty(payload).map_err(|e| StoreError::decode(&path, e))?;
215 atomic_write(&path, &bytes).await
216 }
217
218 pub async fn load_session<T: DeserializeOwned>(&self, loc: &SessionLoc) -> Result<T> {
219 let path = self.session_file(loc);
220 let bytes = tokio::fs::read(&path)
221 .await
222 .map_err(|e| StoreError::io(&path, e))?;
223 serde_json::from_slice(&bytes).map_err(|e| StoreError::decode(&path, e))
224 }
225
226 pub async fn session_exists(&self, loc: &SessionLoc) -> bool {
227 tokio::fs::try_exists(self.session_file(loc))
228 .await
229 .unwrap_or(false)
230 }
231
232 pub async fn list_roots(&self, key: &ProjectKey) -> Result<Vec<RootEntry>> {
235 let idx: ProjectIndex = self.read_json(&self.index_file(key)).await?;
236 Ok(idx.roots)
237 }
238
239 pub async fn list_children(
240 &self,
241 key: &ProjectKey,
242 parent_id: &str,
243 ) -> Result<Vec<ChildEntry>> {
244 let idx: ChildrenIndex = self
245 .read_json(&self.children_index_file(key, parent_id))
246 .await?;
247 Ok(idx.children)
248 }
249
250 pub async fn resolve_child(
252 &self,
253 key: &ProjectKey,
254 child_id: &str,
255 ) -> Result<Option<SessionLoc>> {
256 let idx: ProjectIndex = self.read_json(&self.index_file(key)).await?;
257 Ok(idx
258 .child_lookup
259 .get(child_id)
260 .map(|parent_id| SessionLoc::Child {
261 key: key.clone(),
262 parent_id: parent_id.clone(),
263 child_id: child_id.to_string(),
264 }))
265 }
266
267 pub async fn upsert_root(&self, key: &ProjectKey, entry: RootEntry) -> Result<()> {
270 let _guard = self.write_lock.lock().await;
271 let path = self.index_file(key);
272 let mut idx: ProjectIndex = self.read_json(&path).await?;
273 match idx
274 .roots
275 .iter_mut()
276 .find(|r| r.session_id == entry.session_id)
277 {
278 Some(slot) => *slot = entry,
279 None => idx.roots.push(entry),
280 }
281 idx.roots.sort_by(|a, b| a.session_id.cmp(&b.session_id));
282 self.write_json(&path, &idx).await
283 }
284
285 pub async fn upsert_child(
286 &self,
287 key: &ProjectKey,
288 parent_id: &str,
289 entry: ChildEntry,
290 ) -> Result<()> {
291 let _guard = self.write_lock.lock().await;
292 let cpath = self.children_index_file(key, parent_id);
294 let mut cidx: ChildrenIndex = self.read_json(&cpath).await?;
295 match cidx
296 .children
297 .iter_mut()
298 .find(|c| c.child_id == entry.child_id)
299 {
300 Some(slot) => *slot = entry.clone(),
301 None => cidx.children.push(entry.clone()),
302 }
303 cidx.children.sort_by(|a, b| a.child_id.cmp(&b.child_id));
304 self.write_json(&cpath, &cidx).await?;
305
306 let ipath = self.index_file(key);
308 let mut idx: ProjectIndex = self.read_json(&ipath).await?;
309 idx.child_lookup
310 .insert(entry.child_id, parent_id.to_string());
311 self.write_json(&ipath, &idx).await
312 }
313
314 pub async fn remove_child(
315 &self,
316 key: &ProjectKey,
317 parent_id: &str,
318 child_id: &str,
319 ) -> Result<()> {
320 let _guard = self.write_lock.lock().await;
321 let cpath = self.children_index_file(key, parent_id);
322 let mut cidx: ChildrenIndex = self.read_json(&cpath).await?;
323 cidx.children.retain(|c| c.child_id != child_id);
324 self.write_json(&cpath, &cidx).await?;
325
326 let ipath = self.index_file(key);
327 let mut idx: ProjectIndex = self.read_json(&ipath).await?;
328 idx.child_lookup.remove(child_id);
329 self.write_json(&ipath, &idx).await
330 }
331
332 pub async fn rebuild_index(
337 &self,
338 key: &ProjectKey,
339 extractor: &dyn MetaExtractor,
340 ) -> Result<()> {
341 let _guard = self.write_lock.lock().await;
342 let sessions = self.sessions_dir(key);
343 let mut idx = ProjectIndex::default();
344
345 let mut parents = match tokio::fs::read_dir(&sessions).await {
346 Ok(rd) => rd,
347 Err(e) if e.kind() == ErrorKind::NotFound => {
348 return self.write_json(&self.index_file(key), &idx).await;
350 }
351 Err(e) => return Err(StoreError::io(&sessions, e)),
352 };
353
354 while let Some(p) = parents
355 .next_entry()
356 .await
357 .map_err(|e| StoreError::io(&sessions, e))?
358 {
359 if !is_dir(&p).await {
360 continue;
361 }
362 let parent_id = p.file_name().to_string_lossy().into_owned();
363
364 if let Some(val) = self.try_read_value(&p.path().join("session.json")).await? {
365 let rf = extractor.root(&parent_id, &val);
366 idx.roots.push(RootEntry {
367 session_id: parent_id.clone(),
368 title: rf.title,
369 updated_at: rf.updated_at,
370 });
371 }
372
373 let mut cidx = ChildrenIndex::default();
375 let cdir = p.path().join("children");
376 if let Ok(mut kids) = tokio::fs::read_dir(&cdir).await {
377 while let Some(c) = kids
378 .next_entry()
379 .await
380 .map_err(|e| StoreError::io(&cdir, e))?
381 {
382 if !is_dir(&c).await {
383 continue;
384 }
385 let child_id = c.file_name().to_string_lossy().into_owned();
386 if let Some(val) = self.try_read_value(&c.path().join("session.json")).await? {
387 let cf = extractor.child(&child_id, &val);
388 cidx.children.push(ChildEntry {
389 child_id: child_id.clone(),
390 subagent_type: cf.subagent_type,
391 status: cf.status,
392 title: cf.title,
393 responsibility: cf.responsibility,
394 updated_at: cf.updated_at,
395 });
396 idx.child_lookup.insert(child_id, parent_id.clone());
397 }
398 }
399 }
400 cidx.children.sort_by(|a, b| a.child_id.cmp(&b.child_id));
401 self.write_json(&self.children_index_file(key, &parent_id), &cidx)
402 .await?;
403 }
404
405 idx.roots.sort_by(|a, b| a.session_id.cmp(&b.session_id));
406 self.write_json(&self.index_file(key), &idx).await
407 }
408
409 async fn read_json<T: DeserializeOwned + Default>(&self, path: &Path) -> Result<T> {
412 match tokio::fs::read(path).await {
413 Ok(bytes) => serde_json::from_slice(&bytes).map_err(|e| StoreError::decode(path, e)),
414 Err(e) if e.kind() == ErrorKind::NotFound => Ok(T::default()),
415 Err(e) => Err(StoreError::io(path, e)),
416 }
417 }
418
419 async fn try_read_value(&self, path: &Path) -> Result<Option<serde_json::Value>> {
420 match tokio::fs::read(path).await {
421 Ok(bytes) => {
422 let v = serde_json::from_slice(&bytes).map_err(|e| StoreError::decode(path, e))?;
423 Ok(Some(v))
424 }
425 Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
426 Err(e) => Err(StoreError::io(path, e)),
427 }
428 }
429
430 async fn write_json<T: Serialize>(&self, path: &Path, value: &T) -> Result<()> {
431 let bytes = serde_json::to_vec_pretty(value).map_err(|e| StoreError::decode(path, e))?;
432 atomic_write(path, &bytes).await
433 }
434}
435
436async fn is_dir(entry: &tokio::fs::DirEntry) -> bool {
437 match entry.file_type().await {
438 Ok(ft) => ft.is_dir(),
439 Err(_) => false,
440 }
441}
442
/// Computes the 64-bit FNV-1a hash of `bytes`: starting from the offset basis, each byte is
/// XORed into the state, which is then multiplied (wrapping) by the FNV prime.
fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
452
453#[cfg(test)]
454mod tests {
455 use super::*;
456 use chrono::TimeZone;
457 use serde_json::json;
458 use tempfile::TempDir;
459
460 fn ts() -> DateTime<Utc> {
461 Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap()
462 }
463
464 fn key() -> ProjectKey {
465 ProjectKey::from_raw("proj")
466 }
467
468 fn store() -> (TempDir, SubagentStore) {
469 let dir = TempDir::new().unwrap();
470 let store = SubagentStore::open(dir.path());
471 (dir, store)
472 }
473
474 fn child_payload(title: &str, kind: &str, status: &str) -> serde_json::Value {
475 json!({
476 "title": title,
477 "subagent_type": kind,
478 "status": status,
479 "responsibility": format!("do {title}"),
480 "updated_at": ts().to_rfc3339(),
481 })
482 }
483
484 struct Extract;
486 impl MetaExtractor for Extract {
487 fn root(&self, _id: &str, p: &serde_json::Value) -> RootFields {
488 RootFields {
489 title: p["title"].as_str().unwrap_or_default().to_string(),
490 updated_at: parse_ts(&p["updated_at"]),
491 }
492 }
493 fn child(&self, _id: &str, p: &serde_json::Value) -> ChildFields {
494 ChildFields {
495 subagent_type: p["subagent_type"].as_str().unwrap_or_default().to_string(),
496 status: parse_status(p["status"].as_str().unwrap_or("pending")),
497 title: p["title"].as_str().unwrap_or_default().to_string(),
498 responsibility: p["responsibility"].as_str().unwrap_or_default().to_string(),
499 updated_at: parse_ts(&p["updated_at"]),
500 }
501 }
502 }
503 fn parse_ts(v: &serde_json::Value) -> DateTime<Utc> {
504 v.as_str()
505 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
506 .map(|d| d.with_timezone(&Utc))
507 .unwrap_or_else(ts)
508 }
509 fn parse_status(s: &str) -> ChildStatus {
510 match s {
511 "running" => ChildStatus::Running,
512 "idle" => ChildStatus::Idle,
513 "completed" => ChildStatus::Completed,
514 "error" => ChildStatus::Error,
515 "cancelled" => ChildStatus::Cancelled,
516 _ => ChildStatus::Pending,
517 }
518 }
519
520 #[tokio::test]
521 async fn session_round_trips() {
522 let (_d, s) = store();
523 let loc = SessionLoc::Root {
524 key: key(),
525 session_id: "p1".into(),
526 };
527 let payload = json!({"hello": "world", "n": 42});
528 s.save_session(&loc, &payload).await.unwrap();
529 assert!(s.session_exists(&loc).await);
530 let got: serde_json::Value = s.load_session(&loc).await.unwrap();
531 assert_eq!(got, payload);
532 }
533
534 #[tokio::test]
535 async fn upsert_list_and_resolve_child() {
536 let (_d, s) = store();
537 let k = key();
538 let entry = ChildEntry {
539 child_id: "c1".into(),
540 subagent_type: "researcher".into(),
541 status: ChildStatus::Running,
542 title: "t".into(),
543 responsibility: "r".into(),
544 updated_at: ts(),
545 };
546 s.upsert_child(&k, "p1", entry.clone()).await.unwrap();
547
548 let listed = s.list_children(&k, "p1").await.unwrap();
549 assert_eq!(listed, vec![entry]);
550
551 let loc = s.resolve_child(&k, "c1").await.unwrap();
552 assert_eq!(
553 loc,
554 Some(SessionLoc::Child {
555 key: k.clone(),
556 parent_id: "p1".into(),
557 child_id: "c1".into(),
558 })
559 );
560 assert_eq!(s.resolve_child(&k, "missing").await.unwrap(), None);
561 }
562
563 #[tokio::test]
564 async fn upsert_replaces_in_place() {
565 let (_d, s) = store();
566 let k = key();
567 let mut e = ChildEntry {
568 child_id: "c1".into(),
569 subagent_type: "x".into(),
570 status: ChildStatus::Pending,
571 title: "t".into(),
572 responsibility: "r".into(),
573 updated_at: ts(),
574 };
575 s.upsert_child(&k, "p1", e.clone()).await.unwrap();
576 e.status = ChildStatus::Completed;
577 s.upsert_child(&k, "p1", e.clone()).await.unwrap();
578 let listed = s.list_children(&k, "p1").await.unwrap();
579 assert_eq!(listed.len(), 1);
580 assert_eq!(listed[0].status, ChildStatus::Completed);
581 }
582
583 #[tokio::test]
584 async fn remove_child_clears_index_and_lookup() {
585 let (_d, s) = store();
586 let k = key();
587 let e = ChildEntry {
588 child_id: "c1".into(),
589 subagent_type: "x".into(),
590 status: ChildStatus::Pending,
591 title: "t".into(),
592 responsibility: "r".into(),
593 updated_at: ts(),
594 };
595 s.upsert_child(&k, "p1", e).await.unwrap();
596 s.remove_child(&k, "p1", "c1").await.unwrap();
597 assert!(s.list_children(&k, "p1").await.unwrap().is_empty());
598 assert_eq!(s.resolve_child(&k, "c1").await.unwrap(), None);
599 }
600
601 #[tokio::test]
602 async fn rebuild_matches_incremental() {
603 let (_d, s) = store();
604 let k = key();
605
606 let root = SessionLoc::Root {
608 key: k.clone(),
609 session_id: "p1".into(),
610 };
611 s.save_session(
612 &root,
613 &json!({"title": "Parent", "updated_at": ts().to_rfc3339()}),
614 )
615 .await
616 .unwrap();
617 s.upsert_root(
618 &k,
619 RootEntry {
620 session_id: "p1".into(),
621 title: "Parent".into(),
622 updated_at: ts(),
623 },
624 )
625 .await
626 .unwrap();
627
628 for (cid, kind) in [("c1", "researcher"), ("c2", "coder")] {
629 let loc = SessionLoc::Child {
630 key: k.clone(),
631 parent_id: "p1".into(),
632 child_id: cid.into(),
633 };
634 s.save_session(&loc, &child_payload(cid, kind, "running"))
635 .await
636 .unwrap();
637 s.upsert_child(
638 &k,
639 "p1",
640 ChildEntry {
641 child_id: cid.into(),
642 subagent_type: kind.into(),
643 status: ChildStatus::Running,
644 title: cid.into(),
645 responsibility: format!("do {cid}"),
646 updated_at: ts(),
647 },
648 )
649 .await
650 .unwrap();
651 }
652
653 let index_path = s.index_file(&k);
654 let children_path = s.children_index_file(&k, "p1");
655 let before_index: ProjectIndex = s.read_json(&index_path).await.unwrap();
656 let before_children: ChildrenIndex = s.read_json(&children_path).await.unwrap();
657
658 tokio::fs::remove_file(&index_path).await.unwrap();
660 tokio::fs::remove_file(&children_path).await.unwrap();
661 s.rebuild_index(&k, &Extract).await.unwrap();
662
663 let after_index: ProjectIndex = s.read_json(&index_path).await.unwrap();
664 let after_children: ChildrenIndex = s.read_json(&children_path).await.unwrap();
665 assert_eq!(after_index, before_index);
666 assert_eq!(after_children, before_children);
667 }
668
669 #[test]
670 fn project_key_distinguishes_colliding_folds_and_is_deterministic() {
671 let k1 = ProjectKey::from_workspace(Path::new("/nonexistent/a/b"));
673 let k2 = ProjectKey::from_workspace(Path::new("/nonexistent/a-b"));
674 assert_ne!(k1, k2);
675 assert_eq!(
677 k1,
678 ProjectKey::from_workspace(Path::new("/nonexistent/a/b"))
679 );
680 }
681
682 #[tokio::test]
683 async fn rebuild_converges_after_partial_write() {
684 let (_d, s) = store();
685 let k = key();
686 let loc = SessionLoc::Child {
687 key: k.clone(),
688 parent_id: "p1".into(),
689 child_id: "c1".into(),
690 };
691 s.save_session(&loc, &child_payload("c1", "researcher", "running"))
693 .await
694 .unwrap();
695 s.write_json(
696 &s.children_index_file(&k, "p1"),
697 &ChildrenIndex {
698 version: 0,
699 children: vec![ChildEntry {
700 child_id: "c1".into(),
701 subagent_type: "researcher".into(),
702 status: ChildStatus::Running,
703 title: "c1".into(),
704 responsibility: "do c1".into(),
705 updated_at: ts(),
706 }],
707 },
708 )
709 .await
710 .unwrap();
711 assert_eq!(s.resolve_child(&k, "c1").await.unwrap(), None);
713
714 s.rebuild_index(&k, &Extract).await.unwrap();
715 assert!(s.resolve_child(&k, "c1").await.unwrap().is_some());
717 }
718
719 #[tokio::test(flavor = "multi_thread")]
720 async fn concurrent_upserts_do_not_lose_children() {
721 use std::sync::Arc;
722
723 let dir = TempDir::new().unwrap();
724 let s = Arc::new(SubagentStore::open(dir.path()));
725 let k = key();
726
727 let mut handles = Vec::new();
728 for i in 0..16 {
729 let s = s.clone();
730 let k = k.clone();
731 handles.push(tokio::spawn(async move {
732 let child_id = format!("c{i}");
733 let entry = ChildEntry {
734 child_id: child_id.clone(),
735 subagent_type: "coder".into(),
736 status: ChildStatus::Pending,
737 title: child_id.clone(),
738 responsibility: format!("do {child_id}"),
739 updated_at: ts(),
740 };
741 s.upsert_child(&k, "p1", entry).await.unwrap();
742 }));
743 }
744 for h in handles {
745 h.await.unwrap();
746 }
747
748 let listed = s.list_children(&k, "p1").await.unwrap();
749 assert_eq!(
750 listed.len(),
751 16,
752 "all 16 children must survive concurrent upserts"
753 );
754
755 let resolved = s.resolve_child(&k, "c7").await.unwrap();
756 assert!(
757 resolved.is_some(),
758 "resolve_child(\"c7\") must hit after concurrent upserts"
759 );
760 }
761}