1use std::{
9 collections::{HashMap, HashSet},
10 path::{Path, PathBuf},
11};
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15
16use crate::{
17 fs_atomic::write_file_atomic,
18 lock::RepoLock,
19 store::{
20 HeddleError, Result,
21 liveness::{Liveness, is_owner_alive},
22 },
23};
24
/// Age, in days, after which a non-active registry entry is considered stale and
/// is pruned by the registry (measured from `completed_at`, or from `started_at`
/// when no completion time was recorded).
const STALE_AGENT_TTL_DAYS: i64 = 7;
26
/// Result of [`AgentRegistry::try_reserve_thread`].
#[derive(Debug)]
pub enum ReserveOutcome {
    /// No live owner was found, so a new active entry was created and persisted
    /// for the thread.
    Reserved(AgentEntry),
    /// An active entry for the thread exists whose owner was not determined to be
    /// dead (liveness `Alive` or `Unknown`); nothing was created.
    LiveOwner(AgentEntry),
}
41
/// One context lookup recorded against an active agent session via
/// [`AgentRegistry::log_context_query`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextQueryEntry {
    /// Queried path (tests use repository-relative paths such as
    /// `src/lib.rs` — NOTE(review): confirm this is always relative).
    pub path: String,
    /// Optional narrower scope within `path` (tests use e.g.
    /// `symbol:validate_token`).
    pub scope: Option<String>,
    /// Timestamp supplied by the caller for the query.
    pub queried_at: DateTime<Utc>,
}
52
/// Optional usage counters attached to an agent session.
///
/// Every field is `#[serde(default)]`, so entries written before a counter
/// existed still deserialize (the counter reads back as `None`). The registry
/// stores these values but does not interpret them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AgentUsageSummary {
    /// Presumably input/prompt token count — confirm with the reporting harness.
    #[serde(default)]
    pub input_tokens: Option<u64>,
    /// Presumably output/completion token count — confirm.
    #[serde(default)]
    pub output_tokens: Option<u64>,
    /// Presumably reasoning token count, where the provider reports one — confirm.
    #[serde(default)]
    pub reasoning_tokens: Option<u64>,
    /// Presumably the number of tool invocations — confirm.
    #[serde(default)]
    pub tool_calls: Option<u32>,
    /// Name suggests cost in millionths of a US dollar — NOTE(review): confirm units.
    #[serde(default)]
    pub cost_micros_usd: Option<u64>,
}
66
/// Lifecycle state of a registry entry, serialized in snake_case
/// (`active`, `abandoned`, `complete`, `merged`).
///
/// Only `Active` entries take part in liveness checks, thread reservations and
/// the `find_active_*` lookups. The other states are terminal; such entries are
/// pruned once older than [`STALE_AGENT_TTL_DAYS`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AgentStatus {
    /// The session is in progress and may own its thread.
    Active,
    /// The session ended without completing. The registry sets this itself when
    /// an active entry's owning process is found dead.
    Abandoned,
    /// Terminal state set by callers; presumably the session finished its work.
    Complete,
    /// Terminal state set by callers; presumably the session's work was merged —
    /// confirm with callers.
    Merged,
}
80
81impl std::fmt::Display for AgentStatus {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match self {
84 AgentStatus::Active => write!(f, "active"),
85 AgentStatus::Abandoned => write!(f, "abandoned"),
86 AgentStatus::Complete => write!(f, "complete"),
87 AgentStatus::Merged => write!(f, "merged"),
88 }
89 }
90}
91
/// One agent session as persisted in `<agents_dir>/<session_id>.toml`.
///
/// Serde serializes fields in declaration order, so reordering them changes the
/// on-disk key order. Fields marked `#[serde(default)]` may be absent from
/// entries written by older versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentEntry {
    /// Registry key and file stem. It must consist of ASCII lowercase letters,
    /// digits and `-` (enforced by `AgentRegistry::entry_path`).
    pub session_id: String,
    /// Matched by `AgentRegistry::find_active_by_client_instance_id`.
    #[serde(default)]
    pub client_instance_id: Option<String>,
    /// Actor identity. It is matched by `find_active_by_native_actor_key` and
    /// used to resolve parents in `actor_chain_for_session`.
    #[serde(default)]
    pub native_actor_key: Option<String>,
    /// `native_actor_key` of this actor's parent; followed by
    /// `actor_chain_for_session`.
    #[serde(default)]
    pub native_parent_actor_key: Option<String>,
    /// Matched together with `path` by
    /// `find_active_by_native_instance_key_at_path`.
    #[serde(default)]
    pub native_instance_key: Option<String>,
    /// Matched by `find_active_by_heddle_session_id`.
    #[serde(default)]
    pub heddle_session_id: Option<String>,
    /// Optional thread identifier. It is not read by the registry; tests set it
    /// equal to `thread`.
    #[serde(default)]
    pub thread_id: Option<String>,
    /// Thread this session works on. `try_reserve_thread` permits at most one
    /// non-dead active entry per thread.
    pub thread: String,
    /// Owning process ID, checked by `AgentRegistry::liveness_for`.
    #[serde(default)]
    pub pid: Option<u32>,
    /// Boot identifier paired with `pid` for liveness checks.
    #[serde(default)]
    pub boot_id: Option<String>,
    /// Not read by the registry; persisted as-is.
    #[serde(default)]
    pub liveness_path: Option<PathBuf>,
    /// Not read by the registry; persisted as-is.
    #[serde(default)]
    pub heartbeat_at: Option<DateTime<Utc>>,
    /// Not read by the registry; persisted as-is.
    #[serde(default)]
    pub anchor_state: Option<String>,
    /// Not read by the registry; persisted as-is.
    #[serde(default)]
    pub anchor_root: Option<String>,
    /// Not read by the registry; persisted as-is.
    #[serde(default)]
    pub reservation_token: Option<String>,
    /// Worktree root. Both sides are canonicalized when possible and then
    /// compared by `entry_matches_root`.
    #[serde(default)]
    pub path: Option<PathBuf>,
    /// Not interpreted by the registry.
    pub base_state: String,
    /// Creation time. It is the sort key for `list` (newest first) and the
    /// staleness fallback when `completed_at` is unset.
    pub started_at: DateTime<Utc>,
    /// Copied into `ActorChainNode`; otherwise not interpreted here.
    pub provider: Option<String>,
    /// Copied into `ActorChainNode`; otherwise not interpreted here.
    pub model: Option<String>,
    /// Copied into `ActorChainNode`; otherwise not interpreted here.
    #[serde(default)]
    pub harness: Option<String>,
    // The harness/attach metadata below is stored and returned by the registry
    // but never interpreted by it.
    #[serde(default)]
    pub thinking_level: Option<String>,
    #[serde(default)]
    pub usage_summary: AgentUsageSummary,
    #[serde(default)]
    pub last_progress_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub report_flush_state: Option<String>,
    #[serde(default)]
    pub attach_reason: Option<String>,
    #[serde(default)]
    pub task_assignment_id: Option<String>,
    #[serde(default)]
    pub attach_precedence: Vec<String>,
    #[serde(default)]
    pub winning_attach_rule: Option<String>,
    #[serde(default)]
    pub probe_source: Option<String>,
    #[serde(default)]
    pub probe_confidence: Option<f32>,
    /// Lifecycle state; only `Active` entries participate in lookups and
    /// reservations.
    pub status: AgentStatus,
    /// Set when the entry leaves `Active` (by `update_status` or when the
    /// registry abandons a dead owner). It drives stale-entry pruning.
    #[serde(default)]
    pub completed_at: Option<DateTime<Utc>>,
    /// Context lookups appended by `log_context_query` while the entry is active.
    #[serde(default)]
    pub context_queries: Vec<ContextQueryEntry>,
}
195
/// Identity/lineage projection of an [`AgentEntry`], as returned (root first) by
/// [`AgentRegistry::actor_chain_for_session`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActorChainNode {
    /// Session ID of the projected entry.
    pub session_id: String,
    /// Actor key of the projected entry, if any.
    #[serde(default)]
    pub native_actor_key: Option<String>,
    /// Actor key of the parent, if any; it links this node to the previous one
    /// in the chain.
    #[serde(default)]
    pub native_parent_actor_key: Option<String>,
    /// Thread of the projected entry.
    pub thread: String,
    /// Status of the projected entry.
    pub status: AgentStatus,
    #[serde(default)]
    pub provider: Option<String>,
    #[serde(default)]
    pub model: Option<String>,
    #[serde(default)]
    pub harness: Option<String>,
}
213
214impl From<&AgentEntry> for ActorChainNode {
215 fn from(entry: &AgentEntry) -> Self {
216 Self {
217 session_id: entry.session_id.clone(),
218 native_actor_key: entry.native_actor_key.clone(),
219 native_parent_actor_key: entry.native_parent_actor_key.clone(),
220 thread: entry.thread.clone(),
221 status: entry.status.clone(),
222 provider: entry.provider.clone(),
223 model: entry.model.clone(),
224 harness: entry.harness.clone(),
225 }
226 }
227}
228
/// File-backed registry of agent sessions.
///
/// Each session is stored as `<agents_dir>/<session_id>.toml`. Mutations are
/// serialized through the lock file `<agents_dir>/.lock`.
pub struct AgentRegistry {
    /// `<heddle_dir>/agents`, as set by [`AgentRegistry::new`].
    agents_dir: PathBuf,
}
233
234impl AgentRegistry {
235 pub fn new(heddle_dir: &Path) -> Self {
237 Self {
238 agents_dir: heddle_dir.join("agents"),
239 }
240 }
241
242 fn entry_path(&self, session_id: &str) -> Result<PathBuf> {
243 if session_id.is_empty()
247 || !session_id
248 .bytes()
249 .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
250 {
251 return Err(HeddleError::Config(format!(
252 "invalid session ID '{}': only lowercase alphanumeric and hyphens allowed",
253 session_id
254 )));
255 }
256 Ok(self.agents_dir.join(format!("{}.toml", session_id)))
257 }
258
259 fn lock_path(&self) -> PathBuf {
260 self.agents_dir.join(".lock")
261 }
262
263 fn write_lock(&self) -> Result<crate::lock::WriteLockGuard> {
264 RepoLock::at(self.lock_path()).write().map_err(|err| {
265 HeddleError::Config(format!("failed to acquire agent registry lock: {err}"))
266 })
267 }
268
269 fn write_entry_file(&self, entry: &AgentEntry) -> Result<()> {
270 std::fs::create_dir_all(&self.agents_dir)?;
271 let path = self.entry_path(&entry.session_id)?;
272 let content =
273 toml::to_string_pretty(entry).map_err(|e| HeddleError::Config(e.to_string()))?;
274 Ok(write_file_atomic(&path, content.as_bytes())?)
275 }
276
277 fn load_entry_from_path(&self, path: &Path) -> Result<Option<AgentEntry>> {
278 if !path.exists() {
279 return Ok(None);
280 }
281
282 let content = std::fs::read_to_string(path)?;
283 let entry = toml::from_str(&content).map_err(|e| HeddleError::Config(e.to_string()))?;
284 Ok(Some(entry))
285 }
286
287 fn is_stale_terminal_entry(&self, entry: &AgentEntry) -> bool {
288 if matches!(entry.status, AgentStatus::Active) {
289 return false;
290 }
291
292 let terminal_at = entry.completed_at.unwrap_or(entry.started_at);
293 terminal_at <= Utc::now() - chrono::Duration::days(STALE_AGENT_TTL_DAYS)
294 }
295
296 fn prune_stale_entry_path(&self, path: &Path) -> Result<()> {
297 if path.exists() {
298 std::fs::remove_file(path)?;
299 }
300 Ok(())
301 }
302
303 pub fn liveness_for(entry: &AgentEntry) -> Liveness {
307 if entry.status != AgentStatus::Active {
308 return Liveness::Dead;
309 }
310 is_owner_alive(entry.pid, entry.boot_id.as_deref())
311 }
312
313 fn abandon_active_entry(&self, mut entry: AgentEntry) -> Result<AgentEntry> {
316 entry.status = AgentStatus::Abandoned;
317 entry.completed_at = Some(Utc::now());
318 self.write_entry_file(&entry)?;
319 Ok(entry)
320 }
321
322 fn reap_dead_locked(&self, thread_filter: Option<&str>) -> Result<usize> {
327 if !self.agents_dir.exists() {
328 return Ok(0);
329 }
330 let mut reaped = 0usize;
331 for dir_entry in std::fs::read_dir(&self.agents_dir)? {
332 let dir_entry = dir_entry?;
333 let path = dir_entry.path();
334 if !path.extension().map(|e| e == "toml").unwrap_or(false) {
335 continue;
336 }
337 let Some(entry) = self.load_entry_from_path(&path)? else {
338 continue;
339 };
340 if entry.status != AgentStatus::Active {
341 continue;
342 }
343 if let Some(name) = thread_filter
344 && entry.thread != name
345 {
346 continue;
347 }
348 if Self::liveness_for(&entry) == Liveness::Dead {
349 self.abandon_active_entry(entry)?;
350 reaped += 1;
351 }
352 }
353 Ok(reaped)
354 }
355
356 pub fn reap_dead_for_thread(&self, thread: &str) -> Result<usize> {
364 let _lock = self.write_lock()?;
365 self.reap_dead_locked(Some(thread))
366 }
367
368 pub fn reap_dead(&self) -> Result<usize> {
370 let _lock = self.write_lock()?;
371 self.reap_dead_locked(None)
372 }
373
374 fn create_generated_entry_with<F, G>(
375 &self,
376 mut generate_id: G,
377 mut build_entry: F,
378 ) -> Result<AgentEntry>
379 where
380 F: FnMut(&str) -> Result<AgentEntry>,
381 G: FnMut() -> String,
382 {
383 let _lock = self.write_lock()?;
384
385 loop {
386 let session_id = generate_id();
387 let path = self.entry_path(&session_id)?;
388 if path.exists() {
389 continue;
390 }
391
392 let entry = build_entry(&session_id)?;
393 self.write_entry_file(&entry)?;
394 return Ok(entry);
395 }
396 }
397
398 pub fn create_generated_entry<F>(&self, build_entry: F) -> Result<AgentEntry>
400 where
401 F: FnMut(&str) -> Result<AgentEntry>,
402 {
403 self.create_generated_entry_with(generate_agent_id, build_entry)
404 }
405
406 pub fn save(&self, entry: &AgentEntry) -> Result<()> {
411 let _lock = self.write_lock()?;
412 self.write_entry_file(entry)
413 }
414
415 pub fn load(&self, session_id: &str) -> Result<Option<AgentEntry>> {
417 let path = self.entry_path(session_id)?;
418 let Some(entry) = self.load_entry_from_path(&path)? else {
419 return Ok(None);
420 };
421
422 if self.is_stale_terminal_entry(&entry) {
423 let _lock = self.write_lock()?;
424 if let Some(latest) = self.load_entry_from_path(&path)?
425 && self.is_stale_terminal_entry(&latest)
426 {
427 self.prune_stale_entry_path(&path)?;
428 return Ok(None);
429 }
430 }
431
432 Ok(Some(entry))
433 }
434
435 pub fn list(&self) -> Result<Vec<AgentEntry>> {
437 if !self.agents_dir.exists() {
438 return Ok(Vec::new());
439 }
440
441 let mut stale_paths = Vec::new();
442 let mut entries = Vec::new();
443 for dir_entry in std::fs::read_dir(&self.agents_dir)? {
444 let dir_entry = dir_entry?;
445 let path = dir_entry.path();
446 if path.extension().map(|e| e == "toml").unwrap_or(false) {
447 let content = std::fs::read_to_string(&path)?;
448 let entry = toml::from_str::<AgentEntry>(&content).map_err(|err| {
449 HeddleError::Config(format!(
450 "failed to parse agent registry entry '{}': {err}",
451 path.display()
452 ))
453 })?;
454 if self.is_stale_terminal_entry(&entry) {
455 stale_paths.push(path);
456 } else {
457 entries.push(entry);
458 }
459 }
460 }
461
462 if !stale_paths.is_empty() {
463 let _lock = self.write_lock()?;
464 for path in stale_paths {
465 if let Some(entry) = self.load_entry_from_path(&path)?
466 && self.is_stale_terminal_entry(&entry)
467 {
468 self.prune_stale_entry_path(&path)?;
469 }
470 }
471 }
472
473 entries.sort_by_key(|a| std::cmp::Reverse(a.started_at));
474 Ok(entries)
475 }
476
477 pub fn update_status(&self, session_id: &str, status: AgentStatus) -> Result<()> {
479 let _lock = self.write_lock()?;
480 let path = self.entry_path(session_id)?;
481 if let Some(mut entry) = self.load_entry_from_path(&path)? {
482 entry.status = status;
483 entry.completed_at = match entry.status {
484 AgentStatus::Active => None,
485 AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
486 Some(Utc::now())
487 }
488 };
489 self.write_entry_file(&entry)?;
490 }
491 Ok(())
492 }
493
494 pub fn update_entry<F>(&self, session_id: &str, mut update: F) -> Result<Option<AgentEntry>>
496 where
497 F: FnMut(&mut AgentEntry),
498 {
499 let _lock = self.write_lock()?;
500 let path = self.entry_path(session_id)?;
501 let Some(mut entry) = self.load_entry_from_path(&path)? else {
502 return Ok(None);
503 };
504 update(&mut entry);
505 self.write_entry_file(&entry)?;
506 Ok(Some(entry))
507 }
508
509 pub fn find_or_create_active_entry<FMatch, FUpdate, FBuild>(
512 &self,
513 mut matches: FMatch,
514 mut update_existing: FUpdate,
515 mut build_entry: FBuild,
516 ) -> Result<(AgentEntry, bool)>
517 where
518 FMatch: FnMut(&AgentEntry) -> bool,
519 FUpdate: FnMut(&mut AgentEntry),
520 FBuild: FnMut(&str) -> Result<AgentEntry>,
521 {
522 let _lock = self.write_lock()?;
523 std::fs::create_dir_all(&self.agents_dir)?;
524
525 for dir_entry in std::fs::read_dir(&self.agents_dir)? {
526 let dir_entry = dir_entry?;
527 let path = dir_entry.path();
528 if !path.extension().map(|e| e == "toml").unwrap_or(false) {
529 continue;
530 }
531 let Some(mut entry) = self.load_entry_from_path(&path)? else {
532 continue;
533 };
534 if self.is_stale_terminal_entry(&entry) {
535 self.prune_stale_entry_path(&path)?;
536 continue;
537 }
538 if entry.status == AgentStatus::Active && matches(&entry) {
539 update_existing(&mut entry);
540 self.write_entry_file(&entry)?;
541 return Ok((entry, false));
542 }
543 }
544
545 loop {
546 let session_id = generate_agent_id();
547 let path = self.entry_path(&session_id)?;
548 if path.exists() {
549 continue;
550 }
551
552 let entry = build_entry(&session_id)?;
553 self.write_entry_file(&entry)?;
554 return Ok((entry, true));
555 }
556 }
557
558 pub fn try_reserve_thread<F>(&self, thread: &str, build_entry: F) -> Result<ReserveOutcome>
573 where
574 F: FnMut(&str) -> Result<AgentEntry>,
575 {
576 let _lock = self.write_lock()?;
577 std::fs::create_dir_all(&self.agents_dir)?;
578
579 let mut live_owner: Option<AgentEntry> = None;
580 for dir_entry in std::fs::read_dir(&self.agents_dir)? {
581 let dir_entry = dir_entry?;
582 let path = dir_entry.path();
583 if !path.extension().map(|e| e == "toml").unwrap_or(false) {
584 continue;
585 }
586 let Some(entry) = self.load_entry_from_path(&path)? else {
587 continue;
588 };
589 if self.is_stale_terminal_entry(&entry) {
590 self.prune_stale_entry_path(&path)?;
591 continue;
592 }
593 if entry.status != AgentStatus::Active || entry.thread != thread {
594 continue;
595 }
596 match Self::liveness_for(&entry) {
597 Liveness::Dead => {
598 self.abandon_active_entry(entry)?;
599 }
600 Liveness::Alive | Liveness::Unknown => {
601 live_owner = Some(entry);
604 break;
605 }
606 }
607 }
608
609 if let Some(existing) = live_owner {
610 return Ok(ReserveOutcome::LiveOwner(existing));
611 }
612
613 let mut build_entry = build_entry;
614 loop {
615 let session_id = generate_agent_id();
616 let path = self.entry_path(&session_id)?;
617 if path.exists() {
618 continue;
619 }
620
621 let entry = build_entry(&session_id)?;
622 self.write_entry_file(&entry)?;
623 return Ok(ReserveOutcome::Reserved(entry));
624 }
625 }
626
627 pub fn create_generated_entry_for_thread<F>(
633 &self,
634 thread: &str,
635 build_entry: F,
636 ) -> Result<AgentEntry>
637 where
638 F: FnMut(&str) -> Result<AgentEntry>,
639 {
640 match self.try_reserve_thread(thread, build_entry)? {
641 ReserveOutcome::Reserved(entry) => Ok(entry),
642 ReserveOutcome::LiveOwner(existing) => Err(HeddleError::Config(format!(
643 "thread '{}' already has active reservation {}. Use `heddle thread show {}` to inspect it, or release the session before starting another writer.",
644 thread, existing.session_id, thread
645 ))),
646 }
647 }
648
649 pub fn find_active_by_path(&self, worktree_root: &Path) -> Result<Option<AgentEntry>> {
652 let canonical = worktree_root
653 .canonicalize()
654 .unwrap_or_else(|_| worktree_root.to_path_buf());
655 let entries = self.list()?;
656 Ok(entries
657 .into_iter()
658 .find(|e| e.status == AgentStatus::Active && entry_matches_root(e, &canonical)))
659 }
660
661 pub fn find_active_by_heddle_session_id(
663 &self,
664 heddle_session_id: &str,
665 ) -> Result<Option<AgentEntry>> {
666 let entries = self.list()?;
667 Ok(entries.into_iter().find(|entry| {
668 entry.status == AgentStatus::Active
669 && entry.heddle_session_id.as_deref() == Some(heddle_session_id)
670 }))
671 }
672
673 pub fn find_active_by_client_instance_id(
676 &self,
677 client_instance_id: &str,
678 ) -> Result<Option<AgentEntry>> {
679 let entries = self.list()?;
680 Ok(entries.into_iter().find(|entry| {
681 entry.status == AgentStatus::Active
682 && entry.client_instance_id.as_deref() == Some(client_instance_id)
683 }))
684 }
685
686 pub fn find_active_by_native_actor_key(
688 &self,
689 native_actor_key: &str,
690 ) -> Result<Option<AgentEntry>> {
691 let entries = self.list()?;
692 Ok(entries.into_iter().find(|entry| {
693 entry.status == AgentStatus::Active
694 && entry.native_actor_key.as_deref() == Some(native_actor_key)
695 }))
696 }
697
698 pub fn actor_chain_for_session(&self, session_id: &str) -> Result<Vec<ActorChainNode>> {
705 let entries = self.list()?;
706 let by_session: HashMap<&str, &AgentEntry> = entries
707 .iter()
708 .map(|entry| (entry.session_id.as_str(), entry))
709 .collect();
710 let by_native_key: HashMap<&str, &AgentEntry> = entries
711 .iter()
712 .filter_map(|entry| entry.native_actor_key.as_deref().map(|key| (key, entry)))
713 .collect();
714
715 let Some(mut current) = by_session.get(session_id).copied() else {
716 return Ok(Vec::new());
717 };
718 let mut leaf_to_root = vec![ActorChainNode::from(current)];
719 let mut seen = HashSet::from([current.session_id.as_str()]);
720
721 while let Some(parent_key) = current.native_parent_actor_key.as_deref() {
722 let Some(parent) = by_native_key.get(parent_key).copied() else {
723 break;
724 };
725 if !seen.insert(parent.session_id.as_str()) {
726 break;
727 }
728 leaf_to_root.push(ActorChainNode::from(parent));
729 current = parent;
730 }
731
732 leaf_to_root.reverse();
733 Ok(leaf_to_root)
734 }
735
736 pub fn find_active_by_native_instance_key_at_path(
739 &self,
740 native_instance_key: &str,
741 worktree_root: &Path,
742 ) -> Result<Option<AgentEntry>> {
743 let canonical = worktree_root
744 .canonicalize()
745 .unwrap_or_else(|_| worktree_root.to_path_buf());
746 let entries = self.list()?;
747 Ok(entries.into_iter().find(|entry| {
748 entry.status == AgentStatus::Active
749 && entry.native_instance_key.as_deref() == Some(native_instance_key)
750 && entry_matches_root(entry, &canonical)
751 }))
752 }
753
754 pub fn log_context_query(&self, session_id: &str, query: ContextQueryEntry) -> Result<()> {
758 let _lock = self.write_lock()?;
759 let path = self.entry_path(session_id)?;
760 if let Some(mut entry) = self.load_entry_from_path(&path)?
761 && entry.status == AgentStatus::Active
762 {
763 entry.context_queries.push(query);
764 self.write_entry_file(&entry)?;
765 }
766 Ok(())
767 }
768
769 pub fn delete(&self, session_id: &str) -> Result<()> {
771 let path = self.entry_path(session_id)?;
772 if path.exists() {
773 std::fs::remove_file(path)?;
774 }
775 Ok(())
776 }
777}
778
779pub fn generate_agent_id() -> String {
784 let random_bytes: [u8; 12] = rand::random();
785 format!(
786 "agent-{}",
787 base32::encode(base32::Alphabet::Rfc4648 { padding: false }, &random_bytes).to_lowercase()
788 )
789}
790
791fn entry_matches_root(entry: &AgentEntry, canonical: &Path) -> bool {
792 entry
793 .path
794 .as_ref()
795 .map(|p| p.canonicalize().unwrap_or_else(|_| p.clone()) == canonical)
796 .unwrap_or(false)
797}
798
799#[cfg(test)]
800mod tests {
801 use tempfile::TempDir;
802
803 use super::*;
804
805 fn create_registry() -> (TempDir, AgentRegistry) {
806 let temp_dir = TempDir::new().unwrap();
807 let registry = AgentRegistry::new(&temp_dir.path().join(".heddle"));
808 (temp_dir, registry)
809 }
810
811 fn entry(session_id: &str, status: AgentStatus) -> AgentEntry {
812 AgentEntry {
813 session_id: session_id.to_string(),
814 client_instance_id: None,
815 native_actor_key: None,
816 native_parent_actor_key: None,
817 native_instance_key: None,
818 heddle_session_id: None,
819 thread_id: None,
820 thread: format!("agent/{session_id}"),
821 pid: None,
822 boot_id: None,
823 liveness_path: None,
824 heartbeat_at: None,
825 anchor_state: None,
826 anchor_root: None,
827 reservation_token: None,
828 path: None,
829 base_state: "hd-base".to_string(),
830 started_at: Utc::now(),
831 provider: None,
832 model: None,
833 harness: None,
834 thinking_level: None,
835 usage_summary: AgentUsageSummary::default(),
836 last_progress_at: None,
837 report_flush_state: None,
838 attach_reason: None,
839 task_assignment_id: None,
840 attach_precedence: vec![],
841 winning_attach_rule: None,
842 probe_source: None,
843 probe_confidence: None,
844 status,
845 completed_at: None,
846 context_queries: vec![],
847 }
848 }
849
850 #[test]
851 fn list_prunes_stale_completed_entries() {
852 let (_temp, registry) = create_registry();
853 let mut stale = entry("agent-stale", AgentStatus::Complete);
854 stale.completed_at = Some(Utc::now() - chrono::Duration::days(8));
855 let active = entry("agent-active", AgentStatus::Active);
856
857 registry.save(&stale).unwrap();
858 registry.save(&active).unwrap();
859
860 let entries = registry.list().unwrap();
861 assert_eq!(entries.len(), 1);
862 assert_eq!(entries[0].session_id, "agent-active");
863 assert!(registry.load("agent-stale").unwrap().is_none());
864 }
865
866 #[test]
867 fn load_returns_none_for_stale_completed_entry() {
868 let (_temp, registry) = create_registry();
869 let mut stale = entry("agent-stale", AgentStatus::Merged);
870 stale.completed_at = Some(Utc::now() - chrono::Duration::days(8));
871 registry.save(&stale).unwrap();
872
873 assert!(registry.load("agent-stale").unwrap().is_none());
874 }
875
876 #[test]
877 fn log_context_query_appends_to_active_session() {
878 let (_temp, registry) = create_registry();
879 let active = entry("agent-active", AgentStatus::Active);
880 registry.save(&active).unwrap();
881
882 let query = ContextQueryEntry {
883 path: "src/auth/session.rs".to_string(),
884 scope: Some("symbol:validate_token".to_string()),
885 queried_at: Utc::now(),
886 };
887 registry.log_context_query("agent-active", query).unwrap();
888
889 let loaded = registry.load("agent-active").unwrap().unwrap();
890 assert_eq!(loaded.context_queries.len(), 1);
891 assert_eq!(loaded.context_queries[0].path, "src/auth/session.rs");
892 assert_eq!(
893 loaded.context_queries[0].scope.as_deref(),
894 Some("symbol:validate_token")
895 );
896 }
897
898 #[test]
899 fn log_context_query_no_op_for_complete_session() {
900 let (_temp, registry) = create_registry();
901 let mut complete = entry("agent-done", AgentStatus::Complete);
902 complete.completed_at = Some(Utc::now());
903 registry.save(&complete).unwrap();
904
905 let query = ContextQueryEntry {
906 path: "src/lib.rs".to_string(),
907 scope: None,
908 queried_at: Utc::now(),
909 };
910 registry.log_context_query("agent-done", query).unwrap();
911
912 let loaded = registry.load("agent-done").unwrap().unwrap();
913 assert_eq!(loaded.context_queries.len(), 0);
914 }
915
916 #[test]
917 fn find_active_by_path_returns_matching_session() {
918 let (temp, registry) = create_registry();
919 let worktree = temp.path().join("checkout");
920 std::fs::create_dir_all(&worktree).unwrap();
921
922 let mut active = entry("agent-match", AgentStatus::Active);
923 active.path = Some(worktree.clone());
924 registry.save(&active).unwrap();
925
926 let mut other = entry("agent-other", AgentStatus::Active);
927 other.path = Some(temp.path().join("other-checkout"));
928 registry.save(&other).unwrap();
929
930 let found = registry.find_active_by_path(&worktree).unwrap();
931 assert!(found.is_some());
932 assert_eq!(found.unwrap().session_id, "agent-match");
933 }
934
935 #[test]
936 fn create_generated_entry_retries_collisions_under_lock() {
937 let (_temp, registry) = create_registry();
938 registry
939 .save(&entry("agent-existing", AgentStatus::Active))
940 .unwrap();
941
942 let mut ids = vec!["agent-existing".to_string(), "agent-new".to_string()].into_iter();
943 let created = registry
944 .create_generated_entry_with(
945 move || ids.next().unwrap(),
946 |session_id| {
947 let mut entry = entry(session_id, AgentStatus::Active);
948 entry.thread = format!("agent/{session_id}");
949 Ok(entry)
950 },
951 )
952 .unwrap();
953
954 assert_eq!(created.session_id, "agent-new");
955 assert!(registry.load("agent-existing").unwrap().is_some());
956 assert!(registry.load("agent-new").unwrap().is_some());
957 }
958
959 #[test]
973 fn abandoning_active_entry_unblocks_subsequent_reserve_on_same_thread() {
974 let (_temp, registry) = create_registry();
975
976 let outcome = registry
979 .try_reserve_thread("feature/leak-repro", |session_id| {
980 let mut e = entry(session_id, AgentStatus::Active);
981 e.thread = "feature/leak-repro".to_string();
982 e.thread_id = Some("feature/leak-repro".to_string());
983 e.pid = Some(1);
987 e.boot_id = crate::store::liveness::current_boot_id();
988 Ok(e)
989 })
990 .unwrap();
991 let session_id = match outcome {
992 ReserveOutcome::Reserved(entry) => entry.session_id,
993 ReserveOutcome::LiveOwner(_) => panic!("first reserve must succeed"),
994 };
995
996 registry
1002 .update_entry(&session_id, |entry| {
1003 entry.status = AgentStatus::Abandoned;
1004 entry.completed_at = Some(Utc::now());
1005 })
1006 .unwrap();
1007
1008 let next = registry
1012 .try_reserve_thread("feature/leak-repro", |session_id| {
1013 let mut e = entry(session_id, AgentStatus::Active);
1014 e.thread = "feature/leak-repro".to_string();
1015 e.thread_id = Some("feature/leak-repro".to_string());
1016 e.pid = Some(1);
1017 e.boot_id = crate::store::liveness::current_boot_id();
1018 Ok(e)
1019 })
1020 .unwrap();
1021 assert!(
1022 matches!(next, ReserveOutcome::Reserved(_)),
1023 "after the orphaned reservation is abandoned, the next reserve must succeed: {next:?}"
1024 );
1025
1026 let active_count = registry
1030 .list()
1031 .unwrap()
1032 .into_iter()
1033 .filter(|e| e.thread == "feature/leak-repro" && e.status == AgentStatus::Active)
1034 .count();
1035 assert_eq!(
1036 active_count, 1,
1037 "exactly one Active reservation must own the thread after rollback + retry"
1038 );
1039 }
1040
1041 #[test]
1042 fn update_entry_persists_harness_metadata() {
1043 let (_temp, registry) = create_registry();
1044 let active = entry("agent-active", AgentStatus::Active);
1045 registry.save(&active).unwrap();
1046
1047 registry
1048 .update_entry("agent-active", |entry| {
1049 entry.heddle_session_id = Some("sess-123".to_string());
1050 entry.harness = Some("claude-code".to_string());
1051 entry.thinking_level = Some("deep".to_string());
1052 entry.report_flush_state = Some("pending-local".to_string());
1053 entry.attach_reason = Some("attached from test metadata update".to_string());
1054 entry.attach_precedence = vec!["matched-current-session".to_string()];
1055 entry.winning_attach_rule = Some("matched-current-session".to_string());
1056 entry.probe_source = Some("argv_env".to_string());
1057 entry.probe_confidence = Some(0.75);
1058 entry.last_progress_at = Some(Utc::now());
1059 entry.usage_summary.input_tokens = Some(42);
1060 })
1061 .unwrap();
1062
1063 let loaded = registry.load("agent-active").unwrap().unwrap();
1064 assert_eq!(loaded.heddle_session_id.as_deref(), Some("sess-123"));
1065 assert_eq!(loaded.harness.as_deref(), Some("claude-code"));
1066 assert_eq!(loaded.thinking_level.as_deref(), Some("deep"));
1067 assert_eq!(loaded.report_flush_state.as_deref(), Some("pending-local"));
1068 assert_eq!(
1069 loaded.attach_reason.as_deref(),
1070 Some("attached from test metadata update")
1071 );
1072 assert_eq!(loaded.attach_precedence, vec!["matched-current-session"]);
1073 assert_eq!(
1074 loaded.winning_attach_rule.as_deref(),
1075 Some("matched-current-session")
1076 );
1077 assert_eq!(loaded.probe_source.as_deref(), Some("argv_env"));
1078 assert_eq!(loaded.probe_confidence, Some(0.75));
1079 assert_eq!(loaded.usage_summary.input_tokens, Some(42));
1080 assert!(loaded.last_progress_at.is_some());
1081 }
1082
1083 #[test]
1084 fn find_active_by_client_instance_id_returns_matching_session() {
1085 let (_temp, registry) = create_registry();
1086 let mut active = entry("agent-client", AgentStatus::Active);
1087 active.client_instance_id = Some("client-a".to_string());
1088 registry.save(&active).unwrap();
1089
1090 let mut other = entry("agent-other", AgentStatus::Active);
1091 other.client_instance_id = Some("client-b".to_string());
1092 registry.save(&other).unwrap();
1093
1094 let found = registry
1095 .find_active_by_client_instance_id("client-a")
1096 .unwrap()
1097 .unwrap();
1098 assert_eq!(found.session_id, "agent-client");
1099 }
1100
1101 #[test]
1102 fn find_active_by_native_actor_key_returns_matching_session() {
1103 let (_temp, registry) = create_registry();
1104 let mut active = entry("agent-native", AgentStatus::Active);
1105 active.native_actor_key = Some("codex:thread:thr_123".to_string());
1106 registry.save(&active).unwrap();
1107
1108 let found = registry
1109 .find_active_by_native_actor_key("codex:thread:thr_123")
1110 .unwrap()
1111 .unwrap();
1112 assert_eq!(found.session_id, "agent-native");
1113 }
1114
1115 #[test]
1116 fn actor_chain_follows_native_parent_keys_root_to_leaf() {
1117 let (_temp, registry) = create_registry();
1118 let mut root = entry("agent-root", AgentStatus::Active);
1119 root.native_actor_key = Some("human:foo".to_string());
1120 root.provider = Some("human".to_string());
1121
1122 let mut parent = entry("agent-parent", AgentStatus::Active);
1123 parent.native_actor_key = Some("codex:thread:parent".to_string());
1124 parent.native_parent_actor_key = Some("human:foo".to_string());
1125 parent.provider = Some("openai".to_string());
1126 parent.model = Some("gpt-5".to_string());
1127
1128 let mut child = entry("agent-child", AgentStatus::Active);
1129 child.native_actor_key = Some("codex:thread:child".to_string());
1130 child.native_parent_actor_key = Some("codex:thread:parent".to_string());
1131 child.provider = Some("openai".to_string());
1132 child.model = Some("gpt-5-mini".to_string());
1133
1134 registry.save(&child).unwrap();
1135 registry.save(&root).unwrap();
1136 registry.save(&parent).unwrap();
1137
1138 let chain = registry.actor_chain_for_session("agent-child").unwrap();
1139 let ids: Vec<_> = chain.iter().map(|node| node.session_id.as_str()).collect();
1140 assert_eq!(ids, vec!["agent-root", "agent-parent", "agent-child"]);
1141 assert_eq!(
1142 chain[2].native_parent_actor_key.as_deref(),
1143 Some("codex:thread:parent")
1144 );
1145 }
1146
1147 #[test]
1148 fn try_reserve_thread_reaps_dead_active_entry_and_succeeds() {
1149 let (_temp, registry) = create_registry();
1150 let mut dead = entry("agent-dead", AgentStatus::Active);
1151 dead.thread = "feature/race".to_string();
1152 dead.pid = Some(0x7fff_ffff);
1155 dead.boot_id = Some("not-the-current-boot".to_string());
1156 registry.save(&dead).unwrap();
1157
1158 let outcome = registry
1159 .try_reserve_thread("feature/race", |session_id| {
1160 let mut new = entry(session_id, AgentStatus::Active);
1161 new.thread = "feature/race".to_string();
1162 new.pid = Some(std::process::id());
1163 new.boot_id = crate::store::liveness::current_boot_id();
1164 Ok(new)
1165 })
1166 .unwrap();
1167
1168 match outcome {
1169 ReserveOutcome::Reserved(entry) => assert_ne!(entry.session_id, "agent-dead"),
1170 ReserveOutcome::LiveOwner(_) => panic!("dead owner should have been reaped"),
1171 }
1172 let abandoned = registry.load("agent-dead").unwrap().unwrap();
1173 assert_eq!(abandoned.status, AgentStatus::Abandoned);
1174 assert!(abandoned.completed_at.is_some());
1175 }
1176
1177 #[test]
1178 fn try_reserve_thread_reports_live_owner_when_pid_is_alive() {
1179 let (_temp, registry) = create_registry();
1180 let mut alive = entry("agent-alive", AgentStatus::Active);
1181 alive.thread = "feature/busy".to_string();
1182 alive.pid = Some(std::process::id());
1183 alive.boot_id = crate::store::liveness::current_boot_id();
1184 registry.save(&alive).unwrap();
1185
1186 let outcome = registry
1187 .try_reserve_thread("feature/busy", |session_id| {
1188 let mut new = entry(session_id, AgentStatus::Active);
1189 new.thread = "feature/busy".to_string();
1190 Ok(new)
1191 })
1192 .unwrap();
1193
1194 match outcome {
1195 ReserveOutcome::Reserved(_) => panic!("live owner should have blocked reservation"),
1196 ReserveOutcome::LiveOwner(existing) => assert_eq!(existing.session_id, "agent-alive"),
1197 }
1198 let still_alive = registry.load("agent-alive").unwrap().unwrap();
1199 assert_eq!(still_alive.status, AgentStatus::Active);
1200 }
1201
1202 #[test]
1203 fn reap_dead_for_thread_only_touches_named_thread() {
1204 let (_temp, registry) = create_registry();
1205 let mut dead_a = entry("agent-dead-a", AgentStatus::Active);
1206 dead_a.thread = "feature/a".to_string();
1207 dead_a.pid = Some(0x7fff_ffff);
1208 dead_a.boot_id = Some("stale".to_string());
1209 let mut dead_b = entry("agent-dead-b", AgentStatus::Active);
1210 dead_b.thread = "feature/b".to_string();
1211 dead_b.pid = Some(0x7fff_ffff);
1212 dead_b.boot_id = Some("stale".to_string());
1213 registry.save(&dead_a).unwrap();
1214 registry.save(&dead_b).unwrap();
1215
1216 let reaped = registry.reap_dead_for_thread("feature/a").unwrap();
1217 assert_eq!(reaped, 1);
1218 assert_eq!(
1219 registry.load("agent-dead-a").unwrap().unwrap().status,
1220 AgentStatus::Abandoned
1221 );
1222 assert_eq!(
1223 registry.load("agent-dead-b").unwrap().unwrap().status,
1224 AgentStatus::Active,
1225 "untargeted thread should not be reaped"
1226 );
1227
1228 let reaped_all = registry.reap_dead().unwrap();
1229 assert_eq!(reaped_all, 1);
1230 assert_eq!(
1231 registry.load("agent-dead-b").unwrap().unwrap().status,
1232 AgentStatus::Abandoned
1233 );
1234 }
1235
1236 #[test]
1237 fn actor_chain_breaks_cycles_without_looping() {
1238 let (_temp, registry) = create_registry();
1239 let mut a = entry("agent-a", AgentStatus::Active);
1240 a.native_actor_key = Some("actor:a".to_string());
1241 a.native_parent_actor_key = Some("actor:b".to_string());
1242 let mut b = entry("agent-b", AgentStatus::Active);
1243 b.native_actor_key = Some("actor:b".to_string());
1244 b.native_parent_actor_key = Some("actor:a".to_string());
1245 registry.save(&a).unwrap();
1246 registry.save(&b).unwrap();
1247
1248 let chain = registry.actor_chain_for_session("agent-a").unwrap();
1249 assert_eq!(chain.len(), 2);
1250 assert_eq!(chain.last().unwrap().session_id, "agent-a");
1251 }
1252
1253 #[test]
1264 #[ignore = "soak: 100× concurrent reservation race"]
1265 fn try_reserve_thread_under_concurrent_load_is_race_free() {
1266 use std::sync::{Arc, Barrier};
1267
1268 const ROUNDS: usize = 100;
1269 const RACERS: usize = 8;
1270
1271 for round in 0..ROUNDS {
1272 let (_temp, registry) = create_registry();
1276 let registry = Arc::new(registry);
1277 let barrier = Arc::new(Barrier::new(RACERS));
1278 let thread_name = format!("feature/race-{round}");
1279
1280 let handles: Vec<_> = (0..RACERS)
1281 .map(|racer_idx| {
1282 let registry = Arc::clone(®istry);
1283 let barrier = Arc::clone(&barrier);
1284 let thread_name = thread_name.clone();
1285 std::thread::spawn(move || {
1286 barrier.wait();
1291 let outcome = registry.try_reserve_thread(&thread_name, |session_id| {
1292 let mut entry = entry(
1293 &format!("agent-{racer_idx}-{session_id}"),
1294 AgentStatus::Active,
1295 );
1296 entry.thread = thread_name.clone();
1297 entry.pid = Some(std::process::id());
1298 entry.boot_id = crate::store::liveness::current_boot_id();
1299 Ok(entry)
1300 });
1301 outcome.expect("reservation call must not error")
1302 })
1303 })
1304 .collect();
1305
1306 let outcomes: Vec<ReserveOutcome> = handles
1307 .into_iter()
1308 .map(|h| h.join().expect("racer panic"))
1309 .collect();
1310
1311 let reserved_count = outcomes
1312 .iter()
1313 .filter(|o| matches!(o, ReserveOutcome::Reserved(_)))
1314 .count();
1315 let live_owner_count = outcomes
1316 .iter()
1317 .filter(|o| matches!(o, ReserveOutcome::LiveOwner(_)))
1318 .count();
1319
1320 assert_eq!(
1321 reserved_count, 1,
1322 "round {round}: exactly one racer must win the reservation; got {reserved_count}"
1323 );
1324 assert_eq!(
1325 live_owner_count,
1326 RACERS - 1,
1327 "round {round}: every loser must get a LiveOwner outcome; \
1328 reserved={reserved_count} live_owner={live_owner_count}"
1329 );
1330
1331 let winner_session = outcomes
1336 .iter()
1337 .find_map(|o| match o {
1338 ReserveOutcome::Reserved(entry) => Some(entry.session_id.clone()),
1339 _ => None,
1340 })
1341 .expect("a winner must exist");
1342 for outcome in &outcomes {
1343 if let ReserveOutcome::LiveOwner(existing) = outcome {
1344 assert_eq!(
1345 existing.session_id, winner_session,
1346 "round {round}: LiveOwner conflicts must point at the actual winner; \
1347 got {} expected {winner_session}",
1348 existing.session_id
1349 );
1350 }
1351 }
1352 }
1353 }
1354}