Skip to main content

objects/store/
agent_registry.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Agent registry: lightweight discovery index for parallel agent sessions.
3//!
4//! Stores one TOML file per active agent in `.heddle/agents/<session-id>.toml`.
5//! The registry does NOT manage worktrees or refs — those remain independent.
6//! It exists purely so an orchestrator can query which agents are in flight.
7
8use std::{
9    collections::{HashMap, HashSet},
10    path::{Path, PathBuf},
11};
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15
16use crate::{
17    fs_atomic::write_file_atomic,
18    lock::RepoLock,
19    store::{
20        HeddleError, Result,
21        liveness::{Liveness, is_owner_alive},
22    },
23};
24
/// Age, in days, after which a terminal (non-`Active`) registry entry is
/// considered stale and pruned the next time the registry reads it.
const STALE_AGENT_TTL_DAYS: i64 = 7;
26
27/// Outcome of an attempt to reserve an active session on a thread.
28///
29/// `LiveOwner` carries the existing reservation so callers can compare
30/// anchors and surface either an "already reserved by another live
31/// process" or an "anchor drift" error. The registry reaps dead owners
32/// in-place before producing this outcome — so an `Active` entry seen
33/// here is guaranteed to have been alive at the moment of the check.
34#[derive(Debug)]
35pub enum ReserveOutcome {
36    /// Reservation succeeded; the new entry is included.
37    Reserved(AgentEntry),
38    /// Another live agent already holds this thread.
39    LiveOwner(AgentEntry),
40}
41
42/// A record of one `heddle context get` call made during an agent session.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct ContextQueryEntry {
45    /// The file path that was queried.
46    pub path: String,
47    /// The scope filter used, if any (e.g. `symbol:parse_manifest`).
48    pub scope: Option<String>,
49    /// When the query was made.
50    pub queried_at: DateTime<Utc>,
51}
52
53#[derive(Debug, Clone, Default, Serialize, Deserialize)]
54pub struct AgentUsageSummary {
55    #[serde(default)]
56    pub input_tokens: Option<u64>,
57    #[serde(default)]
58    pub output_tokens: Option<u64>,
59    #[serde(default)]
60    pub reasoning_tokens: Option<u64>,
61    #[serde(default)]
62    pub tool_calls: Option<u32>,
63    #[serde(default)]
64    pub cost_micros_usd: Option<u64>,
65}
66
67/// Status of an agent session.
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum AgentStatus {
71    /// Agent is actively working.
72    Active,
73    /// Agent's reservation was left behind and has been reaped.
74    Abandoned,
75    /// Agent has finished work (snapshot taken) but not yet merged.
76    Complete,
77    /// Agent's thread has been merged into the base thread.
78    Merged,
79}
80
81impl std::fmt::Display for AgentStatus {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        match self {
84            AgentStatus::Active => write!(f, "active"),
85            AgentStatus::Abandoned => write!(f, "abandoned"),
86            AgentStatus::Complete => write!(f, "complete"),
87            AgentStatus::Merged => write!(f, "merged"),
88        }
89    }
90}
91
92/// A registry entry describing one active (or recently finished) agent session.
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct AgentEntry {
95    /// Unique session identifier (e.g. `agent-xxxxxxxxxxxx`).
96    pub session_id: String,
97    /// Stable harness-side instance identifier used to reconnect the same
98    /// local client process to its registry entry across bridge restarts.
99    #[serde(default)]
100    pub client_instance_id: Option<String>,
101    /// Harness-native actor identity such as `codex:thread:thr_123`.
102    #[serde(default)]
103    pub native_actor_key: Option<String>,
104    /// Harness-native parent actor identity for child/subagent sessions.
105    #[serde(default)]
106    pub native_parent_actor_key: Option<String>,
107    /// Harness-native reconnect key such as a transcript path or client name.
108    #[serde(default)]
109    pub native_instance_key: Option<String>,
110    /// Heddle session identifier when this registry entry is attached to a
111    /// first-class Heddle multi-segment session.
112    #[serde(default)]
113    pub heddle_session_id: Option<String>,
114    /// Thread identifier when the session is attached to a Heddle thread record.
115    #[serde(default)]
116    pub thread_id: Option<String>,
117    /// The Heddle thread the agent writes to.
118    pub thread: String,
119    /// Process id that created or last renewed the reservation.
120    #[serde(default)]
121    pub pid: Option<u32>,
122    /// Host boot identifier when available; differentiates reused PIDs.
123    #[serde(default)]
124    pub boot_id: Option<String>,
125    /// Advisory liveness lock file path for future long-lived runners.
126    #[serde(default)]
127    pub liveness_path: Option<PathBuf>,
128    /// Most recent reservation heartbeat.
129    #[serde(default)]
130    pub heartbeat_at: Option<DateTime<Utc>>,
131    /// Full state id the session was anchored to.
132    #[serde(default)]
133    pub anchor_state: Option<String>,
134    /// Root tree id the session was anchored to.
135    #[serde(default)]
136    pub anchor_root: Option<String>,
137    /// Opaque token returned to programmatic agent clients.
138    #[serde(default)]
139    pub reservation_token: Option<String>,
140    /// Absolute path to the agent's checkout directory, if filesystem-based.
141    #[serde(default)]
142    pub path: Option<PathBuf>,
143    /// Short display form of the base state the agent started from.
144    pub base_state: String,
145    /// When the agent session was created.
146    pub started_at: DateTime<Utc>,
147    /// AI provider (e.g. `anthropic`).
148    pub provider: Option<String>,
149    /// AI model (e.g. `claude-sonnet-4-6`).
150    pub model: Option<String>,
151    /// Harness or client name (e.g. `claude-code`, `codex`).
152    #[serde(default)]
153    pub harness: Option<String>,
154    /// Harness-specific reasoning/thinking level when available.
155    #[serde(default)]
156    pub thinking_level: Option<String>,
157    /// Aggregated usage counters captured for the active session.
158    #[serde(default)]
159    pub usage_summary: AgentUsageSummary,
160    /// Most recent progress heartbeat timestamp.
161    #[serde(default)]
162    pub last_progress_at: Option<DateTime<Utc>>,
163    /// Summary flush state for the local session reporter.
164    #[serde(default)]
165    pub report_flush_state: Option<String>,
166    /// Most recent explanation of why Heddle attached this actor to its current
167    /// thread/session context.
168    #[serde(default)]
169    pub attach_reason: Option<String>,
170    /// Local agent task assignment id this session is executing, if any.
171    #[serde(default)]
172    pub task_assignment_id: Option<String>,
173    /// Ordered explanation of attach rules Heddle evaluated.
174    #[serde(default)]
175    pub attach_precedence: Vec<String>,
176    /// The attach rule that won for this actor.
177    #[serde(default)]
178    pub winning_attach_rule: Option<String>,
179    /// Where Heddle learned the harness identity from.
180    #[serde(default)]
181    pub probe_source: Option<String>,
182    /// How confident Heddle was in the probe result.
183    #[serde(default)]
184    pub probe_confidence: Option<f32>,
185    /// Current status.
186    pub status: AgentStatus,
187    /// When the agent was marked complete or merged.
188    #[serde(default)]
189    pub completed_at: Option<DateTime<Utc>>,
190    /// Log of `heddle context get` calls made during this session.
191    /// Appended by the CLI each time an agent queries context from its worktree.
192    #[serde(default)]
193    pub context_queries: Vec<ContextQueryEntry>,
194}
195
196/// One hop in an actor ancestry chain, ordered root to leaf.
197#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
198pub struct ActorChainNode {
199    pub session_id: String,
200    #[serde(default)]
201    pub native_actor_key: Option<String>,
202    #[serde(default)]
203    pub native_parent_actor_key: Option<String>,
204    pub thread: String,
205    pub status: AgentStatus,
206    #[serde(default)]
207    pub provider: Option<String>,
208    #[serde(default)]
209    pub model: Option<String>,
210    #[serde(default)]
211    pub harness: Option<String>,
212}
213
214impl From<&AgentEntry> for ActorChainNode {
215    fn from(entry: &AgentEntry) -> Self {
216        Self {
217            session_id: entry.session_id.clone(),
218            native_actor_key: entry.native_actor_key.clone(),
219            native_parent_actor_key: entry.native_parent_actor_key.clone(),
220            thread: entry.thread.clone(),
221            status: entry.status.clone(),
222            provider: entry.provider.clone(),
223            model: entry.model.clone(),
224            harness: entry.harness.clone(),
225        }
226    }
227}
228
229/// Manages agent registry entries stored in `.heddle/agents/`.
230pub struct AgentRegistry {
231    agents_dir: PathBuf,
232}
233
234impl AgentRegistry {
235    /// Create a new registry backed by `<heddle_dir>/agents/`.
236    pub fn new(heddle_dir: &Path) -> Self {
237        Self {
238            agents_dir: heddle_dir.join("agents"),
239        }
240    }
241
242    fn entry_path(&self, session_id: &str) -> Result<PathBuf> {
243        // Only allow characters produced by generate_agent_id: lowercase
244        // alphanumeric and hyphens.  This makes path traversal structurally
245        // impossible: none of [a-z0-9-] can form ".." or "/".
246        if session_id.is_empty()
247            || !session_id
248                .bytes()
249                .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
250        {
251            return Err(HeddleError::Config(format!(
252                "invalid session ID '{}': only lowercase alphanumeric and hyphens allowed",
253                session_id
254            )));
255        }
256        Ok(self.agents_dir.join(format!("{}.toml", session_id)))
257    }
258
259    fn lock_path(&self) -> PathBuf {
260        self.agents_dir.join(".lock")
261    }
262
263    fn write_lock(&self) -> Result<crate::lock::WriteLockGuard> {
264        RepoLock::at(self.lock_path()).write().map_err(|err| {
265            HeddleError::Config(format!("failed to acquire agent registry lock: {err}"))
266        })
267    }
268
269    fn write_entry_file(&self, entry: &AgentEntry) -> Result<()> {
270        std::fs::create_dir_all(&self.agents_dir)?;
271        let path = self.entry_path(&entry.session_id)?;
272        let content =
273            toml::to_string_pretty(entry).map_err(|e| HeddleError::Config(e.to_string()))?;
274        Ok(write_file_atomic(&path, content.as_bytes())?)
275    }
276
277    fn load_entry_from_path(&self, path: &Path) -> Result<Option<AgentEntry>> {
278        if !path.exists() {
279            return Ok(None);
280        }
281
282        let content = std::fs::read_to_string(path)?;
283        let entry = toml::from_str(&content).map_err(|e| HeddleError::Config(e.to_string()))?;
284        Ok(Some(entry))
285    }
286
287    fn is_stale_terminal_entry(&self, entry: &AgentEntry) -> bool {
288        if matches!(entry.status, AgentStatus::Active) {
289            return false;
290        }
291
292        let terminal_at = entry.completed_at.unwrap_or(entry.started_at);
293        terminal_at <= Utc::now() - chrono::Duration::days(STALE_AGENT_TTL_DAYS)
294    }
295
296    fn prune_stale_entry_path(&self, path: &Path) -> Result<()> {
297        if path.exists() {
298            std::fs::remove_file(path)?;
299        }
300        Ok(())
301    }
302
303    /// Liveness verdict for an active entry. Terminal entries (anything
304    /// other than `Active`) are reported as `Dead` so reaping logic can
305    /// skip them in one pass alongside crashed actives.
306    pub fn liveness_for(entry: &AgentEntry) -> Liveness {
307        if entry.status != AgentStatus::Active {
308            return Liveness::Dead;
309        }
310        is_owner_alive(entry.pid, entry.boot_id.as_deref())
311    }
312
313    /// Mark an active entry as `Abandoned`, recording the moment the
314    /// reservation was reaped.
315    fn abandon_active_entry(&self, mut entry: AgentEntry) -> Result<AgentEntry> {
316        entry.status = AgentStatus::Abandoned;
317        entry.completed_at = Some(Utc::now());
318        self.write_entry_file(&entry)?;
319        Ok(entry)
320    }
321
322    /// Sweep entries on a single thread and reap any whose recorded
323    /// owner is demonstrably dead. Returns the number of entries
324    /// transitioned to `Abandoned`. Caller must already hold the
325    /// registry write lock.
326    fn reap_dead_locked(&self, thread_filter: Option<&str>) -> Result<usize> {
327        if !self.agents_dir.exists() {
328            return Ok(0);
329        }
330        let mut reaped = 0usize;
331        for dir_entry in std::fs::read_dir(&self.agents_dir)? {
332            let dir_entry = dir_entry?;
333            let path = dir_entry.path();
334            if !path.extension().map(|e| e == "toml").unwrap_or(false) {
335                continue;
336            }
337            let Some(entry) = self.load_entry_from_path(&path)? else {
338                continue;
339            };
340            if entry.status != AgentStatus::Active {
341                continue;
342            }
343            if let Some(name) = thread_filter
344                && entry.thread != name
345            {
346                continue;
347            }
348            if Self::liveness_for(&entry) == Liveness::Dead {
349                self.abandon_active_entry(entry)?;
350                reaped += 1;
351            }
352        }
353        Ok(reaped)
354    }
355
356    /// Reap any active reservation on `thread` whose owning process is
357    /// no longer alive. Returns the number of entries abandoned.
358    ///
359    /// This is the read-side complement of [`try_reserve_thread`]:
360    /// callers that only need a fresh view of which sessions are alive
361    /// (e.g. `heddle agent list --alive-only`) can run it without
362    /// reserving.
363    pub fn reap_dead_for_thread(&self, thread: &str) -> Result<usize> {
364        let _lock = self.write_lock()?;
365        self.reap_dead_locked(Some(thread))
366    }
367
368    /// Reap dead reservations across every thread.
369    pub fn reap_dead(&self) -> Result<usize> {
370        let _lock = self.write_lock()?;
371        self.reap_dead_locked(None)
372    }
373
374    fn create_generated_entry_with<F, G>(
375        &self,
376        mut generate_id: G,
377        mut build_entry: F,
378    ) -> Result<AgentEntry>
379    where
380        F: FnMut(&str) -> Result<AgentEntry>,
381        G: FnMut() -> String,
382    {
383        let _lock = self.write_lock()?;
384
385        loop {
386            let session_id = generate_id();
387            let path = self.entry_path(&session_id)?;
388            if path.exists() {
389                continue;
390            }
391
392            let entry = build_entry(&session_id)?;
393            self.write_entry_file(&entry)?;
394            return Ok(entry);
395        }
396    }
397
398    /// Create and persist a new agent entry with a unique generated session ID.
399    pub fn create_generated_entry<F>(&self, build_entry: F) -> Result<AgentEntry>
400    where
401        F: FnMut(&str) -> Result<AgentEntry>,
402    {
403        self.create_generated_entry_with(generate_agent_id, build_entry)
404    }
405
406    /// Persist an agent entry.
407    ///
408    /// Atomic write: uses write-to-temp-then-rename so a crash mid-write
409    /// never leaves the TOML file truncated or partially written.
410    pub fn save(&self, entry: &AgentEntry) -> Result<()> {
411        let _lock = self.write_lock()?;
412        self.write_entry_file(entry)
413    }
414
415    /// Load a single agent entry by session ID.
416    pub fn load(&self, session_id: &str) -> Result<Option<AgentEntry>> {
417        let path = self.entry_path(session_id)?;
418        let Some(entry) = self.load_entry_from_path(&path)? else {
419            return Ok(None);
420        };
421
422        if self.is_stale_terminal_entry(&entry) {
423            let _lock = self.write_lock()?;
424            if let Some(latest) = self.load_entry_from_path(&path)?
425                && self.is_stale_terminal_entry(&latest)
426            {
427                self.prune_stale_entry_path(&path)?;
428                return Ok(None);
429            }
430        }
431
432        Ok(Some(entry))
433    }
434
435    /// List all agent entries, most-recently-started first.
436    pub fn list(&self) -> Result<Vec<AgentEntry>> {
437        if !self.agents_dir.exists() {
438            return Ok(Vec::new());
439        }
440
441        let mut stale_paths = Vec::new();
442        let mut entries = Vec::new();
443        for dir_entry in std::fs::read_dir(&self.agents_dir)? {
444            let dir_entry = dir_entry?;
445            let path = dir_entry.path();
446            if path.extension().map(|e| e == "toml").unwrap_or(false) {
447                let content = std::fs::read_to_string(&path)?;
448                let entry = toml::from_str::<AgentEntry>(&content).map_err(|err| {
449                    HeddleError::Config(format!(
450                        "failed to parse agent registry entry '{}': {err}",
451                        path.display()
452                    ))
453                })?;
454                if self.is_stale_terminal_entry(&entry) {
455                    stale_paths.push(path);
456                } else {
457                    entries.push(entry);
458                }
459            }
460        }
461
462        if !stale_paths.is_empty() {
463            let _lock = self.write_lock()?;
464            for path in stale_paths {
465                if let Some(entry) = self.load_entry_from_path(&path)?
466                    && self.is_stale_terminal_entry(&entry)
467                {
468                    self.prune_stale_entry_path(&path)?;
469                }
470            }
471        }
472
473        entries.sort_by_key(|a| std::cmp::Reverse(a.started_at));
474        Ok(entries)
475    }
476
477    /// Update the status of an agent entry in place.
478    pub fn update_status(&self, session_id: &str, status: AgentStatus) -> Result<()> {
479        let _lock = self.write_lock()?;
480        let path = self.entry_path(session_id)?;
481        if let Some(mut entry) = self.load_entry_from_path(&path)? {
482            entry.status = status;
483            entry.completed_at = match entry.status {
484                AgentStatus::Active => None,
485                AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
486                    Some(Utc::now())
487                }
488            };
489            self.write_entry_file(&entry)?;
490        }
491        Ok(())
492    }
493
494    /// Mutate an existing agent entry under the registry write lock.
495    pub fn update_entry<F>(&self, session_id: &str, mut update: F) -> Result<Option<AgentEntry>>
496    where
497        F: FnMut(&mut AgentEntry),
498    {
499        let _lock = self.write_lock()?;
500        let path = self.entry_path(session_id)?;
501        let Some(mut entry) = self.load_entry_from_path(&path)? else {
502            return Ok(None);
503        };
504        update(&mut entry);
505        self.write_entry_file(&entry)?;
506        Ok(Some(entry))
507    }
508
509    /// Under one registry write lock, reuse a matching active entry if one
510    /// exists; otherwise create a new generated entry.
511    pub fn find_or_create_active_entry<FMatch, FUpdate, FBuild>(
512        &self,
513        mut matches: FMatch,
514        mut update_existing: FUpdate,
515        mut build_entry: FBuild,
516    ) -> Result<(AgentEntry, bool)>
517    where
518        FMatch: FnMut(&AgentEntry) -> bool,
519        FUpdate: FnMut(&mut AgentEntry),
520        FBuild: FnMut(&str) -> Result<AgentEntry>,
521    {
522        let _lock = self.write_lock()?;
523        std::fs::create_dir_all(&self.agents_dir)?;
524
525        for dir_entry in std::fs::read_dir(&self.agents_dir)? {
526            let dir_entry = dir_entry?;
527            let path = dir_entry.path();
528            if !path.extension().map(|e| e == "toml").unwrap_or(false) {
529                continue;
530            }
531            let Some(mut entry) = self.load_entry_from_path(&path)? else {
532                continue;
533            };
534            if self.is_stale_terminal_entry(&entry) {
535                self.prune_stale_entry_path(&path)?;
536                continue;
537            }
538            if entry.status == AgentStatus::Active && matches(&entry) {
539                update_existing(&mut entry);
540                self.write_entry_file(&entry)?;
541                return Ok((entry, false));
542            }
543        }
544
545        loop {
546            let session_id = generate_agent_id();
547            let path = self.entry_path(&session_id)?;
548            if path.exists() {
549                continue;
550            }
551
552            let entry = build_entry(&session_id)?;
553            self.write_entry_file(&entry)?;
554            return Ok((entry, true));
555        }
556    }
557
558    /// Atomic "reap dead, then reserve" for a single thread.
559    ///
560    /// Under one registry write lock this:
561    /// 1. Prunes terminal-status entries past their TTL.
562    /// 2. Transitions any *active* entry whose recorded owner has died
563    ///    (per [`liveness_for`](Self::liveness_for)) to `Abandoned`.
564    /// 3. If a still-living active entry on `thread` remains, returns
565    ///    [`ReserveOutcome::LiveOwner`] with that entry.
566    /// 4. Otherwise builds and persists a new entry, returning
567    ///    [`ReserveOutcome::Reserved`].
568    ///
569    /// Both `cmd_agent_reserve` (the JSON API) and `start_thread`
570    /// route through here, so reaping and conflict detection share
571    /// exactly one code path.
572    pub fn try_reserve_thread<F>(&self, thread: &str, build_entry: F) -> Result<ReserveOutcome>
573    where
574        F: FnMut(&str) -> Result<AgentEntry>,
575    {
576        let _lock = self.write_lock()?;
577        std::fs::create_dir_all(&self.agents_dir)?;
578
579        let mut live_owner: Option<AgentEntry> = None;
580        for dir_entry in std::fs::read_dir(&self.agents_dir)? {
581            let dir_entry = dir_entry?;
582            let path = dir_entry.path();
583            if !path.extension().map(|e| e == "toml").unwrap_or(false) {
584                continue;
585            }
586            let Some(entry) = self.load_entry_from_path(&path)? else {
587                continue;
588            };
589            if self.is_stale_terminal_entry(&entry) {
590                self.prune_stale_entry_path(&path)?;
591                continue;
592            }
593            if entry.status != AgentStatus::Active || entry.thread != thread {
594                continue;
595            }
596            match Self::liveness_for(&entry) {
597                Liveness::Dead => {
598                    self.abandon_active_entry(entry)?;
599                }
600                Liveness::Alive | Liveness::Unknown => {
601                    // Unknown collapses to Alive here so we never
602                    // double-allocate a thread on insufficient evidence.
603                    live_owner = Some(entry);
604                    break;
605                }
606            }
607        }
608
609        if let Some(existing) = live_owner {
610            return Ok(ReserveOutcome::LiveOwner(existing));
611        }
612
613        let mut build_entry = build_entry;
614        loop {
615            let session_id = generate_agent_id();
616            let path = self.entry_path(&session_id)?;
617            if path.exists() {
618                continue;
619            }
620
621            let entry = build_entry(&session_id)?;
622            self.write_entry_file(&entry)?;
623            return Ok(ReserveOutcome::Reserved(entry));
624        }
625    }
626
627    /// Backwards-compatible thin wrapper around [`try_reserve_thread`]
628    /// that returns the same flat `Result<AgentEntry>` shape callers
629    /// historically expected. Live-owner conflicts surface as a
630    /// `HeddleError::Config(..)` with the same human-readable message
631    /// as before.
632    pub fn create_generated_entry_for_thread<F>(
633        &self,
634        thread: &str,
635        build_entry: F,
636    ) -> Result<AgentEntry>
637    where
638        F: FnMut(&str) -> Result<AgentEntry>,
639    {
640        match self.try_reserve_thread(thread, build_entry)? {
641            ReserveOutcome::Reserved(entry) => Ok(entry),
642            ReserveOutcome::LiveOwner(existing) => Err(HeddleError::Config(format!(
643                "thread '{}' already has active reservation {}. Use `heddle thread show {}` to inspect it, or release the session before starting another writer.",
644                thread, existing.session_id, thread
645            ))),
646        }
647    }
648
649    /// Find the active session whose visible or private execution root matches
650    /// the given worktree root.
651    pub fn find_active_by_path(&self, worktree_root: &Path) -> Result<Option<AgentEntry>> {
652        let canonical = worktree_root
653            .canonicalize()
654            .unwrap_or_else(|_| worktree_root.to_path_buf());
655        let entries = self.list()?;
656        Ok(entries
657            .into_iter()
658            .find(|e| e.status == AgentStatus::Active && entry_matches_root(e, &canonical)))
659    }
660
661    /// Find the active registry entry associated with the given Heddle session ID.
662    pub fn find_active_by_heddle_session_id(
663        &self,
664        heddle_session_id: &str,
665    ) -> Result<Option<AgentEntry>> {
666        let entries = self.list()?;
667        Ok(entries.into_iter().find(|entry| {
668            entry.status == AgentStatus::Active
669                && entry.heddle_session_id.as_deref() == Some(heddle_session_id)
670        }))
671    }
672
673    /// Find the active registry entry associated with a stable harness-side
674    /// client instance identifier.
675    pub fn find_active_by_client_instance_id(
676        &self,
677        client_instance_id: &str,
678    ) -> Result<Option<AgentEntry>> {
679        let entries = self.list()?;
680        Ok(entries.into_iter().find(|entry| {
681            entry.status == AgentStatus::Active
682                && entry.client_instance_id.as_deref() == Some(client_instance_id)
683        }))
684    }
685
686    /// Find the active registry entry associated with a harness-native actor key.
687    pub fn find_active_by_native_actor_key(
688        &self,
689        native_actor_key: &str,
690    ) -> Result<Option<AgentEntry>> {
691        let entries = self.list()?;
692        Ok(entries.into_iter().find(|entry| {
693            entry.status == AgentStatus::Active
694                && entry.native_actor_key.as_deref() == Some(native_actor_key)
695        }))
696    }
697
698    /// Return this actor's native parent chain, ordered root to leaf.
699    ///
700    /// The lookup intentionally follows harness-native actor keys rather than
701    /// thread names: subagents may work in lightweight directories or forked
702    /// threads, but the native parent key is the stable "who spawned whom"
703    /// edge that preserves Human -> agent -> agent attribution.
704    pub fn actor_chain_for_session(&self, session_id: &str) -> Result<Vec<ActorChainNode>> {
705        let entries = self.list()?;
706        let by_session: HashMap<&str, &AgentEntry> = entries
707            .iter()
708            .map(|entry| (entry.session_id.as_str(), entry))
709            .collect();
710        let by_native_key: HashMap<&str, &AgentEntry> = entries
711            .iter()
712            .filter_map(|entry| entry.native_actor_key.as_deref().map(|key| (key, entry)))
713            .collect();
714
715        let Some(mut current) = by_session.get(session_id).copied() else {
716            return Ok(Vec::new());
717        };
718        let mut leaf_to_root = vec![ActorChainNode::from(current)];
719        let mut seen = HashSet::from([current.session_id.as_str()]);
720
721        while let Some(parent_key) = current.native_parent_actor_key.as_deref() {
722            let Some(parent) = by_native_key.get(parent_key).copied() else {
723                break;
724            };
725            if !seen.insert(parent.session_id.as_str()) {
726                break;
727            }
728            leaf_to_root.push(ActorChainNode::from(parent));
729            current = parent;
730        }
731
732        leaf_to_root.reverse();
733        Ok(leaf_to_root)
734    }
735
736    /// Find the active registry entry associated with a harness-native instance
737    /// key inside the given worktree root.
738    pub fn find_active_by_native_instance_key_at_path(
739        &self,
740        native_instance_key: &str,
741        worktree_root: &Path,
742    ) -> Result<Option<AgentEntry>> {
743        let canonical = worktree_root
744            .canonicalize()
745            .unwrap_or_else(|_| worktree_root.to_path_buf());
746        let entries = self.list()?;
747        Ok(entries.into_iter().find(|entry| {
748            entry.status == AgentStatus::Active
749                && entry.native_instance_key.as_deref() == Some(native_instance_key)
750                && entry_matches_root(entry, &canonical)
751        }))
752    }
753
754    /// Append a context query to an active session's log.
755    ///
756    /// Best-effort: silently ignored if the session no longer exists or has completed.
757    pub fn log_context_query(&self, session_id: &str, query: ContextQueryEntry) -> Result<()> {
758        let _lock = self.write_lock()?;
759        let path = self.entry_path(session_id)?;
760        if let Some(mut entry) = self.load_entry_from_path(&path)?
761            && entry.status == AgentStatus::Active
762        {
763            entry.context_queries.push(query);
764            self.write_entry_file(&entry)?;
765        }
766        Ok(())
767    }
768
769    /// Delete an agent entry.
770    pub fn delete(&self, session_id: &str) -> Result<()> {
771        let path = self.entry_path(session_id)?;
772        if path.exists() {
773            std::fs::remove_file(path)?;
774        }
775        Ok(())
776    }
777}
778
779/// Generate a unique agent session identifier.
780///
781/// Uses 12 random bytes (96 bits) encoded as lowercase base32, giving
782/// a birthday-paradox collision probability of < 10⁻²⁰ at a million sessions.
783pub fn generate_agent_id() -> String {
784    let random_bytes: [u8; 12] = rand::random();
785    format!(
786        "agent-{}",
787        base32::encode(base32::Alphabet::Rfc4648 { padding: false }, &random_bytes).to_lowercase()
788    )
789}
790
791fn entry_matches_root(entry: &AgentEntry, canonical: &Path) -> bool {
792    entry
793        .path
794        .as_ref()
795        .map(|p| p.canonicalize().unwrap_or_else(|_| p.clone()) == canonical)
796        .unwrap_or(false)
797}
798
#[cfg(test)]
mod tests {
    // Unit tests for `AgentRegistry`. Every test builds its registry under a
    // fresh `TempDir`, so no on-disk state is shared between tests.
    use tempfile::TempDir;

