Skip to main content

bamboo_subagent/
store.rs

1//! Project-keyed session store + denormalized indices.
2//!
3//! Layout (see design §5.2):
4//! ```text
5//! <root>/projects/<key>/
6//!   index.json                         ProjectIndex(roots + child_lookup)
7//!   sessions/<parent-id>/
8//!     session.json                     opaque session payload (authoritative)
9//!     children.json                    ChildrenIndex (denormalized cache)
10//!     mailbox/{new,cur,corrupt}/       see `mailbox`
11//!     children/<child-id>/
12//!       session.json                   opaque session payload (authoritative, isolated)
13//!       mailbox/{new,cur,corrupt}/
14//! ```
15//!
16//! Invariants:
17//! - `session.json` is authoritative; `index.json` / `children.json` are caches, fully
18//!   rebuildable via [`SubagentStore::rebuild_index`].
19//! - Index mutations are serialized within this process by an internal `tokio::sync::Mutex`
20//!   (`write_lock`), so concurrent `upsert_*` / `remove_*` / `rebuild_index` calls cannot
21//!   silently overwrite each other. Cross-process safety still relies on the registry being
22//!   the sole writer (no file-level locking).
23//! - Every write is atomic (temp + rename). Aggregates are kept sorted by id for determinism.
24
25use std::collections::BTreeMap;
26use std::io::ErrorKind;
27use std::path::{Path, PathBuf};
28
29use chrono::{DateTime, Utc};
30use serde::de::DeserializeOwned;
31use serde::{Deserialize, Serialize};
32
33use crate::error::{atomic_write, Result, StoreError};
34use crate::mailbox::Mailbox;
35
36/// Stable encoding of a project's workspace path (mirrors `~/.claude/projects/<key>`).
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct ProjectKey(String);
39
40impl ProjectKey {
41    /// Derive a filesystem-safe key from a workspace path. Deterministic.
42    ///
43    /// The path is canonicalized when possible (so symlinks/relative spellings
44    /// of the same workspace agree), then folded to a readable slug PLUS a
45    /// short hash of the exact path — the hash disambiguates paths whose
46    /// folded forms collide (`/a/b` vs `/a-b`).
47    pub fn from_workspace(workspace: &Path) -> Self {
48        let canonical =
49            std::fs::canonicalize(workspace).unwrap_or_else(|_| workspace.to_path_buf());
50        let raw = canonical.to_string_lossy();
51        let folded: String = raw
52            .chars()
53            .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
54            .collect();
55        ProjectKey(format!("{folded}-{:08x}", fnv1a64(raw.as_bytes()) as u32))
56    }
57
58    /// Wrap an already-computed key verbatim.
59    pub fn from_raw(key: impl Into<String>) -> Self {
60        ProjectKey(key.into())
61    }
62
63    pub fn as_str(&self) -> &str {
64        &self.0
65    }
66}
67
68/// Logical location of a session: a root (parent) or a child under a parent.
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub enum SessionLoc {
71    Root {
72        key: ProjectKey,
73        session_id: String,
74    },
75    Child {
76        key: ProjectKey,
77        parent_id: String,
78        child_id: String,
79    },
80}
81
82/// Project index: all root sessions + an `child_id -> parent_id` lookup table (O(1) resolve).
83#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
84pub struct ProjectIndex {
85    #[serde(default)]
86    pub version: u32,
87    #[serde(default)]
88    pub roots: Vec<RootEntry>,
89    #[serde(default)]
90    pub child_lookup: BTreeMap<String, String>,
91}
92
93#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
94pub struct RootEntry {
95    pub session_id: String,
96    pub title: String,
97    pub updated_at: DateTime<Utc>,
98}
99
100/// Per-parent index: a denormalized list of children for one-read listing.
101#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
102pub struct ChildrenIndex {
103    #[serde(default)]
104    pub version: u32,
105    #[serde(default)]
106    pub children: Vec<ChildEntry>,
107}
108
109#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
110pub struct ChildEntry {
111    pub child_id: String,
112    pub subagent_type: String,
113    pub status: ChildStatus,
114    pub title: String,
115    pub responsibility: String,
116    pub updated_at: DateTime<Utc>,
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
120#[serde(rename_all = "snake_case")]
121pub enum ChildStatus {
122    Pending,
123    Running,
124    Idle,
125    Completed,
126    Error,
127    Cancelled,
128}
129
130/// Fields the index needs from a root session payload (for rebuild).
131#[derive(Debug, Clone, PartialEq)]
132pub struct RootFields {
133    pub title: String,
134    pub updated_at: DateTime<Utc>,
135}
136
137/// Fields the index needs from a child session payload (for rebuild).
138#[derive(Debug, Clone, PartialEq)]
139pub struct ChildFields {
140    pub subagent_type: String,
141    pub status: ChildStatus,
142    pub title: String,
143    pub responsibility: String,
144    pub updated_at: DateTime<Utc>,
145}
146
147/// Decouples index rebuild from the (opaque) session payload shape: the caller knows how to
148/// read its own session JSON, the store knows the directory structure.
149pub trait MetaExtractor: Sync {
150    fn root(&self, session_id: &str, payload: &serde_json::Value) -> RootFields;
151    fn child(&self, child_id: &str, payload: &serde_json::Value) -> ChildFields;
152}
153
154/// Filesystem-backed store rooted at `<root>` (default `~/.bamboo`, injected for tests).
155pub struct SubagentStore {
156    root: PathBuf,
157    /// In-process mutex that serializes all index read-modify-write sequences.
158    write_lock: tokio::sync::Mutex<()>,
159}
160
161impl SubagentStore {
162    pub fn open(root: impl Into<PathBuf>) -> Self {
163        Self {
164            root: root.into(),
165            write_lock: tokio::sync::Mutex::new(()),
166        }
167    }
168
169    // ---- path layout -------------------------------------------------------
170
171    fn project_dir(&self, key: &ProjectKey) -> PathBuf {
172        self.root.join("projects").join(key.as_str())
173    }
174    fn index_file(&self, key: &ProjectKey) -> PathBuf {
175        self.project_dir(key).join("index.json")
176    }
177    fn sessions_dir(&self, key: &ProjectKey) -> PathBuf {
178        self.project_dir(key).join("sessions")
179    }
180    fn parent_dir(&self, key: &ProjectKey, parent_id: &str) -> PathBuf {
181        self.sessions_dir(key).join(parent_id)
182    }
183    fn children_index_file(&self, key: &ProjectKey, parent_id: &str) -> PathBuf {
184        self.parent_dir(key, parent_id).join("children.json")
185    }
186    fn child_dir(&self, key: &ProjectKey, parent_id: &str, child_id: &str) -> PathBuf {
187        self.parent_dir(key, parent_id)
188            .join("children")
189            .join(child_id)
190    }
191    fn session_dir(&self, loc: &SessionLoc) -> PathBuf {
192        match loc {
193            SessionLoc::Root { key, session_id } => self.parent_dir(key, session_id),
194            SessionLoc::Child {
195                key,
196                parent_id,
197                child_id,
198            } => self.child_dir(key, parent_id, child_id),
199        }
200    }
201    fn session_file(&self, loc: &SessionLoc) -> PathBuf {
202        self.session_dir(loc).join("session.json")
203    }
204
205    /// Mailbox handle for the actor at `loc` (`<session_dir>/mailbox`).
206    pub fn mailbox(&self, loc: &SessionLoc) -> Mailbox {
207        Mailbox::at(self.session_dir(loc).join("mailbox"))
208    }
209
210    // ---- session payload (opaque, atomic) ---------------------------------
211
212    pub async fn save_session<T: Serialize>(&self, loc: &SessionLoc, payload: &T) -> Result<()> {
213        let path = self.session_file(loc);
214        let bytes = serde_json::to_vec_pretty(payload).map_err(|e| StoreError::decode(&path, e))?;
215        atomic_write(&path, &bytes).await
216    }
217
218    pub async fn load_session<T: DeserializeOwned>(&self, loc: &SessionLoc) -> Result<T> {
219        let path = self.session_file(loc);
220        let bytes = tokio::fs::read(&path)
221            .await
222            .map_err(|e| StoreError::io(&path, e))?;
223        serde_json::from_slice(&bytes).map_err(|e| StoreError::decode(&path, e))
224    }
225
226    pub async fn session_exists(&self, loc: &SessionLoc) -> bool {
227        tokio::fs::try_exists(self.session_file(loc))
228            .await
229            .unwrap_or(false)
230    }
231
232    // ---- index reads -------------------------------------------------------
233
234    pub async fn list_roots(&self, key: &ProjectKey) -> Result<Vec<RootEntry>> {
235        let idx: ProjectIndex = self.read_json(&self.index_file(key)).await?;
236        Ok(idx.roots)
237    }
238
239    pub async fn list_children(
240        &self,
241        key: &ProjectKey,
242        parent_id: &str,
243    ) -> Result<Vec<ChildEntry>> {
244        let idx: ChildrenIndex = self
245            .read_json(&self.children_index_file(key, parent_id))
246            .await?;
247        Ok(idx.children)
248    }
249
250    /// O(1) resolve: consult the project `child_lookup` table.
251    pub async fn resolve_child(
252        &self,
253        key: &ProjectKey,
254        child_id: &str,
255    ) -> Result<Option<SessionLoc>> {
256        let idx: ProjectIndex = self.read_json(&self.index_file(key)).await?;
257        Ok(idx
258            .child_lookup
259            .get(child_id)
260            .map(|parent_id| SessionLoc::Child {
261                key: key.clone(),
262                parent_id: parent_id.clone(),
263                child_id: child_id.to_string(),
264            }))
265    }
266
267    // ---- index writes (single-writer = registry) --------------------------
268
269    pub async fn upsert_root(&self, key: &ProjectKey, entry: RootEntry) -> Result<()> {
270        let _guard = self.write_lock.lock().await;
271        let path = self.index_file(key);
272        let mut idx: ProjectIndex = self.read_json(&path).await?;
273        match idx
274            .roots
275            .iter_mut()
276            .find(|r| r.session_id == entry.session_id)
277        {
278            Some(slot) => *slot = entry,
279            None => idx.roots.push(entry),
280        }
281        idx.roots.sort_by(|a, b| a.session_id.cmp(&b.session_id));
282        self.write_json(&path, &idx).await
283    }
284
285    pub async fn upsert_child(
286        &self,
287        key: &ProjectKey,
288        parent_id: &str,
289        entry: ChildEntry,
290    ) -> Result<()> {
291        let _guard = self.write_lock.lock().await;
292        // 1. children.json (per parent)
293        let cpath = self.children_index_file(key, parent_id);
294        let mut cidx: ChildrenIndex = self.read_json(&cpath).await?;
295        match cidx
296            .children
297            .iter_mut()
298            .find(|c| c.child_id == entry.child_id)
299        {
300            Some(slot) => *slot = entry.clone(),
301            None => cidx.children.push(entry.clone()),
302        }
303        cidx.children.sort_by(|a, b| a.child_id.cmp(&b.child_id));
304        self.write_json(&cpath, &cidx).await?;
305
306        // 2. index.json child_lookup (written after, so a crash converges on rebuild)
307        let ipath = self.index_file(key);
308        let mut idx: ProjectIndex = self.read_json(&ipath).await?;
309        idx.child_lookup
310            .insert(entry.child_id, parent_id.to_string());
311        self.write_json(&ipath, &idx).await
312    }
313
314    pub async fn remove_child(
315        &self,
316        key: &ProjectKey,
317        parent_id: &str,
318        child_id: &str,
319    ) -> Result<()> {
320        let _guard = self.write_lock.lock().await;
321        let cpath = self.children_index_file(key, parent_id);
322        let mut cidx: ChildrenIndex = self.read_json(&cpath).await?;
323        cidx.children.retain(|c| c.child_id != child_id);
324        self.write_json(&cpath, &cidx).await?;
325
326        let ipath = self.index_file(key);
327        let mut idx: ProjectIndex = self.read_json(&ipath).await?;
328        idx.child_lookup.remove(child_id);
329        self.write_json(&ipath, &idx).await
330    }
331
332    // ---- self-heal ---------------------------------------------------------
333
334    /// Rebuild `index.json` + every `children.json` by scanning the session payloads.
335    /// Authoritative recovery path: safe to call any time; idempotent.
336    pub async fn rebuild_index(
337        &self,
338        key: &ProjectKey,
339        extractor: &dyn MetaExtractor,
340    ) -> Result<()> {
341        let _guard = self.write_lock.lock().await;
342        let sessions = self.sessions_dir(key);
343        let mut idx = ProjectIndex::default();
344
345        let mut parents = match tokio::fs::read_dir(&sessions).await {
346            Ok(rd) => rd,
347            Err(e) if e.kind() == ErrorKind::NotFound => {
348                // No sessions yet: write an empty project index.
349                return self.write_json(&self.index_file(key), &idx).await;
350            }
351            Err(e) => return Err(StoreError::io(&sessions, e)),
352        };
353
354        while let Some(p) = parents
355            .next_entry()
356            .await
357            .map_err(|e| StoreError::io(&sessions, e))?
358        {
359            if !is_dir(&p).await {
360                continue;
361            }
362            let parent_id = p.file_name().to_string_lossy().into_owned();
363
364            if let Some(val) = self.try_read_value(&p.path().join("session.json")).await? {
365                let rf = extractor.root(&parent_id, &val);
366                idx.roots.push(RootEntry {
367                    session_id: parent_id.clone(),
368                    title: rf.title,
369                    updated_at: rf.updated_at,
370                });
371            }
372
373            // children
374            let mut cidx = ChildrenIndex::default();
375            let cdir = p.path().join("children");
376            if let Ok(mut kids) = tokio::fs::read_dir(&cdir).await {
377                while let Some(c) = kids
378                    .next_entry()
379                    .await
380                    .map_err(|e| StoreError::io(&cdir, e))?
381                {
382                    if !is_dir(&c).await {
383                        continue;
384                    }
385                    let child_id = c.file_name().to_string_lossy().into_owned();
386                    if let Some(val) = self.try_read_value(&c.path().join("session.json")).await? {
387                        let cf = extractor.child(&child_id, &val);
388                        cidx.children.push(ChildEntry {
389                            child_id: child_id.clone(),
390                            subagent_type: cf.subagent_type,
391                            status: cf.status,
392                            title: cf.title,
393                            responsibility: cf.responsibility,
394                            updated_at: cf.updated_at,
395                        });
396                        idx.child_lookup.insert(child_id, parent_id.clone());
397                    }
398                }
399            }
400            cidx.children.sort_by(|a, b| a.child_id.cmp(&b.child_id));
401            self.write_json(&self.children_index_file(key, &parent_id), &cidx)
402                .await?;
403        }
404
405        idx.roots.sort_by(|a, b| a.session_id.cmp(&b.session_id));
406        self.write_json(&self.index_file(key), &idx).await
407    }
408
409    // ---- helpers -----------------------------------------------------------
410
411    async fn read_json<T: DeserializeOwned + Default>(&self, path: &Path) -> Result<T> {
412        match tokio::fs::read(path).await {
413            Ok(bytes) => serde_json::from_slice(&bytes).map_err(|e| StoreError::decode(path, e)),
414            Err(e) if e.kind() == ErrorKind::NotFound => Ok(T::default()),
415            Err(e) => Err(StoreError::io(path, e)),
416        }
417    }
418
419    async fn try_read_value(&self, path: &Path) -> Result<Option<serde_json::Value>> {
420        match tokio::fs::read(path).await {
421            Ok(bytes) => {
422                let v = serde_json::from_slice(&bytes).map_err(|e| StoreError::decode(path, e))?;
423                Ok(Some(v))
424            }
425            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
426            Err(e) => Err(StoreError::io(path, e)),
427        }
428    }
429
430    async fn write_json<T: Serialize>(&self, path: &Path, value: &T) -> Result<()> {
431        let bytes = serde_json::to_vec_pretty(value).map_err(|e| StoreError::decode(path, e))?;
432        atomic_write(path, &bytes).await
433    }
434}
435
436async fn is_dir(entry: &tokio::fs::DirEntry) -> bool {
437    match entry.file_type().await {
438        Ok(ft) => ft.is_dir(),
439        Err(_) => false,
440    }
441}
442
443/// FNV-1a 64-bit — tiny, dependency-free, deterministic path fingerprint.
444fn fnv1a64(bytes: &[u8]) -> u64 {
445    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
446    for b in bytes {
447        hash ^= u64::from(*b);
448        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
449    }
450    hash
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456    use chrono::TimeZone;
457    use serde_json::json;
458    use tempfile::TempDir;
459
460    fn ts() -> DateTime<Utc> {
461        Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap()
462    }
463
464    fn key() -> ProjectKey {
465        ProjectKey::from_raw("proj")
466    }
467
468    fn store() -> (TempDir, SubagentStore) {
469        let dir = TempDir::new().unwrap();
470        let store = SubagentStore::open(dir.path());
471        (dir, store)
472    }
473
474    fn child_payload(title: &str, kind: &str, status: &str) -> serde_json::Value {
475        json!({
476            "title": title,
477            "subagent_type": kind,
478            "status": status,
479            "responsibility": format!("do {title}"),
480            "updated_at": ts().to_rfc3339(),
481        })
482    }
483
484    /// Test extractor: read the index fields straight out of the opaque payload.
485    struct Extract;
486    impl MetaExtractor for Extract {
487        fn root(&self, _id: &str, p: &serde_json::Value) -> RootFields {
488            RootFields {
489                title: p["title"].as_str().unwrap_or_default().to_string(),
490                updated_at: parse_ts(&p["updated_at"]),
491            }
492        }
493        fn child(&self, _id: &str, p: &serde_json::Value) -> ChildFields {
494            ChildFields {
495                subagent_type: p["subagent_type"].as_str().unwrap_or_default().to_string(),
496                status: parse_status(p["status"].as_str().unwrap_or("pending")),
497                title: p["title"].as_str().unwrap_or_default().to_string(),
498                responsibility: p["responsibility"].as_str().unwrap_or_default().to_string(),
499                updated_at: parse_ts(&p["updated_at"]),
500            }
501        }
502    }
503    fn parse_ts(v: &serde_json::Value) -> DateTime<Utc> {
504        v.as_str()
505            .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
506            .map(|d| d.with_timezone(&Utc))
507            .unwrap_or_else(ts)
508    }
509    fn parse_status(s: &str) -> ChildStatus {
510        match s {
511            "running" => ChildStatus::Running,
512            "idle" => ChildStatus::Idle,
513            "completed" => ChildStatus::Completed,
514            "error" => ChildStatus::Error,
515            "cancelled" => ChildStatus::Cancelled,
516            _ => ChildStatus::Pending,
517        }
518    }
519
520    #[tokio::test]
521    async fn session_round_trips() {
522        let (_d, s) = store();
523        let loc = SessionLoc::Root {
524            key: key(),
525            session_id: "p1".into(),
526        };
527        let payload = json!({"hello": "world", "n": 42});
528        s.save_session(&loc, &payload).await.unwrap();
529        assert!(s.session_exists(&loc).await);
530        let got: serde_json::Value = s.load_session(&loc).await.unwrap();
531        assert_eq!(got, payload);
532    }
533
534    #[tokio::test]
535    async fn upsert_list_and_resolve_child() {
536        let (_d, s) = store();
537        let k = key();
538        let entry = ChildEntry {
539            child_id: "c1".into(),
540            subagent_type: "researcher".into(),
541            status: ChildStatus::Running,
542            title: "t".into(),
543            responsibility: "r".into(),
544            updated_at: ts(),
545        };
546        s.upsert_child(&k, "p1", entry.clone()).await.unwrap();
547
548        let listed = s.list_children(&k, "p1").await.unwrap();
549        assert_eq!(listed, vec![entry]);
550
551        let loc = s.resolve_child(&k, "c1").await.unwrap();
552        assert_eq!(
553            loc,
554            Some(SessionLoc::Child {
555                key: k.clone(),
556                parent_id: "p1".into(),
557                child_id: "c1".into(),
558            })
559        );
560        assert_eq!(s.resolve_child(&k, "missing").await.unwrap(), None);
561    }
562
563    #[tokio::test]
564    async fn upsert_replaces_in_place() {
565        let (_d, s) = store();
566        let k = key();
567        let mut e = ChildEntry {
568            child_id: "c1".into(),
569            subagent_type: "x".into(),
570            status: ChildStatus::Pending,
571            title: "t".into(),
572            responsibility: "r".into(),
573            updated_at: ts(),
574        };
575        s.upsert_child(&k, "p1", e.clone()).await.unwrap();
576        e.status = ChildStatus::Completed;
577        s.upsert_child(&k, "p1", e.clone()).await.unwrap();
578        let listed = s.list_children(&k, "p1").await.unwrap();
579        assert_eq!(listed.len(), 1);
580        assert_eq!(listed[0].status, ChildStatus::Completed);
581    }
582
583    #[tokio::test]
584    async fn remove_child_clears_index_and_lookup() {
585        let (_d, s) = store();
586        let k = key();
587        let e = ChildEntry {
588            child_id: "c1".into(),
589            subagent_type: "x".into(),
590            status: ChildStatus::Pending,
591            title: "t".into(),
592            responsibility: "r".into(),
593            updated_at: ts(),
594        };
595        s.upsert_child(&k, "p1", e).await.unwrap();
596        s.remove_child(&k, "p1", "c1").await.unwrap();
597        assert!(s.list_children(&k, "p1").await.unwrap().is_empty());
598        assert_eq!(s.resolve_child(&k, "c1").await.unwrap(), None);
599    }
600
601    #[tokio::test]
602    async fn rebuild_matches_incremental() {
603        let (_d, s) = store();
604        let k = key();
605
606        // Author session payloads (authoritative) + maintain indices incrementally.
607        let root = SessionLoc::Root {
608            key: k.clone(),
609            session_id: "p1".into(),
610        };
611        s.save_session(
612            &root,
613            &json!({"title": "Parent", "updated_at": ts().to_rfc3339()}),
614        )
615        .await
616        .unwrap();
617        s.upsert_root(
618            &k,
619            RootEntry {
620                session_id: "p1".into(),
621                title: "Parent".into(),
622                updated_at: ts(),
623            },
624        )
625        .await
626        .unwrap();
627
628        for (cid, kind) in [("c1", "researcher"), ("c2", "coder")] {
629            let loc = SessionLoc::Child {
630                key: k.clone(),
631                parent_id: "p1".into(),
632                child_id: cid.into(),
633            };
634            s.save_session(&loc, &child_payload(cid, kind, "running"))
635                .await
636                .unwrap();
637            s.upsert_child(
638                &k,
639                "p1",
640                ChildEntry {
641                    child_id: cid.into(),
642                    subagent_type: kind.into(),
643                    status: ChildStatus::Running,
644                    title: cid.into(),
645                    responsibility: format!("do {cid}"),
646                    updated_at: ts(),
647                },
648            )
649            .await
650            .unwrap();
651        }
652
653        let index_path = s.index_file(&k);
654        let children_path = s.children_index_file(&k, "p1");
655        let before_index: ProjectIndex = s.read_json(&index_path).await.unwrap();
656        let before_children: ChildrenIndex = s.read_json(&children_path).await.unwrap();
657
658        // Nuke the caches and rebuild from session payloads.
659        tokio::fs::remove_file(&index_path).await.unwrap();
660        tokio::fs::remove_file(&children_path).await.unwrap();
661        s.rebuild_index(&k, &Extract).await.unwrap();
662
663        let after_index: ProjectIndex = s.read_json(&index_path).await.unwrap();
664        let after_children: ChildrenIndex = s.read_json(&children_path).await.unwrap();
665        assert_eq!(after_index, before_index);
666        assert_eq!(after_children, before_children);
667    }
668
669    #[test]
670    fn project_key_distinguishes_colliding_folds_and_is_deterministic() {
671        // `/a/b` and `/a-b` fold to the same slug; the hash must split them.
672        let k1 = ProjectKey::from_workspace(Path::new("/nonexistent/a/b"));
673        let k2 = ProjectKey::from_workspace(Path::new("/nonexistent/a-b"));
674        assert_ne!(k1, k2);
675        // deterministic
676        assert_eq!(
677            k1,
678            ProjectKey::from_workspace(Path::new("/nonexistent/a/b"))
679        );
680    }
681
682    #[tokio::test]
683    async fn rebuild_converges_after_partial_write() {
684        let (_d, s) = store();
685        let k = key();
686        let loc = SessionLoc::Child {
687            key: k.clone(),
688            parent_id: "p1".into(),
689            child_id: "c1".into(),
690        };
691        // children.json written but index.json child_lookup missing (simulated crash mid-upsert).
692        s.save_session(&loc, &child_payload("c1", "researcher", "running"))
693            .await
694            .unwrap();
695        s.write_json(
696            &s.children_index_file(&k, "p1"),
697            &ChildrenIndex {
698                version: 0,
699                children: vec![ChildEntry {
700                    child_id: "c1".into(),
701                    subagent_type: "researcher".into(),
702                    status: ChildStatus::Running,
703                    title: "c1".into(),
704                    responsibility: "do c1".into(),
705                    updated_at: ts(),
706                }],
707            },
708        )
709        .await
710        .unwrap();
711        // lookup is empty -> resolve misses
712        assert_eq!(s.resolve_child(&k, "c1").await.unwrap(), None);
713
714        s.rebuild_index(&k, &Extract).await.unwrap();
715        // now converged
716        assert!(s.resolve_child(&k, "c1").await.unwrap().is_some());
717    }
718
719    #[tokio::test(flavor = "multi_thread")]
720    async fn concurrent_upserts_do_not_lose_children() {
721        use std::sync::Arc;
722
723        let dir = TempDir::new().unwrap();
724        let s = Arc::new(SubagentStore::open(dir.path()));
725        let k = key();
726
727        let mut handles = Vec::new();
728        for i in 0..16 {
729            let s = s.clone();
730            let k = k.clone();
731            handles.push(tokio::spawn(async move {
732                let child_id = format!("c{i}");
733                let entry = ChildEntry {
734                    child_id: child_id.clone(),
735                    subagent_type: "coder".into(),
736                    status: ChildStatus::Pending,
737                    title: child_id.clone(),
738                    responsibility: format!("do {child_id}"),
739                    updated_at: ts(),
740                };
741                s.upsert_child(&k, "p1", entry).await.unwrap();
742            }));
743        }
744        for h in handles {
745            h.await.unwrap();
746        }
747
748        let listed = s.list_children(&k, "p1").await.unwrap();
749        assert_eq!(
750            listed.len(),
751            16,
752            "all 16 children must survive concurrent upserts"
753        );
754
755        let resolved = s.resolve_child(&k, "c7").await.unwrap();
756        assert!(
757            resolved.is_some(),
758            "resolve_child(\"c7\") must hit after concurrent upserts"
759        );
760    }
761}