Skip to main content

objects/store/
agent_registry.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Agent registry: lightweight discovery index for parallel agent sessions.
3//!
4//! Stores one TOML file per active agent in `.heddle/agents/<session-id>.toml`.
5//! The registry does NOT manage worktrees or refs — those remain independent.
6//! It exists purely so an orchestrator can query which agents are in flight.
7
8use std::{
9    collections::{HashMap, HashSet},
10    path::{Path, PathBuf},
11};
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15
16use crate::{
17    lock::RepoLock,
18    store::{
19        HeddleError, Result,
20        atomic::write_file_atomic,
21        liveness::{Liveness, is_owner_alive},
22    },
23};
24
/// Retention window, in days, for entries that are no longer `Active`.
///
/// Once a terminal entry's `completed_at` (or `started_at` when no completion
/// time was recorded) is at least this old, the registry deletes its file the
/// next time the entry is loaded or scanned.
const STALE_AGENT_TTL_DAYS: i64 = 7;
26
27/// Outcome of an attempt to reserve an active session on a thread.
28///
29/// `LiveOwner` carries the existing reservation so callers can compare
30/// anchors and surface either an "already reserved by another live
31/// process" or an "anchor drift" error. The registry reaps dead owners
32/// in-place before producing this outcome — so an `Active` entry seen
33/// here is guaranteed to have been alive at the moment of the check.
34#[derive(Debug)]
35pub enum ReserveOutcome {
36    /// Reservation succeeded; the new entry is included.
37    Reserved(AgentEntry),
38    /// Another live agent already holds this thread.
39    LiveOwner(AgentEntry),
40}
41
42/// A record of one `heddle context get` call made during an agent session.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct ContextQueryEntry {
45    /// The file path that was queried.
46    pub path: String,
47    /// The scope filter used, if any (e.g. `symbol:parse_manifest`).
48    pub scope: Option<String>,
49    /// When the query was made.
50    pub queried_at: DateTime<Utc>,
51}
52
53#[derive(Debug, Clone, Default, Serialize, Deserialize)]
54pub struct AgentUsageSummary {
55    #[serde(default)]
56    pub input_tokens: Option<u64>,
57    #[serde(default)]
58    pub output_tokens: Option<u64>,
59    #[serde(default)]
60    pub reasoning_tokens: Option<u64>,
61    #[serde(default)]
62    pub tool_calls: Option<u32>,
63    #[serde(default)]
64    pub cost_micros_usd: Option<u64>,
65}
66
67/// Status of an agent session.
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum AgentStatus {
71    /// Agent is actively working.
72    Active,
73    /// Agent's reservation was left behind and has been reaped.
74    Abandoned,
75    /// Agent has finished work (snapshot taken) but not yet merged.
76    Complete,
77    /// Agent's thread has been merged into the base thread.
78    Merged,
79}
80
81impl std::fmt::Display for AgentStatus {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        match self {
84            AgentStatus::Active => write!(f, "active"),
85            AgentStatus::Abandoned => write!(f, "abandoned"),
86            AgentStatus::Complete => write!(f, "complete"),
87            AgentStatus::Merged => write!(f, "merged"),
88        }
89    }
90}
91
92/// A registry entry describing one active (or recently finished) agent session.
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct AgentEntry {
95    /// Unique session identifier (e.g. `agent-xxxxxxxxxxxx`).
96    pub session_id: String,
97    /// Stable harness-side instance identifier used to reconnect the same
98    /// local client process to its registry entry across bridge restarts.
99    #[serde(default)]
100    pub client_instance_id: Option<String>,
101    /// Harness-native actor identity such as `codex:thread:thr_123`.
102    #[serde(default)]
103    pub native_actor_key: Option<String>,
104    /// Harness-native parent actor identity for child/subagent sessions.
105    #[serde(default)]
106    pub native_parent_actor_key: Option<String>,
107    /// Harness-native reconnect key such as a transcript path or client name.
108    #[serde(default)]
109    pub native_instance_key: Option<String>,
110    /// Heddle session identifier when this registry entry is attached to a
111    /// first-class Heddle multi-segment session.
112    #[serde(default)]
113    pub heddle_session_id: Option<String>,
114    /// Thread identifier when the session is attached to a Heddle thread record.
115    #[serde(default)]
116    pub thread_id: Option<String>,
117    /// The Heddle thread the agent writes to.
118    pub thread: String,
119    /// Process id that created or last renewed the reservation.
120    #[serde(default)]
121    pub pid: Option<u32>,
122    /// Host boot identifier when available; differentiates reused PIDs.
123    #[serde(default)]
124    pub boot_id: Option<String>,
125    /// Advisory liveness lock file path for future long-lived runners.
126    #[serde(default)]
127    pub liveness_path: Option<PathBuf>,
128    /// Most recent reservation heartbeat.
129    #[serde(default)]
130    pub heartbeat_at: Option<DateTime<Utc>>,
131    /// Full state id the session was anchored to.
132    #[serde(default)]
133    pub anchor_state: Option<String>,
134    /// Root tree id the session was anchored to.
135    #[serde(default)]
136    pub anchor_root: Option<String>,
137    /// Opaque token returned to programmatic agent clients.
138    #[serde(default)]
139    pub reservation_token: Option<String>,
140    /// Absolute path to the agent's checkout directory, if filesystem-based.
141    #[serde(default)]
142    pub path: Option<PathBuf>,
143    /// Short display form of the base state the agent started from.
144    pub base_state: String,
145    /// When the agent session was created.
146    pub started_at: DateTime<Utc>,
147    /// AI provider (e.g. `anthropic`).
148    pub provider: Option<String>,
149    /// AI model (e.g. `claude-sonnet-4-6`).
150    pub model: Option<String>,
151    /// Harness or client name (e.g. `claude-code`, `codex`).
152    #[serde(default)]
153    pub harness: Option<String>,
154    /// Harness-specific reasoning/thinking level when available.
155    #[serde(default)]
156    pub thinking_level: Option<String>,
157    /// Aggregated usage counters captured for the active session.
158    #[serde(default)]
159    pub usage_summary: AgentUsageSummary,
160    /// Most recent progress heartbeat timestamp.
161    #[serde(default)]
162    pub last_progress_at: Option<DateTime<Utc>>,
163    /// Summary flush state for the local session reporter.
164    #[serde(default)]
165    pub report_flush_state: Option<String>,
166    /// Most recent explanation of why Heddle attached this actor to its current
167    /// thread/session context.
168    #[serde(default)]
169    pub attach_reason: Option<String>,
170    /// Ordered explanation of attach rules Heddle evaluated.
171    #[serde(default)]
172    pub attach_precedence: Vec<String>,
173    /// The attach rule that won for this actor.
174    #[serde(default)]
175    pub winning_attach_rule: Option<String>,
176    /// Where Heddle learned the harness identity from.
177    #[serde(default)]
178    pub probe_source: Option<String>,
179    /// How confident Heddle was in the probe result.
180    #[serde(default)]
181    pub probe_confidence: Option<f32>,
182    /// Current status.
183    pub status: AgentStatus,
184    /// When the agent was marked complete or merged.
185    #[serde(default)]
186    pub completed_at: Option<DateTime<Utc>>,
187    /// Log of `heddle context get` calls made during this session.
188    /// Appended by the CLI each time an agent queries context from its worktree.
189    #[serde(default)]
190    pub context_queries: Vec<ContextQueryEntry>,
191}
192
193/// One hop in an actor ancestry chain, ordered root to leaf.
194#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
195pub struct ActorChainNode {
196    pub session_id: String,
197    #[serde(default)]
198    pub native_actor_key: Option<String>,
199    #[serde(default)]
200    pub native_parent_actor_key: Option<String>,
201    pub thread: String,
202    pub status: AgentStatus,
203    #[serde(default)]
204    pub provider: Option<String>,
205    #[serde(default)]
206    pub model: Option<String>,
207    #[serde(default)]
208    pub harness: Option<String>,
209}
210
211impl From<&AgentEntry> for ActorChainNode {
212    fn from(entry: &AgentEntry) -> Self {
213        Self {
214            session_id: entry.session_id.clone(),
215            native_actor_key: entry.native_actor_key.clone(),
216            native_parent_actor_key: entry.native_parent_actor_key.clone(),
217            thread: entry.thread.clone(),
218            status: entry.status.clone(),
219            provider: entry.provider.clone(),
220            model: entry.model.clone(),
221            harness: entry.harness.clone(),
222        }
223    }
224}
225
/// Handle on the on-disk agent registry kept under `.heddle/agents/`.
///
/// Each session is stored as `<session-id>.toml` inside the directory; a
/// `.lock` file in the same directory serializes writers.
pub struct AgentRegistry {
    /// Directory holding the per-session TOML files and the lock file.
    agents_dir: PathBuf,
}
230
231impl AgentRegistry {
232    /// Create a new registry backed by `<heddle_dir>/agents/`.
233    pub fn new(heddle_dir: &Path) -> Self {
234        Self {
235            agents_dir: heddle_dir.join("agents"),
236        }
237    }
238
239    fn entry_path(&self, session_id: &str) -> Result<PathBuf> {
240        // Only allow characters produced by generate_agent_id: lowercase
241        // alphanumeric and hyphens.  This makes path traversal structurally
242        // impossible: none of [a-z0-9-] can form ".." or "/".
243        if session_id.is_empty()
244            || !session_id
245                .bytes()
246                .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
247        {
248            return Err(HeddleError::Config(format!(
249                "invalid session ID '{}': only lowercase alphanumeric and hyphens allowed",
250                session_id
251            )));
252        }
253        Ok(self.agents_dir.join(format!("{}.toml", session_id)))
254    }
255
256    fn lock_path(&self) -> PathBuf {
257        self.agents_dir.join(".lock")
258    }
259
260    fn write_lock(&self) -> Result<crate::lock::WriteLockGuard> {
261        RepoLock::at(self.lock_path()).write().map_err(|err| {
262            HeddleError::Config(format!("failed to acquire agent registry lock: {err}"))
263        })
264    }
265
266    fn write_entry_file(&self, entry: &AgentEntry) -> Result<()> {
267        std::fs::create_dir_all(&self.agents_dir)?;
268        let path = self.entry_path(&entry.session_id)?;
269        let content =
270            toml::to_string_pretty(entry).map_err(|e| HeddleError::Config(e.to_string()))?;
271        Ok(write_file_atomic(&path, content.as_bytes())?)
272    }
273
274    fn load_entry_from_path(&self, path: &Path) -> Result<Option<AgentEntry>> {
275        if !path.exists() {
276            return Ok(None);
277        }
278
279        let content = std::fs::read_to_string(path)?;
280        let entry = toml::from_str(&content).map_err(|e| HeddleError::Config(e.to_string()))?;
281        Ok(Some(entry))
282    }
283
284    fn is_stale_terminal_entry(&self, entry: &AgentEntry) -> bool {
285        if matches!(entry.status, AgentStatus::Active) {
286            return false;
287        }
288
289        let terminal_at = entry.completed_at.unwrap_or(entry.started_at);
290        terminal_at <= Utc::now() - chrono::Duration::days(STALE_AGENT_TTL_DAYS)
291    }
292
293    fn prune_stale_entry_path(&self, path: &Path) -> Result<()> {
294        if path.exists() {
295            std::fs::remove_file(path)?;
296        }
297        Ok(())
298    }
299
300    /// Liveness verdict for an active entry. Terminal entries (anything
301    /// other than `Active`) are reported as `Dead` so reaping logic can
302    /// skip them in one pass alongside crashed actives.
303    pub fn liveness_for(entry: &AgentEntry) -> Liveness {
304        if entry.status != AgentStatus::Active {
305            return Liveness::Dead;
306        }
307        is_owner_alive(entry.pid, entry.boot_id.as_deref())
308    }
309
310    /// Mark an active entry as `Abandoned`, recording the moment the
311    /// reservation was reaped.
312    fn abandon_active_entry(&self, mut entry: AgentEntry) -> Result<AgentEntry> {
313        entry.status = AgentStatus::Abandoned;
314        entry.completed_at = Some(Utc::now());
315        self.write_entry_file(&entry)?;
316        Ok(entry)
317    }
318
319    /// Sweep entries on a single thread and reap any whose recorded
320    /// owner is demonstrably dead. Returns the number of entries
321    /// transitioned to `Abandoned`. Caller must already hold the
322    /// registry write lock.
323    fn reap_dead_locked(&self, thread_filter: Option<&str>) -> Result<usize> {
324        if !self.agents_dir.exists() {
325            return Ok(0);
326        }
327        let mut reaped = 0usize;
328        for dir_entry in std::fs::read_dir(&self.agents_dir)? {
329            let dir_entry = dir_entry?;
330            let path = dir_entry.path();
331            if !path.extension().map(|e| e == "toml").unwrap_or(false) {
332                continue;
333            }
334            let Some(entry) = self.load_entry_from_path(&path)? else {
335                continue;
336            };
337            if entry.status != AgentStatus::Active {
338                continue;
339            }
340            if let Some(name) = thread_filter
341                && entry.thread != name
342            {
343                continue;
344            }
345            if Self::liveness_for(&entry) == Liveness::Dead {
346                self.abandon_active_entry(entry)?;
347                reaped += 1;
348            }
349        }
350        Ok(reaped)
351    }
352
353    /// Reap any active reservation on `thread` whose owning process is
354    /// no longer alive. Returns the number of entries abandoned.
355    ///
356    /// This is the read-side complement of [`try_reserve_thread`]:
357    /// callers that only need a fresh view of which sessions are alive
358    /// (e.g. `heddle agent list --alive-only`) can run it without
359    /// reserving.
360    pub fn reap_dead_for_thread(&self, thread: &str) -> Result<usize> {
361        let _lock = self.write_lock()?;
362        self.reap_dead_locked(Some(thread))
363    }
364
365    /// Reap dead reservations across every thread.
366    pub fn reap_dead(&self) -> Result<usize> {
367        let _lock = self.write_lock()?;
368        self.reap_dead_locked(None)
369    }
370
371    fn create_generated_entry_with<F, G>(
372        &self,
373        mut generate_id: G,
374        mut build_entry: F,
375    ) -> Result<AgentEntry>
376    where
377        F: FnMut(&str) -> Result<AgentEntry>,
378        G: FnMut() -> String,
379    {
380        let _lock = self.write_lock()?;
381
382        loop {
383            let session_id = generate_id();
384            let path = self.entry_path(&session_id)?;
385            if path.exists() {
386                continue;
387            }
388
389            let entry = build_entry(&session_id)?;
390            self.write_entry_file(&entry)?;
391            return Ok(entry);
392        }
393    }
394
395    /// Create and persist a new agent entry with a unique generated session ID.
396    pub fn create_generated_entry<F>(&self, build_entry: F) -> Result<AgentEntry>
397    where
398        F: FnMut(&str) -> Result<AgentEntry>,
399    {
400        self.create_generated_entry_with(generate_agent_id, build_entry)
401    }
402
403    /// Persist an agent entry.
404    ///
405    /// Atomic write: uses write-to-temp-then-rename so a crash mid-write
406    /// never leaves the TOML file truncated or partially written.
407    pub fn save(&self, entry: &AgentEntry) -> Result<()> {
408        let _lock = self.write_lock()?;
409        self.write_entry_file(entry)
410    }
411
412    /// Load a single agent entry by session ID.
413    pub fn load(&self, session_id: &str) -> Result<Option<AgentEntry>> {
414        let path = self.entry_path(session_id)?;
415        let Some(entry) = self.load_entry_from_path(&path)? else {
416            return Ok(None);
417        };
418
419        if self.is_stale_terminal_entry(&entry) {
420            let _lock = self.write_lock()?;
421            if let Some(latest) = self.load_entry_from_path(&path)?
422                && self.is_stale_terminal_entry(&latest)
423            {
424                self.prune_stale_entry_path(&path)?;
425                return Ok(None);
426            }
427        }
428
429        Ok(Some(entry))
430    }
431
432    /// List all agent entries, most-recently-started first.
433    pub fn list(&self) -> Result<Vec<AgentEntry>> {
434        if !self.agents_dir.exists() {
435            return Ok(Vec::new());
436        }
437
438        let mut stale_paths = Vec::new();
439        let mut entries = Vec::new();
440        for dir_entry in std::fs::read_dir(&self.agents_dir)? {
441            let dir_entry = dir_entry?;
442            let path = dir_entry.path();
443            if path.extension().map(|e| e == "toml").unwrap_or(false)
444                && let Ok(content) = std::fs::read_to_string(&path)
445                && let Ok(entry) = toml::from_str::<AgentEntry>(&content)
446            {
447                if self.is_stale_terminal_entry(&entry) {
448                    stale_paths.push(path);
449                } else {
450                    entries.push(entry);
451                }
452            }
453        }
454
455        if !stale_paths.is_empty() {
456            let _lock = self.write_lock()?;
457            for path in stale_paths {
458                if let Some(entry) = self.load_entry_from_path(&path)?
459                    && self.is_stale_terminal_entry(&entry)
460                {
461                    self.prune_stale_entry_path(&path)?;
462                }
463            }
464        }
465
466        entries.sort_by_key(|a| std::cmp::Reverse(a.started_at));
467        Ok(entries)
468    }
469
470    /// Update the status of an agent entry in place.
471    pub fn update_status(&self, session_id: &str, status: AgentStatus) -> Result<()> {
472        let _lock = self.write_lock()?;
473        let path = self.entry_path(session_id)?;
474        if let Some(mut entry) = self.load_entry_from_path(&path)? {
475            entry.status = status;
476            entry.completed_at = match entry.status {
477                AgentStatus::Active => None,
478                AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
479                    Some(Utc::now())
480                }
481            };
482            self.write_entry_file(&entry)?;
483        }
484        Ok(())
485    }
486
487    /// Mutate an existing agent entry under the registry write lock.
488    pub fn update_entry<F>(&self, session_id: &str, mut update: F) -> Result<Option<AgentEntry>>
489    where
490        F: FnMut(&mut AgentEntry),
491    {
492        let _lock = self.write_lock()?;
493        let path = self.entry_path(session_id)?;
494        let Some(mut entry) = self.load_entry_from_path(&path)? else {
495            return Ok(None);
496        };
497        update(&mut entry);
498        self.write_entry_file(&entry)?;
499        Ok(Some(entry))
500    }
501
502    /// Under one registry write lock, reuse a matching active entry if one
503    /// exists; otherwise create a new generated entry.
504    pub fn find_or_create_active_entry<FMatch, FUpdate, FBuild>(
505        &self,
506        mut matches: FMatch,
507        mut update_existing: FUpdate,
508        mut build_entry: FBuild,
509    ) -> Result<(AgentEntry, bool)>
510    where
511        FMatch: FnMut(&AgentEntry) -> bool,
512        FUpdate: FnMut(&mut AgentEntry),
513        FBuild: FnMut(&str) -> Result<AgentEntry>,
514    {
515        let _lock = self.write_lock()?;
516        std::fs::create_dir_all(&self.agents_dir)?;
517
518        for dir_entry in std::fs::read_dir(&self.agents_dir)? {
519            let dir_entry = dir_entry?;
520            let path = dir_entry.path();
521            if !path.extension().map(|e| e == "toml").unwrap_or(false) {
522                continue;
523            }
524            let Some(mut entry) = self.load_entry_from_path(&path)? else {
525                continue;
526            };
527            if self.is_stale_terminal_entry(&entry) {
528                self.prune_stale_entry_path(&path)?;
529                continue;
530            }
531            if entry.status == AgentStatus::Active && matches(&entry) {
532                update_existing(&mut entry);
533                self.write_entry_file(&entry)?;
534                return Ok((entry, false));
535            }
536        }
537
538        loop {
539            let session_id = generate_agent_id();
540            let path = self.entry_path(&session_id)?;
541            if path.exists() {
542                continue;
543            }
544
545            let entry = build_entry(&session_id)?;
546            self.write_entry_file(&entry)?;
547            return Ok((entry, true));
548        }
549    }
550
551    /// Atomic "reap dead, then reserve" for a single thread.
552    ///
553    /// Under one registry write lock this:
554    /// 1. Prunes terminal-status entries past their TTL.
555    /// 2. Transitions any *active* entry whose recorded owner has died
556    ///    (per [`liveness_for`](Self::liveness_for)) to `Abandoned`.
557    /// 3. If a still-living active entry on `thread` remains, returns
558    ///    [`ReserveOutcome::LiveOwner`] with that entry.
559    /// 4. Otherwise builds and persists a new entry, returning
560    ///    [`ReserveOutcome::Reserved`].
561    ///
562    /// Both `cmd_agent_reserve` (the JSON API) and `start_thread`
563    /// route through here, so reaping and conflict detection share
564    /// exactly one code path.
565    pub fn try_reserve_thread<F>(&self, thread: &str, build_entry: F) -> Result<ReserveOutcome>
566    where
567        F: FnMut(&str) -> Result<AgentEntry>,
568    {
569        let _lock = self.write_lock()?;
570        std::fs::create_dir_all(&self.agents_dir)?;
571
572        let mut live_owner: Option<AgentEntry> = None;
573        for dir_entry in std::fs::read_dir(&self.agents_dir)? {
574            let dir_entry = dir_entry?;
575            let path = dir_entry.path();
576            if !path.extension().map(|e| e == "toml").unwrap_or(false) {
577                continue;
578            }
579            let Some(entry) = self.load_entry_from_path(&path)? else {
580                continue;
581            };
582            if self.is_stale_terminal_entry(&entry) {
583                self.prune_stale_entry_path(&path)?;
584                continue;
585            }
586            if entry.status != AgentStatus::Active || entry.thread != thread {
587                continue;
588            }
589            match Self::liveness_for(&entry) {
590                Liveness::Dead => {
591                    self.abandon_active_entry(entry)?;
592                }
593                Liveness::Alive | Liveness::Unknown => {
594                    // Unknown collapses to Alive here so we never
595                    // double-allocate a thread on insufficient evidence.
596                    live_owner = Some(entry);
597                    break;
598                }
599            }
600        }
601
602        if let Some(existing) = live_owner {
603            return Ok(ReserveOutcome::LiveOwner(existing));
604        }
605
606        let mut build_entry = build_entry;
607        loop {
608            let session_id = generate_agent_id();
609            let path = self.entry_path(&session_id)?;
610            if path.exists() {
611                continue;
612            }
613
614            let entry = build_entry(&session_id)?;
615            self.write_entry_file(&entry)?;
616            return Ok(ReserveOutcome::Reserved(entry));
617        }
618    }
619
620    /// Backwards-compatible thin wrapper around [`try_reserve_thread`]
621    /// that returns the same flat `Result<AgentEntry>` shape callers
622    /// historically expected. Live-owner conflicts surface as a
623    /// `HeddleError::Config(..)` with the same human-readable message
624    /// as before.
625    pub fn create_generated_entry_for_thread<F>(
626        &self,
627        thread: &str,
628        build_entry: F,
629    ) -> Result<AgentEntry>
630    where
631        F: FnMut(&str) -> Result<AgentEntry>,
632    {
633        match self.try_reserve_thread(thread, build_entry)? {
634            ReserveOutcome::Reserved(entry) => Ok(entry),
635            ReserveOutcome::LiveOwner(existing) => Err(HeddleError::Config(format!(
636                "thread '{}' already has active reservation {}. Use `heddle thread show {}` to inspect it, or release the session before starting another writer.",
637                thread, existing.session_id, thread
638            ))),
639        }
640    }
641
642    /// Find the active session whose visible or private execution root matches
643    /// the given worktree root.
644    pub fn find_active_by_path(&self, worktree_root: &Path) -> Result<Option<AgentEntry>> {
645        let canonical = worktree_root
646            .canonicalize()
647            .unwrap_or_else(|_| worktree_root.to_path_buf());
648        let entries = self.list()?;
649        Ok(entries
650            .into_iter()
651            .find(|e| e.status == AgentStatus::Active && entry_matches_root(e, &canonical)))
652    }
653
654    /// Find the active registry entry associated with the given Heddle session ID.
655    pub fn find_active_by_heddle_session_id(
656        &self,
657        heddle_session_id: &str,
658    ) -> Result<Option<AgentEntry>> {
659        let entries = self.list()?;
660        Ok(entries.into_iter().find(|entry| {
661            entry.status == AgentStatus::Active
662                && entry.heddle_session_id.as_deref() == Some(heddle_session_id)
663        }))
664    }
665
666    /// Find the active registry entry associated with a stable harness-side
667    /// client instance identifier.
668    pub fn find_active_by_client_instance_id(
669        &self,
670        client_instance_id: &str,
671    ) -> Result<Option<AgentEntry>> {
672        let entries = self.list()?;
673        Ok(entries.into_iter().find(|entry| {
674            entry.status == AgentStatus::Active
675                && entry.client_instance_id.as_deref() == Some(client_instance_id)
676        }))
677    }
678
679    /// Find the active registry entry associated with a harness-native actor key.
680    pub fn find_active_by_native_actor_key(
681        &self,
682        native_actor_key: &str,
683    ) -> Result<Option<AgentEntry>> {
684        let entries = self.list()?;
685        Ok(entries.into_iter().find(|entry| {
686            entry.status == AgentStatus::Active
687                && entry.native_actor_key.as_deref() == Some(native_actor_key)
688        }))
689    }
690
691    /// Return this actor's native parent chain, ordered root to leaf.
692    ///
693    /// The lookup intentionally follows harness-native actor keys rather than
694    /// thread names: subagents may work in lightweight directories or forked
695    /// threads, but the native parent key is the stable "who spawned whom"
696    /// edge that preserves Human -> agent -> agent attribution.
697    pub fn actor_chain_for_session(&self, session_id: &str) -> Result<Vec<ActorChainNode>> {
698        let entries = self.list()?;
699        let by_session: HashMap<&str, &AgentEntry> = entries
700            .iter()
701            .map(|entry| (entry.session_id.as_str(), entry))
702            .collect();
703        let by_native_key: HashMap<&str, &AgentEntry> = entries
704            .iter()
705            .filter_map(|entry| entry.native_actor_key.as_deref().map(|key| (key, entry)))
706            .collect();
707
708        let Some(mut current) = by_session.get(session_id).copied() else {
709            return Ok(Vec::new());
710        };
711        let mut leaf_to_root = vec![ActorChainNode::from(current)];
712        let mut seen = HashSet::from([current.session_id.as_str()]);
713
714        while let Some(parent_key) = current.native_parent_actor_key.as_deref() {
715            let Some(parent) = by_native_key.get(parent_key).copied() else {
716                break;
717            };
718            if !seen.insert(parent.session_id.as_str()) {
719                break;
720            }
721            leaf_to_root.push(ActorChainNode::from(parent));
722            current = parent;
723        }
724
725        leaf_to_root.reverse();
726        Ok(leaf_to_root)
727    }
728
729    /// Find the active registry entry associated with a harness-native instance
730    /// key inside the given worktree root.
731    pub fn find_active_by_native_instance_key_at_path(
732        &self,
733        native_instance_key: &str,
734        worktree_root: &Path,
735    ) -> Result<Option<AgentEntry>> {
736        let canonical = worktree_root
737            .canonicalize()
738            .unwrap_or_else(|_| worktree_root.to_path_buf());
739        let entries = self.list()?;
740        Ok(entries.into_iter().find(|entry| {
741            entry.status == AgentStatus::Active
742                && entry.native_instance_key.as_deref() == Some(native_instance_key)
743                && entry_matches_root(entry, &canonical)
744        }))
745    }
746
747    /// Append a context query to an active session's log.
748    ///
749    /// Best-effort: silently ignored if the session no longer exists or has completed.
750    pub fn log_context_query(&self, session_id: &str, query: ContextQueryEntry) -> Result<()> {
751        let _lock = self.write_lock()?;
752        let path = self.entry_path(session_id)?;
753        if let Some(mut entry) = self.load_entry_from_path(&path)?
754            && entry.status == AgentStatus::Active
755        {
756            entry.context_queries.push(query);
757            self.write_entry_file(&entry)?;
758        }
759        Ok(())
760    }
761
762    /// Delete an agent entry.
763    pub fn delete(&self, session_id: &str) -> Result<()> {
764        let path = self.entry_path(session_id)?;
765        if path.exists() {
766            std::fs::remove_file(path)?;
767        }
768        Ok(())
769    }
770}
771
772/// Generate a unique agent session identifier.
773///
774/// Uses 12 random bytes (96 bits) encoded as lowercase base32, giving
775/// a birthday-paradox collision probability of < 10⁻²⁰ at a million sessions.
776pub fn generate_agent_id() -> String {
777    let random_bytes: [u8; 12] = rand::random();
778    format!(
779        "agent-{}",
780        base32::encode(base32::Alphabet::Rfc4648 { padding: false }, &random_bytes).to_lowercase()
781    )
782}
783
784fn entry_matches_root(entry: &AgentEntry, canonical: &Path) -> bool {
785    entry
786        .path
787        .as_ref()
788        .map(|p| p.canonicalize().unwrap_or_else(|_| p.clone()) == canonical)
789        .unwrap_or(false)
790}
791
792#[cfg(test)]
793mod tests {
794    use tempfile::TempDir;
795
796    use super::*;
797
798    fn create_registry() -> (TempDir, AgentRegistry) {
799        let temp_dir = TempDir::new().unwrap();
800        let registry = AgentRegistry::new(&temp_dir.path().join(".heddle"));
801        (temp_dir, registry)
802    }
803
804    fn entry(session_id: &str, status: AgentStatus) -> AgentEntry {
805        AgentEntry {
806            session_id: session_id.to_string(),
807            client_instance_id: None,
808            native_actor_key: None,
809            native_parent_actor_key: None,
810            native_instance_key: None,
811            heddle_session_id: None,
812            thread_id: None,
813            thread: format!("agent/{session_id}"),
814            pid: None,
815            boot_id: None,
816            liveness_path: None,
817            heartbeat_at: None,
818            anchor_state: None,
819            anchor_root: None,
820            reservation_token: None,
821            path: None,
822            base_state: "hd-base".to_string(),
823            started_at: Utc::now(),
824            provider: None,
825            model: None,
826            harness: None,
827            thinking_level: None,
828            usage_summary: AgentUsageSummary::default(),
829            last_progress_at: None,
830            report_flush_state: None,
831            attach_reason: None,
832            attach_precedence: vec![],
833            winning_attach_rule: None,
834            probe_source: None,
835            probe_confidence: None,
836            status,
837            completed_at: None,
838            context_queries: vec![],
839        }
840    }
841
842    #[test]
843    fn list_prunes_stale_completed_entries() {
844        let (_temp, registry) = create_registry();
845        let mut stale = entry("agent-stale", AgentStatus::Complete);
846        stale.completed_at = Some(Utc::now() - chrono::Duration::days(8));
847        let active = entry("agent-active", AgentStatus::Active);
848
849        registry.save(&stale).unwrap();
850        registry.save(&active).unwrap();
851
852        let entries = registry.list().unwrap();
853        assert_eq!(entries.len(), 1);
854        assert_eq!(entries[0].session_id, "agent-active");
855        assert!(registry.load("agent-stale").unwrap().is_none());
856    }
857
858    #[test]
859    fn load_returns_none_for_stale_completed_entry() {
860        let (_temp, registry) = create_registry();
861        let mut stale = entry("agent-stale", AgentStatus::Merged);
862        stale.completed_at = Some(Utc::now() - chrono::Duration::days(8));
863        registry.save(&stale).unwrap();
864
865        assert!(registry.load("agent-stale").unwrap().is_none());
866    }
867
868    #[test]
869    fn log_context_query_appends_to_active_session() {
870        let (_temp, registry) = create_registry();
871        let active = entry("agent-active", AgentStatus::Active);
872        registry.save(&active).unwrap();
873
874        let query = ContextQueryEntry {
875            path: "src/auth/session.rs".to_string(),
876            scope: Some("symbol:validate_token".to_string()),
877            queried_at: Utc::now(),
878        };
879        registry.log_context_query("agent-active", query).unwrap();
880
881        let loaded = registry.load("agent-active").unwrap().unwrap();
882        assert_eq!(loaded.context_queries.len(), 1);
883        assert_eq!(loaded.context_queries[0].path, "src/auth/session.rs");
884        assert_eq!(
885            loaded.context_queries[0].scope.as_deref(),
886            Some("symbol:validate_token")
887        );
888    }
889
890    #[test]
891    fn log_context_query_no_op_for_complete_session() {
892        let (_temp, registry) = create_registry();
893        let mut complete = entry("agent-done", AgentStatus::Complete);
894        complete.completed_at = Some(Utc::now());
895        registry.save(&complete).unwrap();
896
897        let query = ContextQueryEntry {
898            path: "src/lib.rs".to_string(),
899            scope: None,
900            queried_at: Utc::now(),
901        };
902        registry.log_context_query("agent-done", query).unwrap();
903
904        let loaded = registry.load("agent-done").unwrap().unwrap();
905        assert_eq!(loaded.context_queries.len(), 0);
906    }
907
908    #[test]
909    fn find_active_by_path_returns_matching_session() {
910        let (temp, registry) = create_registry();
911        let worktree = temp.path().join("checkout");
912        std::fs::create_dir_all(&worktree).unwrap();
913
914        let mut active = entry("agent-match", AgentStatus::Active);
915        active.path = Some(worktree.clone());
916        registry.save(&active).unwrap();
917
918        let mut other = entry("agent-other", AgentStatus::Active);
919        other.path = Some(temp.path().join("other-checkout"));
920        registry.save(&other).unwrap();
921
922        let found = registry.find_active_by_path(&worktree).unwrap();
923        assert!(found.is_some());
924        assert_eq!(found.unwrap().session_id, "agent-match");
925    }
926
927    #[test]
928    fn create_generated_entry_retries_collisions_under_lock() {
929        let (_temp, registry) = create_registry();
930        registry
931            .save(&entry("agent-existing", AgentStatus::Active))
932            .unwrap();
933
934        let mut ids = vec!["agent-existing".to_string(), "agent-new".to_string()].into_iter();
935        let created = registry
936            .create_generated_entry_with(
937                move || ids.next().unwrap(),
938                |session_id| {
939                    let mut entry = entry(session_id, AgentStatus::Active);
940                    entry.thread = format!("agent/{session_id}");
941                    Ok(entry)
942                },
943            )
944            .unwrap();
945
946        assert_eq!(created.session_id, "agent-new");
947        assert!(registry.load("agent-existing").unwrap().is_some());
948        assert!(registry.load("agent-new").unwrap().is_some());
949    }
950
951    /// Regression: `cmd_agent_reserve` reserves an Active entry via
952    /// `try_reserve_thread` *before* it advances the thread ref via
953    /// `set_thread_cas`. If the CAS races (another writer advanced the
954    /// thread between the pre-check and the CAS) and the caller doesn't
955    /// abandon the orphaned reservation, subsequent reservers see a
956    /// ghost live owner that can't be cleared until pid-liveness reaping
957    /// kicks in.
958    ///
959    /// The fix wraps the post-reserve work in a fallible block and
960    /// transitions the entry to `Abandoned` via `update_entry` if the
961    /// block returns Err. This test exercises that primitive: an
962    /// Abandoned entry must not block a fresh reservation, and the
963    /// next reserve must succeed cleanly.
964    #[test]
965    fn abandoning_active_entry_unblocks_subsequent_reserve_on_same_thread() {
966        let (_temp, registry) = create_registry();
967
968        // First reserve: Active entry is created. Stand in for what
969        // `cmd_agent_reserve`'s `try_reserve_thread` call writes.
970        let outcome = registry
971            .try_reserve_thread("feature/leak-repro", |session_id| {
972                let mut e = entry(session_id, AgentStatus::Active);
973                e.thread = "feature/leak-repro".to_string();
974                e.thread_id = Some("feature/leak-repro".to_string());
975                // pid 1 is always alive — keeps liveness honest if the
976                // test environment exposes a boot_id; we'll abandon
977                // explicitly below regardless.
978                e.pid = Some(1);
979                e.boot_id = crate::store::liveness::current_boot_id();
980                Ok(e)
981            })
982            .unwrap();
983        let session_id = match outcome {
984            ReserveOutcome::Reserved(entry) => entry.session_id,
985            ReserveOutcome::LiveOwner(_) => panic!("first reserve must succeed"),
986        };
987
988        // Simulate the cleanup path: post-reserve work failed (e.g.,
989        // `set_thread_cas` lost a race), so we transition the entry to
990        // Abandoned and bubble the original error to the caller. The
991        // caller never saw this session_id in JSON, so no orchestrator
992        // is holding it.
993        registry
994            .update_entry(&session_id, |entry| {
995                entry.status = AgentStatus::Abandoned;
996                entry.completed_at = Some(Utc::now());
997            })
998            .unwrap();
999
1000        // Second reserve on the same thread must succeed: `try_reserve_thread`
1001        // skips Abandoned entries when deciding live-owner vs reuse, so
1002        // no ghost row blocks us.
1003        let next = registry
1004            .try_reserve_thread("feature/leak-repro", |session_id| {
1005                let mut e = entry(session_id, AgentStatus::Active);
1006                e.thread = "feature/leak-repro".to_string();
1007                e.thread_id = Some("feature/leak-repro".to_string());
1008                e.pid = Some(1);
1009                e.boot_id = crate::store::liveness::current_boot_id();
1010                Ok(e)
1011            })
1012            .unwrap();
1013        assert!(
1014            matches!(next, ReserveOutcome::Reserved(_)),
1015            "after the orphaned reservation is abandoned, the next reserve must succeed: {next:?}"
1016        );
1017
1018        // The registry now holds exactly one Active entry on this
1019        // thread — the new one — plus the Abandoned husk. Counting
1020        // Active by thread should be 1, not 2.
1021        let active_count = registry
1022            .list()
1023            .unwrap()
1024            .into_iter()
1025            .filter(|e| e.thread == "feature/leak-repro" && e.status == AgentStatus::Active)
1026            .count();
1027        assert_eq!(
1028            active_count, 1,
1029            "exactly one Active reservation must own the thread after rollback + retry"
1030        );
1031    }
1032
1033    #[test]
1034    fn update_entry_persists_harness_metadata() {
1035        let (_temp, registry) = create_registry();
1036        let active = entry("agent-active", AgentStatus::Active);
1037        registry.save(&active).unwrap();
1038
1039        registry
1040            .update_entry("agent-active", |entry| {
1041                entry.heddle_session_id = Some("sess-123".to_string());
1042                entry.harness = Some("claude-code".to_string());
1043                entry.thinking_level = Some("deep".to_string());
1044                entry.report_flush_state = Some("pending-local".to_string());
1045                entry.attach_reason = Some("attached from test metadata update".to_string());
1046                entry.attach_precedence = vec!["matched-current-session".to_string()];
1047                entry.winning_attach_rule = Some("matched-current-session".to_string());
1048                entry.probe_source = Some("argv_env".to_string());
1049                entry.probe_confidence = Some(0.75);
1050                entry.last_progress_at = Some(Utc::now());
1051                entry.usage_summary.input_tokens = Some(42);
1052            })
1053            .unwrap();
1054
1055        let loaded = registry.load("agent-active").unwrap().unwrap();
1056        assert_eq!(loaded.heddle_session_id.as_deref(), Some("sess-123"));
1057        assert_eq!(loaded.harness.as_deref(), Some("claude-code"));
1058        assert_eq!(loaded.thinking_level.as_deref(), Some("deep"));
1059        assert_eq!(loaded.report_flush_state.as_deref(), Some("pending-local"));
1060        assert_eq!(
1061            loaded.attach_reason.as_deref(),
1062            Some("attached from test metadata update")
1063        );
1064        assert_eq!(loaded.attach_precedence, vec!["matched-current-session"]);
1065        assert_eq!(
1066            loaded.winning_attach_rule.as_deref(),
1067            Some("matched-current-session")
1068        );
1069        assert_eq!(loaded.probe_source.as_deref(), Some("argv_env"));
1070        assert_eq!(loaded.probe_confidence, Some(0.75));
1071        assert_eq!(loaded.usage_summary.input_tokens, Some(42));
1072        assert!(loaded.last_progress_at.is_some());
1073    }
1074
1075    #[test]
1076    fn find_active_by_client_instance_id_returns_matching_session() {
1077        let (_temp, registry) = create_registry();
1078        let mut active = entry("agent-client", AgentStatus::Active);
1079        active.client_instance_id = Some("client-a".to_string());
1080        registry.save(&active).unwrap();
1081
1082        let mut other = entry("agent-other", AgentStatus::Active);
1083        other.client_instance_id = Some("client-b".to_string());
1084        registry.save(&other).unwrap();
1085
1086        let found = registry
1087            .find_active_by_client_instance_id("client-a")
1088            .unwrap()
1089            .unwrap();
1090        assert_eq!(found.session_id, "agent-client");
1091    }
1092
1093    #[test]
1094    fn find_active_by_native_actor_key_returns_matching_session() {
1095        let (_temp, registry) = create_registry();
1096        let mut active = entry("agent-native", AgentStatus::Active);
1097        active.native_actor_key = Some("codex:thread:thr_123".to_string());
1098        registry.save(&active).unwrap();
1099
1100        let found = registry
1101            .find_active_by_native_actor_key("codex:thread:thr_123")
1102            .unwrap()
1103            .unwrap();
1104        assert_eq!(found.session_id, "agent-native");
1105    }
1106
1107    #[test]
1108    fn actor_chain_follows_native_parent_keys_root_to_leaf() {
1109        let (_temp, registry) = create_registry();
1110        let mut root = entry("agent-root", AgentStatus::Active);
1111        root.native_actor_key = Some("human:lukethorne".to_string());
1112        root.provider = Some("human".to_string());
1113
1114        let mut parent = entry("agent-parent", AgentStatus::Active);
1115        parent.native_actor_key = Some("codex:thread:parent".to_string());
1116        parent.native_parent_actor_key = Some("human:lukethorne".to_string());
1117        parent.provider = Some("openai".to_string());
1118        parent.model = Some("gpt-5".to_string());
1119
1120        let mut child = entry("agent-child", AgentStatus::Active);
1121        child.native_actor_key = Some("codex:thread:child".to_string());
1122        child.native_parent_actor_key = Some("codex:thread:parent".to_string());
1123        child.provider = Some("openai".to_string());
1124        child.model = Some("gpt-5-mini".to_string());
1125
1126        registry.save(&child).unwrap();
1127        registry.save(&root).unwrap();
1128        registry.save(&parent).unwrap();
1129
1130        let chain = registry.actor_chain_for_session("agent-child").unwrap();
1131        let ids: Vec<_> = chain.iter().map(|node| node.session_id.as_str()).collect();
1132        assert_eq!(ids, vec!["agent-root", "agent-parent", "agent-child"]);
1133        assert_eq!(
1134            chain[2].native_parent_actor_key.as_deref(),
1135            Some("codex:thread:parent")
1136        );
1137    }
1138
1139    #[test]
1140    fn try_reserve_thread_reaps_dead_active_entry_and_succeeds() {
1141        let (_temp, registry) = create_registry();
1142        let mut dead = entry("agent-dead", AgentStatus::Active);
1143        dead.thread = "feature/race".to_string();
1144        // PID 0x7fff_ffff is unassignable on Linux/macOS. Combined with
1145        // a stale boot id this is unambiguously dead.
1146        dead.pid = Some(0x7fff_ffff);
1147        dead.boot_id = Some("not-the-current-boot".to_string());
1148        registry.save(&dead).unwrap();
1149
1150        let outcome = registry
1151            .try_reserve_thread("feature/race", |session_id| {
1152                let mut new = entry(session_id, AgentStatus::Active);
1153                new.thread = "feature/race".to_string();
1154                new.pid = Some(std::process::id());
1155                new.boot_id = crate::store::liveness::current_boot_id();
1156                Ok(new)
1157            })
1158            .unwrap();
1159
1160        match outcome {
1161            ReserveOutcome::Reserved(entry) => assert_ne!(entry.session_id, "agent-dead"),
1162            ReserveOutcome::LiveOwner(_) => panic!("dead owner should have been reaped"),
1163        }
1164        let abandoned = registry.load("agent-dead").unwrap().unwrap();
1165        assert_eq!(abandoned.status, AgentStatus::Abandoned);
1166        assert!(abandoned.completed_at.is_some());
1167    }
1168
1169    #[test]
1170    fn try_reserve_thread_reports_live_owner_when_pid_is_alive() {
1171        let (_temp, registry) = create_registry();
1172        let mut alive = entry("agent-alive", AgentStatus::Active);
1173        alive.thread = "feature/busy".to_string();
1174        alive.pid = Some(std::process::id());
1175        alive.boot_id = crate::store::liveness::current_boot_id();
1176        registry.save(&alive).unwrap();
1177
1178        let outcome = registry
1179            .try_reserve_thread("feature/busy", |session_id| {
1180                let mut new = entry(session_id, AgentStatus::Active);
1181                new.thread = "feature/busy".to_string();
1182                Ok(new)
1183            })
1184            .unwrap();
1185
1186        match outcome {
1187            ReserveOutcome::Reserved(_) => panic!("live owner should have blocked reservation"),
1188            ReserveOutcome::LiveOwner(existing) => assert_eq!(existing.session_id, "agent-alive"),
1189        }
1190        let still_alive = registry.load("agent-alive").unwrap().unwrap();
1191        assert_eq!(still_alive.status, AgentStatus::Active);
1192    }
1193
1194    #[test]
1195    fn reap_dead_for_thread_only_touches_named_thread() {
1196        let (_temp, registry) = create_registry();
1197        let mut dead_a = entry("agent-dead-a", AgentStatus::Active);
1198        dead_a.thread = "feature/a".to_string();
1199        dead_a.pid = Some(0x7fff_ffff);
1200        dead_a.boot_id = Some("stale".to_string());
1201        let mut dead_b = entry("agent-dead-b", AgentStatus::Active);
1202        dead_b.thread = "feature/b".to_string();
1203        dead_b.pid = Some(0x7fff_ffff);
1204        dead_b.boot_id = Some("stale".to_string());
1205        registry.save(&dead_a).unwrap();
1206        registry.save(&dead_b).unwrap();
1207
1208        let reaped = registry.reap_dead_for_thread("feature/a").unwrap();
1209        assert_eq!(reaped, 1);
1210        assert_eq!(
1211            registry.load("agent-dead-a").unwrap().unwrap().status,
1212            AgentStatus::Abandoned
1213        );
1214        assert_eq!(
1215            registry.load("agent-dead-b").unwrap().unwrap().status,
1216            AgentStatus::Active,
1217            "untargeted thread should not be reaped"
1218        );
1219
1220        let reaped_all = registry.reap_dead().unwrap();
1221        assert_eq!(reaped_all, 1);
1222        assert_eq!(
1223            registry.load("agent-dead-b").unwrap().unwrap().status,
1224            AgentStatus::Abandoned
1225        );
1226    }
1227
1228    #[test]
1229    fn actor_chain_breaks_cycles_without_looping() {
1230        let (_temp, registry) = create_registry();
1231        let mut a = entry("agent-a", AgentStatus::Active);
1232        a.native_actor_key = Some("actor:a".to_string());
1233        a.native_parent_actor_key = Some("actor:b".to_string());
1234        let mut b = entry("agent-b", AgentStatus::Active);
1235        b.native_actor_key = Some("actor:b".to_string());
1236        b.native_parent_actor_key = Some("actor:a".to_string());
1237        registry.save(&a).unwrap();
1238        registry.save(&b).unwrap();
1239
1240        let chain = registry.actor_chain_for_session("agent-a").unwrap();
1241        assert_eq!(chain.len(), 2);
1242        assert_eq!(chain.last().unwrap().session_id, "agent-a");
1243    }
1244
1245    /// Concurrent-reservation soak. The W5b reservation contract:
1246    /// when N threads race to reserve the same thread name from the
1247    /// same anchor, exactly one wins (`ReserveOutcome::Reserved`) and
1248    /// the rest get `ReserveOutcome::LiveOwner` carrying the winner's
1249    /// session id. Tested 100× with 8 racers per round to catch any
1250    /// rare lock-acquisition flake.
1251    ///
1252    /// `#[ignore]` because this iterates 800 reservations and takes
1253    /// a few seconds; runs in `--include-ignored` sweeps and the
1254    /// nightly real-world matrix.
1255    #[test]
1256    #[ignore = "soak: 100× concurrent reservation race"]
1257    fn try_reserve_thread_under_concurrent_load_is_race_free() {
1258        use std::sync::{Arc, Barrier};
1259
1260        const ROUNDS: usize = 100;
1261        const RACERS: usize = 8;
1262
1263        for round in 0..ROUNDS {
1264            // Fresh registry each round so the previous round's
1265            // winner doesn't bias the next round (also models the
1266            // real "release between batches" pattern).
1267            let (_temp, registry) = create_registry();
1268            let registry = Arc::new(registry);
1269            let barrier = Arc::new(Barrier::new(RACERS));
1270            let thread_name = format!("feature/race-{round}");
1271
1272            let handles: Vec<_> = (0..RACERS)
1273                .map(|racer_idx| {
1274                    let registry = Arc::clone(&registry);
1275                    let barrier = Arc::clone(&barrier);
1276                    let thread_name = thread_name.clone();
1277                    std::thread::spawn(move || {
1278                        // All racers wait on the barrier so the
1279                        // contention is maximised — without this the
1280                        // OS scheduler can serialize them and the
1281                        // test passes trivially.
1282                        barrier.wait();
1283                        let outcome = registry.try_reserve_thread(&thread_name, |session_id| {
1284                            let mut entry = entry(
1285                                &format!("agent-{racer_idx}-{session_id}"),
1286                                AgentStatus::Active,
1287                            );
1288                            entry.thread = thread_name.clone();
1289                            entry.pid = Some(std::process::id());
1290                            entry.boot_id = crate::store::liveness::current_boot_id();
1291                            Ok(entry)
1292                        });
1293                        outcome.expect("reservation call must not error")
1294                    })
1295                })
1296                .collect();
1297
1298            let outcomes: Vec<ReserveOutcome> = handles
1299                .into_iter()
1300                .map(|h| h.join().expect("racer panic"))
1301                .collect();
1302
1303            let reserved_count = outcomes
1304                .iter()
1305                .filter(|o| matches!(o, ReserveOutcome::Reserved(_)))
1306                .count();
1307            let live_owner_count = outcomes
1308                .iter()
1309                .filter(|o| matches!(o, ReserveOutcome::LiveOwner(_)))
1310                .count();
1311
1312            assert_eq!(
1313                reserved_count, 1,
1314                "round {round}: exactly one racer must win the reservation; got {reserved_count}"
1315            );
1316            assert_eq!(
1317                live_owner_count,
1318                RACERS - 1,
1319                "round {round}: every loser must get a LiveOwner outcome; \
1320                 reserved={reserved_count} live_owner={live_owner_count}"
1321            );
1322
1323            // Strong invariant: every LiveOwner outcome's `existing`
1324            // entry must be the winner's session — not some stale
1325            // dead entry, not a different racer's losing-and-then-
1326            // dead entry.
1327            let winner_session = outcomes
1328                .iter()
1329                .find_map(|o| match o {
1330                    ReserveOutcome::Reserved(entry) => Some(entry.session_id.clone()),
1331                    _ => None,
1332                })
1333                .expect("a winner must exist");
1334            for outcome in &outcomes {
1335                if let ReserveOutcome::LiveOwner(existing) = outcome {
1336                    assert_eq!(
1337                        existing.session_id, winner_session,
1338                        "round {round}: LiveOwner conflicts must point at the actual winner; \
1339                         got {} expected {winner_session}",
1340                        existing.session_id
1341                    );
1342                }
1343            }
1344        }
1345    }
1346}