    use super::*;

    /// Creates a registry rooted at `<tempdir>/.heddle`.
    ///
    /// The `TempDir` is returned alongside the registry so the caller keeps it
    /// alive for the whole test; dropping it deletes the directory.
    fn create_registry() -> (TempDir, AgentRegistry) {
        let temp_dir = TempDir::new().unwrap();
        let registry = AgentRegistry::new(&temp_dir.path().join(".heddle"));
        (temp_dir, registry)
    }

    /// Minimal `AgentEntry` fixture with the given session id and status.
    ///
    /// The thread is `agent/<session_id>`, `base_state` is `"hd-base"`,
    /// `started_at` is now, and every optional or collection field is empty.
    /// Tests override individual fields as needed.
    fn entry(session_id: &str, status: AgentStatus) -> AgentEntry {
        AgentEntry {
            session_id: session_id.to_string(),
            client_instance_id: None,
            native_actor_key: None,
            native_parent_actor_key: None,
            native_instance_key: None,
            heddle_session_id: None,
            thread_id: None,
            thread: format!("agent/{session_id}"),
            pid: None,
            boot_id: None,
            liveness_path: None,
            heartbeat_at: None,
            anchor_state: None,
            anchor_root: None,
            reservation_token: None,
            path: None,
            base_state: "hd-base".to_string(),
            started_at: Utc::now(),
            provider: None,
            model: None,
            harness: None,
            thinking_level: None,
            usage_summary: AgentUsageSummary::default(),
            last_progress_at: None,
            report_flush_state: None,
            attach_reason: None,
            task_assignment_id: None,
            attach_precedence: vec![],
            winning_attach_rule: None,
            probe_source: None,
            probe_confidence: None,
            status,
            completed_at: None,
            context_queries: vec![],
        }
    }

    /// A `Complete` entry finished 8 days ago (past the 7-day
    /// `STALE_AGENT_TTL_DAYS`) is dropped from `list()` and no longer
    /// returned by `load()`, while an `Active` entry survives.
    #[test]
    fn list_prunes_stale_completed_entries() {
        let (_temp, registry) = create_registry();
        let mut stale = entry("agent-stale", AgentStatus::Complete);
        stale.completed_at = Some(Utc::now() - chrono::Duration::days(8));
        let active = entry("agent-active", AgentStatus::Active);

        registry.save(&stale).unwrap();
        registry.save(&active).unwrap();

        let entries = registry.list().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].session_id, "agent-active");
        assert!(registry.load("agent-stale").unwrap().is_none());
    }

    /// Staleness also applies to `Merged` entries: `load()` alone (without a
    /// prior `list()`) returns `None` once `completed_at` is past the TTL.
    #[test]
    fn load_returns_none_for_stale_completed_entry() {
        let (_temp, registry) = create_registry();
        let mut stale = entry("agent-stale", AgentStatus::Merged);
        stale.completed_at = Some(Utc::now() - chrono::Duration::days(8));
        registry.save(&stale).unwrap();

        assert!(registry.load("agent-stale").unwrap().is_none());
    }

    /// A query logged against an `Active` session is persisted with its path
    /// and scope intact.
    #[test]
    fn log_context_query_appends_to_active_session() {
        let (_temp, registry) = create_registry();
        let active = entry("agent-active", AgentStatus::Active);
        registry.save(&active).unwrap();

        let query = ContextQueryEntry {
            path: "src/auth/session.rs".to_string(),
            scope: Some("symbol:validate_token".to_string()),
            queried_at: Utc::now(),
        };
        registry.log_context_query("agent-active", query).unwrap();

        let loaded = registry.load("agent-active").unwrap().unwrap();
        assert_eq!(loaded.context_queries.len(), 1);
        assert_eq!(loaded.context_queries[0].path, "src/auth/session.rs");
        assert_eq!(
            loaded.context_queries[0].scope.as_deref(),
            Some("symbol:validate_token")
        );
    }

    /// Logging against a non-`Active` session succeeds (`Ok`) but records
    /// nothing.
    #[test]
    fn log_context_query_no_op_for_complete_session() {
        let (_temp, registry) = create_registry();
        let mut complete = entry("agent-done", AgentStatus::Complete);
        // Completed just now, so the entry is not stale and stays loadable.
        complete.completed_at = Some(Utc::now());
        registry.save(&complete).unwrap();

        let query = ContextQueryEntry {
            path: "src/lib.rs".to_string(),
            scope: None,
            queried_at: Utc::now(),
        };
        registry.log_context_query("agent-done", query).unwrap();

        let loaded = registry.load("agent-done").unwrap().unwrap();
        assert_eq!(loaded.context_queries.len(), 0);
    }

    /// Only the entry whose recorded path is the queried worktree matches.
    /// `other` points at a directory that is never created.
    #[test]
    fn find_active_by_path_returns_matching_session() {
        let (temp, registry) = create_registry();
        let worktree = temp.path().join("checkout");
        std::fs::create_dir_all(&worktree).unwrap();

        let mut active = entry("agent-match", AgentStatus::Active);
        active.path = Some(worktree.clone());
        registry.save(&active).unwrap();

        let mut other = entry("agent-other", AgentStatus::Active);
        other.path = Some(temp.path().join("other-checkout"));
        registry.save(&other).unwrap();

        let found = registry.find_active_by_path(&worktree).unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().session_id, "agent-match");
    }

    /// The injected id generator first yields an id that already exists; the
    /// registry must move on to the next id instead of overwriting the
    /// existing entry, leaving both entries loadable.
    #[test]
    fn create_generated_entry_retries_collisions_under_lock() {
        let (_temp, registry) = create_registry();
        registry
            .save(&entry("agent-existing", AgentStatus::Active))
            .unwrap();

        let mut ids = vec!["agent-existing".to_string(), "agent-new".to_string()].into_iter();
        let created = registry
            .create_generated_entry_with(
                move || ids.next().unwrap(),
                |session_id| {
                    let mut entry = entry(session_id, AgentStatus::Active);
                    // Same value the fixture already sets; kept explicit.
                    entry.thread = format!("agent/{session_id}");
                    Ok(entry)
                },
            )
            .unwrap();

        assert_eq!(created.session_id, "agent-new");
        assert!(registry.load("agent-existing").unwrap().is_some());
        assert!(registry.load("agent-new").unwrap().is_some());
    }

    /// Regression: `cmd_agent_reserve` reserves an Active entry via
    /// `try_reserve_thread` *before* it advances the thread ref via
    /// `set_thread_cas`. If the CAS races (another writer advanced the
    /// thread between the pre-check and the CAS) and the caller doesn't
    /// abandon the orphaned reservation, subsequent reservers see a
    /// ghost live owner that can't be cleared until pid-liveness reaping
    /// kicks in.
    ///
    /// The fix wraps the post-reserve work in a fallible block and
    /// transitions the entry to `Abandoned` via `update_entry` if the
    /// block returns Err. This test exercises that primitive: an
    /// Abandoned entry must not block a fresh reservation, and the
    /// next reserve must succeed cleanly.
    #[test]
    fn abandoning_active_entry_unblocks_subsequent_reserve_on_same_thread() {
        let (_temp, registry) = create_registry();

        // First reserve: Active entry is created. Stand in for what
        // `cmd_agent_reserve`'s `try_reserve_thread` call writes.
        let outcome = registry
            .try_reserve_thread("feature/leak-repro", |session_id| {
                let mut e = entry(session_id, AgentStatus::Active);
                e.thread = "feature/leak-repro".to_string();
                e.thread_id = Some("feature/leak-repro".to_string());
                // pid 1 is always alive — keeps liveness honest if the
                // test environment exposes a boot_id; we'll abandon
                // explicitly below regardless.
                e.pid = Some(1);
                e.boot_id = crate::store::liveness::current_boot_id();
                Ok(e)
            })
            .unwrap();
        let session_id = match outcome {
            ReserveOutcome::Reserved(entry) => entry.session_id,
            ReserveOutcome::LiveOwner(_) => panic!("first reserve must succeed"),
        };

        // Simulate the cleanup path: post-reserve work failed (e.g.,
        // `set_thread_cas` lost a race), so we transition the entry to
        // Abandoned and bubble the original error to the caller. The
        // caller never saw this session_id in JSON, so no orchestrator
        // is holding it.
        registry
            .update_entry(&session_id, |entry| {
                entry.status = AgentStatus::Abandoned;
                entry.completed_at = Some(Utc::now());
            })
            .unwrap();

        // Second reserve on the same thread must succeed: `try_reserve_thread`
        // skips Abandoned entries when deciding live-owner vs reuse, so
        // no ghost row blocks us.
        let next = registry
            .try_reserve_thread("feature/leak-repro", |session_id| {
                let mut e = entry(session_id, AgentStatus::Active);
                e.thread = "feature/leak-repro".to_string();
                e.thread_id = Some("feature/leak-repro".to_string());
                e.pid = Some(1);
                e.boot_id = crate::store::liveness::current_boot_id();
                Ok(e)
            })
            .unwrap();
        assert!(
            matches!(next, ReserveOutcome::Reserved(_)),
            "after the orphaned reservation is abandoned, the next reserve must succeed: {next:?}"
        );

        // The registry now holds exactly one Active entry on this
        // thread — the new one — plus the Abandoned husk. Counting
        // Active by thread should be 1, not 2.
        let active_count = registry
            .list()
            .unwrap()
            .into_iter()
            .filter(|e| e.thread == "feature/leak-repro" && e.status == AgentStatus::Active)
            .count();
        assert_eq!(
            active_count, 1,
            "exactly one Active reservation must own the thread after rollback + retry"
        );
    }

    /// Every field mutated inside the `update_entry` closure is persisted and
    /// visible to a subsequent `load`.
    #[test]
    fn update_entry_persists_harness_metadata() {
        let (_temp, registry) = create_registry();
        let active = entry("agent-active", AgentStatus::Active);
        registry.save(&active).unwrap();

        registry
            .update_entry("agent-active", |entry| {
                entry.heddle_session_id = Some("sess-123".to_string());
                entry.harness = Some("claude-code".to_string());
                entry.thinking_level = Some("deep".to_string());
                entry.report_flush_state = Some("pending-local".to_string());
                entry.attach_reason = Some("attached from test metadata update".to_string());
                entry.attach_precedence = vec!["matched-current-session".to_string()];
                entry.winning_attach_rule = Some("matched-current-session".to_string());
                entry.probe_source = Some("argv_env".to_string());
                entry.probe_confidence = Some(0.75);
                entry.last_progress_at = Some(Utc::now());
                entry.usage_summary.input_tokens = Some(42);
            })
            .unwrap();

        let loaded = registry.load("agent-active").unwrap().unwrap();
        assert_eq!(loaded.heddle_session_id.as_deref(), Some("sess-123"));
        assert_eq!(loaded.harness.as_deref(), Some("claude-code"));
        assert_eq!(loaded.thinking_level.as_deref(), Some("deep"));
        assert_eq!(loaded.report_flush_state.as_deref(), Some("pending-local"));
        assert_eq!(
            loaded.attach_reason.as_deref(),
            Some("attached from test metadata update")
        );
        assert_eq!(loaded.attach_precedence, vec!["matched-current-session"]);
        assert_eq!(
            loaded.winning_attach_rule.as_deref(),
            Some("matched-current-session")
        );
        assert_eq!(loaded.probe_source.as_deref(), Some("argv_env"));
        assert_eq!(loaded.probe_confidence, Some(0.75));
        assert_eq!(loaded.usage_summary.input_tokens, Some(42));
        assert!(loaded.last_progress_at.is_some());
    }

    /// Lookup by `client_instance_id` selects the matching Active entry when
    /// several Active entries exist.
    #[test]
    fn find_active_by_client_instance_id_returns_matching_session() {
        let (_temp, registry) = create_registry();
        let mut active = entry("agent-client", AgentStatus::Active);
        active.client_instance_id = Some("client-a".to_string());
        registry.save(&active).unwrap();

        let mut other = entry("agent-other", AgentStatus::Active);
        other.client_instance_id = Some("client-b".to_string());
        registry.save(&other).unwrap();

        let found = registry
            .find_active_by_client_instance_id("client-a")
            .unwrap()
            .unwrap();
        assert_eq!(found.session_id, "agent-client");
    }

    /// Lookup by `native_actor_key` returns the Active entry carrying that key.
    #[test]
    fn find_active_by_native_actor_key_returns_matching_session() {
        let (_temp, registry) = create_registry();
        let mut active = entry("agent-native", AgentStatus::Active);
        active.native_actor_key = Some("codex:thread:thr_123".to_string());
        registry.save(&active).unwrap();

        let found = registry
            .find_active_by_native_actor_key("codex:thread:thr_123")
            .unwrap()
            .unwrap();
        assert_eq!(found.session_id, "agent-native");
    }

    /// The actor chain follows `native_parent_actor_key` links and is returned
    /// ordered root → leaf. Entries are saved out of order (child first) on
    /// purpose.
    #[test]
    fn actor_chain_follows_native_parent_keys_root_to_leaf() {
        let (_temp, registry) = create_registry();
        let mut root = entry("agent-root", AgentStatus::Active);
        root.native_actor_key = Some("human:foo".to_string());
        root.provider = Some("human".to_string());

        let mut parent = entry("agent-parent", AgentStatus::Active);
        parent.native_actor_key = Some("codex:thread:parent".to_string());
        parent.native_parent_actor_key = Some("human:foo".to_string());
        parent.provider = Some("openai".to_string());
        parent.model = Some("gpt-5".to_string());

        let mut child = entry("agent-child", AgentStatus::Active);
        child.native_actor_key = Some("codex:thread:child".to_string());
        child.native_parent_actor_key = Some("codex:thread:parent".to_string());
        child.provider = Some("openai".to_string());
        child.model = Some("gpt-5-mini".to_string());

        registry.save(&child).unwrap();
        registry.save(&root).unwrap();
        registry.save(&parent).unwrap();

        let chain = registry.actor_chain_for_session("agent-child").unwrap();
        let ids: Vec<_> = chain.iter().map(|node| node.session_id.as_str()).collect();
        assert_eq!(ids, vec!["agent-root", "agent-parent", "agent-child"]);
        assert_eq!(
            chain[2].native_parent_actor_key.as_deref(),
            Some("codex:thread:parent")
        );
    }

    /// A dead Active owner on the target thread is reaped (marked `Abandoned`
    /// with `completed_at` set), and the new reservation then succeeds.
    #[test]
    fn try_reserve_thread_reaps_dead_active_entry_and_succeeds() {
        let (_temp, registry) = create_registry();
        let mut dead = entry("agent-dead", AgentStatus::Active);
        dead.thread = "feature/race".to_string();
        // PID 0x7fff_ffff is unassignable on Linux/macOS. Combined with
        // a stale boot id this is unambiguously dead.
        dead.pid = Some(0x7fff_ffff);
        dead.boot_id = Some("not-the-current-boot".to_string());
        registry.save(&dead).unwrap();

        let outcome = registry
            .try_reserve_thread("feature/race", |session_id| {
                let mut new = entry(session_id, AgentStatus::Active);
                new.thread = "feature/race".to_string();
                new.pid = Some(std::process::id());
                new.boot_id = crate::store::liveness::current_boot_id();
                Ok(new)
            })
            .unwrap();

        match outcome {
            ReserveOutcome::Reserved(entry) => assert_ne!(entry.session_id, "agent-dead"),
            ReserveOutcome::LiveOwner(_) => panic!("dead owner should have been reaped"),
        }
        let abandoned = registry.load("agent-dead").unwrap().unwrap();
        assert_eq!(abandoned.status, AgentStatus::Abandoned);
        assert!(abandoned.completed_at.is_some());
    }

    /// An owner whose pid is this test process (with the current boot id)
    /// counts as live. The reservation yields `LiveOwner` carrying that entry,
    /// and the entry stays `Active`.
    #[test]
    fn try_reserve_thread_reports_live_owner_when_pid_is_alive() {
        let (_temp, registry) = create_registry();
        let mut alive = entry("agent-alive", AgentStatus::Active);
        alive.thread = "feature/busy".to_string();
        alive.pid = Some(std::process::id());
        alive.boot_id = crate::store::liveness::current_boot_id();
        registry.save(&alive).unwrap();

        let outcome = registry
            .try_reserve_thread("feature/busy", |session_id| {
                let mut new = entry(session_id, AgentStatus::Active);
                new.thread = "feature/busy".to_string();
                Ok(new)
            })
            .unwrap();

        match outcome {
            ReserveOutcome::Reserved(_) => panic!("live owner should have blocked reservation"),
            ReserveOutcome::LiveOwner(existing) => assert_eq!(existing.session_id, "agent-alive"),
        }
        let still_alive = registry.load("agent-alive").unwrap().unwrap();
        assert_eq!(still_alive.status, AgentStatus::Active);
    }

    /// `reap_dead_for_thread` reaps only dead entries on the named thread and
    /// reports the count. A later `reap_dead` reaps the remaining dead entry
    /// on the other thread.
    #[test]
    fn reap_dead_for_thread_only_touches_named_thread() {
        let (_temp, registry) = create_registry();
        let mut dead_a = entry("agent-dead-a", AgentStatus::Active);
        dead_a.thread = "feature/a".to_string();
        dead_a.pid = Some(0x7fff_ffff);
        dead_a.boot_id = Some("stale".to_string());
        let mut dead_b = entry("agent-dead-b", AgentStatus::Active);
        dead_b.thread = "feature/b".to_string();
        dead_b.pid = Some(0x7fff_ffff);
        dead_b.boot_id = Some("stale".to_string());
        registry.save(&dead_a).unwrap();
        registry.save(&dead_b).unwrap();

        let reaped = registry.reap_dead_for_thread("feature/a").unwrap();
        assert_eq!(reaped, 1);
        assert_eq!(
            registry.load("agent-dead-a").unwrap().unwrap().status,
            AgentStatus::Abandoned
        );
        assert_eq!(
            registry.load("agent-dead-b").unwrap().unwrap().status,
            AgentStatus::Active,
            "untargeted thread should not be reaped"
        );

        let reaped_all = registry.reap_dead().unwrap();
        assert_eq!(reaped_all, 1);
        assert_eq!(
            registry.load("agent-dead-b").unwrap().unwrap().status,
            AgentStatus::Abandoned
        );
    }

    /// Two entries that name each other as parent form a cycle.
    /// `actor_chain_for_session` must terminate, returning a two-element chain
    /// that ends at the requested session.
    #[test]
    fn actor_chain_breaks_cycles_without_looping() {
        let (_temp, registry) = create_registry();
        let mut a = entry("agent-a", AgentStatus::Active);
        a.native_actor_key = Some("actor:a".to_string());
        a.native_parent_actor_key = Some("actor:b".to_string());
        let mut b = entry("agent-b", AgentStatus::Active);
        b.native_actor_key = Some("actor:b".to_string());
        b.native_parent_actor_key = Some("actor:a".to_string());
        registry.save(&a).unwrap();
        registry.save(&b).unwrap();

        let chain = registry.actor_chain_for_session("agent-a").unwrap();
        assert_eq!(chain.len(), 2);
        assert_eq!(chain.last().unwrap().session_id, "agent-a");
    }

    /// Concurrent-reservation soak. The W5b reservation contract:
    /// when N threads race to reserve the same thread name from the
    /// same anchor, exactly one wins (`ReserveOutcome::Reserved`) and
    /// the rest get `ReserveOutcome::LiveOwner` carrying the winner's
    /// session id. Tested 100× with 8 racers per round to catch any
    /// rare lock-acquisition flake.
    ///
    /// `#[ignore]` because this iterates 800 reservations and takes
    /// a few seconds; runs in `--include-ignored` sweeps and the
    /// nightly real-world matrix.
    #[test]
    #[ignore = "soak: 100× concurrent reservation race"]
    fn try_reserve_thread_under_concurrent_load_is_race_free() {
        use std::sync::{Arc, Barrier};

        const ROUNDS: usize = 100;
        const RACERS: usize = 8;

        for round in 0..ROUNDS {
            // Fresh registry each round so the previous round's
            // winner doesn't bias the next round (also models the
            // real "release between batches" pattern).
            let (_temp, registry) = create_registry();
            let registry = Arc::new(registry);
            let barrier = Arc::new(Barrier::new(RACERS));
            let thread_name = format!("feature/race-{round}");

            let handles: Vec<_> = (0..RACERS)
                .map(|racer_idx| {
                    let registry = Arc::clone(&registry);
                    let barrier = Arc::clone(&barrier);
                    let thread_name = thread_name.clone();
                    std::thread::spawn(move || {
                        // All racers wait on the barrier so the
                        // contention is maximised — without this the
                        // OS scheduler can serialize them and the
                        // test passes trivially.
                        barrier.wait();
                        let outcome = registry.try_reserve_thread(&thread_name, |session_id| {
                            // The racer index is folded into the session id
                            // so each racer's candidate entry is distinct.
                            // The pid is this (live) process, so the winner
                            // is never reaped as dead by later racers.
                            let mut entry = entry(
                                &format!("agent-{racer_idx}-{session_id}"),
                                AgentStatus::Active,
                            );
                            entry.thread = thread_name.clone();
                            entry.pid = Some(std::process::id());
                            entry.boot_id = crate::store::liveness::current_boot_id();
                            Ok(entry)
                        });
                        outcome.expect("reservation call must not error")
                    })
                })
                .collect();

            let outcomes: Vec<ReserveOutcome> = handles
                .into_iter()
                .map(|h| h.join().expect("racer panic"))
                .collect();

            let reserved_count = outcomes
                .iter()
                .filter(|o| matches!(o, ReserveOutcome::Reserved(_)))
                .count();
            let live_owner_count = outcomes
                .iter()
                .filter(|o| matches!(o, ReserveOutcome::LiveOwner(_)))
                .count();

            assert_eq!(
                reserved_count, 1,
                "round {round}: exactly one racer must win the reservation; got {reserved_count}"
            );
            assert_eq!(
                live_owner_count,
                RACERS - 1,
                "round {round}: every loser must get a LiveOwner outcome; \
                 reserved={reserved_count} live_owner={live_owner_count}"
            );

            // Strong invariant: every LiveOwner outcome's `existing`
            // entry must be the winner's session — not some stale
            // dead entry, not a different racer's losing-and-then-
            // dead entry.
            let winner_session = outcomes
                .iter()
                .find_map(|o| match o {
                    ReserveOutcome::Reserved(entry) => Some(entry.session_id.clone()),
                    _ => None,
                })
                .expect("a winner must exist");
            for outcome in &outcomes {
                if let ReserveOutcome::LiveOwner(existing) = outcome {
                    assert_eq!(
                        existing.session_id, winner_session,
                        "round {round}: LiveOwner conflicts must point at the actual winner; \
                         got {} expected {winner_session}",
                        existing.session_id
                    );
                }
            }
        }
    }
}