1use std::{
9 collections::{HashMap, HashSet},
10 path::{Path, PathBuf},
11};
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15
16use crate::{
17 lock::RepoLock,
18 store::{
19 HeddleError, Result,
20 atomic::write_file_atomic,
21 liveness::{Liveness, is_owner_alive},
22 },
23};
24
/// Age, in days, at which a terminal (non-`Active`) registry entry is pruned.
///
/// Age is measured from the entry's `completed_at`, falling back to `started_at` when no
/// completion time was recorded; entries at or beyond this age are removed lazily by
/// `load`, `list`, and the reservation scans.
const STALE_AGENT_TTL_DAYS: i64 = 7;
26
/// Result of [`AgentRegistry::try_reserve_thread`].
#[derive(Debug)]
pub enum ReserveOutcome {
    /// No live `Active` owner was found for the thread, so a new entry was built and persisted.
    Reserved(AgentEntry),
    /// An `Active` entry on the thread is alive, or its liveness could not be determined, so no
    /// reservation was made. Carries that existing entry.
    LiveOwner(AgentEntry),
}
41
/// One context lookup recorded against an `Active` session via
/// [`AgentRegistry::log_context_query`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextQueryEntry {
    /// Queried path (the tests use repo-relative file paths such as `src/lib.rs`).
    pub path: String,
    /// Optional narrower scope within `path` (tests use values like `symbol:validate_token`).
    pub scope: Option<String>,
    /// Timestamp stored with the query (callers in the tests pass `Utc::now()`).
    pub queried_at: DateTime<Utc>,
}
52
/// Optional usage counters attached to an agent entry.
///
/// Every field defaults to `None` when absent from the stored TOML, so older entry files keep
/// deserializing as fields are added.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AgentUsageSummary {
    /// Input token count, if reported.
    #[serde(default)]
    pub input_tokens: Option<u64>,
    /// Output token count, if reported.
    #[serde(default)]
    pub output_tokens: Option<u64>,
    /// Reasoning token count, if reported.
    #[serde(default)]
    pub reasoning_tokens: Option<u64>,
    /// Number of tool calls, if reported.
    #[serde(default)]
    pub tool_calls: Option<u32>,
    /// Presumably cost in millionths of a US dollar, per the field name; confirm with producers.
    #[serde(default)]
    pub cost_micros_usd: Option<u64>,
}
66
/// Lifecycle state of a registry entry, serialized in `snake_case` (e.g. `active`).
///
/// Only `Active` entries are treated as thread owners by the registry. Every other state is
/// terminal and becomes eligible for pruning after `STALE_AGENT_TTL_DAYS`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AgentStatus {
    /// The session currently holds (or is attached to) its thread.
    Active,
    /// The session ended without completing; the registry sets this when it reaps an `Active`
    /// entry whose owner process is dead.
    Abandoned,
    /// The session finished.
    Complete,
    /// The session's work was merged.
    Merged,
}
80
81impl std::fmt::Display for AgentStatus {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match self {
84 AgentStatus::Active => write!(f, "active"),
85 AgentStatus::Abandoned => write!(f, "abandoned"),
86 AgentStatus::Complete => write!(f, "complete"),
87 AgentStatus::Merged => write!(f, "merged"),
88 }
89 }
90}
91
/// One agent session as persisted in `<agents_dir>/<session_id>.toml`.
///
/// Most fields carry `#[serde(default)]` so entry files written by older versions still
/// deserialize. Fields not described below are stored and returned as-is; the registry code in
/// this module does not interpret them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentEntry {
    /// Registry key and file stem. Must be non-empty lowercase ASCII alphanumerics or `-`
    /// (enforced when the entry file path is computed).
    pub session_id: String,
    /// Client instance identifier; looked up by `find_active_by_client_instance_id`.
    #[serde(default)]
    pub client_instance_id: Option<String>,
    /// Native actor key; looked up by `find_active_by_native_actor_key` and used as the node
    /// key when building actor chains.
    #[serde(default)]
    pub native_actor_key: Option<String>,
    /// Native actor key of this actor's parent; followed by `actor_chain_for_session`.
    #[serde(default)]
    pub native_parent_actor_key: Option<String>,
    /// Native instance key; looked up by `find_active_by_native_instance_key_at_path`.
    #[serde(default)]
    pub native_instance_key: Option<String>,
    /// Heddle session identifier; looked up by `find_active_by_heddle_session_id`.
    #[serde(default)]
    pub heddle_session_id: Option<String>,
    /// Optional thread identifier (tests set it to the thread name).
    #[serde(default)]
    pub thread_id: Option<String>,
    /// Thread name this session works on; `try_reserve_thread` allows at most one live
    /// `Active` owner per thread.
    pub thread: String,
    /// Owner process ID, checked together with `boot_id` to decide liveness.
    #[serde(default)]
    pub pid: Option<u32>,
    /// Boot identifier of the machine the owner process ran on, checked together with `pid`.
    #[serde(default)]
    pub boot_id: Option<String>,
    #[serde(default)]
    pub liveness_path: Option<PathBuf>,
    #[serde(default)]
    pub heartbeat_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub anchor_state: Option<String>,
    #[serde(default)]
    pub anchor_root: Option<String>,
    #[serde(default)]
    pub reservation_token: Option<String>,
    /// Worktree root; matched (after canonicalization) by `find_active_by_path`.
    #[serde(default)]
    pub path: Option<PathBuf>,
    pub base_state: String,
    /// Creation time. Used as the newest-first sort key by `list` and as the staleness
    /// anchor when `completed_at` is missing.
    pub started_at: DateTime<Utc>,
    pub provider: Option<String>,
    pub model: Option<String>,
    #[serde(default)]
    pub harness: Option<String>,
    #[serde(default)]
    pub thinking_level: Option<String>,
    #[serde(default)]
    pub usage_summary: AgentUsageSummary,
    #[serde(default)]
    pub last_progress_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub report_flush_state: Option<String>,
    #[serde(default)]
    pub attach_reason: Option<String>,
    #[serde(default)]
    pub attach_precedence: Vec<String>,
    #[serde(default)]
    pub winning_attach_rule: Option<String>,
    #[serde(default)]
    pub probe_source: Option<String>,
    #[serde(default)]
    pub probe_confidence: Option<f32>,
    /// Lifecycle state; see [`AgentStatus`].
    pub status: AgentStatus,
    /// Set when the entry leaves `Active` (via `update_status` or reaping), and cleared when
    /// it is set back to `Active`. Primary staleness anchor.
    #[serde(default)]
    pub completed_at: Option<DateTime<Utc>>,
    /// Context lookups appended by `log_context_query` while the session is `Active`.
    #[serde(default)]
    pub context_queries: Vec<ContextQueryEntry>,
}
192
/// Lightweight projection of an [`AgentEntry`] used in actor chains returned by
/// [`AgentRegistry::actor_chain_for_session`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActorChainNode {
    /// Session ID of the underlying registry entry.
    pub session_id: String,
    /// Native actor key of this node, if any.
    #[serde(default)]
    pub native_actor_key: Option<String>,
    /// Native actor key of this node's parent, if any.
    #[serde(default)]
    pub native_parent_actor_key: Option<String>,
    /// Thread the session works on.
    pub thread: String,
    /// Lifecycle state of the session.
    pub status: AgentStatus,
    #[serde(default)]
    pub provider: Option<String>,
    #[serde(default)]
    pub model: Option<String>,
    #[serde(default)]
    pub harness: Option<String>,
}
210
211impl From<&AgentEntry> for ActorChainNode {
212 fn from(entry: &AgentEntry) -> Self {
213 Self {
214 session_id: entry.session_id.clone(),
215 native_actor_key: entry.native_actor_key.clone(),
216 native_parent_actor_key: entry.native_parent_actor_key.clone(),
217 thread: entry.thread.clone(),
218 status: entry.status.clone(),
219 provider: entry.provider.clone(),
220 model: entry.model.clone(),
221 harness: entry.harness.clone(),
222 }
223 }
224}
225
/// File-backed registry of agent sessions: one TOML file per session in `agents_dir`.
///
/// Most mutating operations take a `RepoLock` on `agents_dir/.lock`. Plain reads (`load`,
/// `list`) read files without the lock and only take it to prune stale entries.
pub struct AgentRegistry {
    /// `<heddle_dir>/agents`.
    agents_dir: PathBuf,
}
230
231impl AgentRegistry {
232 pub fn new(heddle_dir: &Path) -> Self {
234 Self {
235 agents_dir: heddle_dir.join("agents"),
236 }
237 }
238
239 fn entry_path(&self, session_id: &str) -> Result<PathBuf> {
240 if session_id.is_empty()
244 || !session_id
245 .bytes()
246 .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
247 {
248 return Err(HeddleError::Config(format!(
249 "invalid session ID '{}': only lowercase alphanumeric and hyphens allowed",
250 session_id
251 )));
252 }
253 Ok(self.agents_dir.join(format!("{}.toml", session_id)))
254 }
255
256 fn lock_path(&self) -> PathBuf {
257 self.agents_dir.join(".lock")
258 }
259
260 fn write_lock(&self) -> Result<crate::lock::WriteLockGuard> {
261 RepoLock::at(self.lock_path()).write().map_err(|err| {
262 HeddleError::Config(format!("failed to acquire agent registry lock: {err}"))
263 })
264 }
265
266 fn write_entry_file(&self, entry: &AgentEntry) -> Result<()> {
267 std::fs::create_dir_all(&self.agents_dir)?;
268 let path = self.entry_path(&entry.session_id)?;
269 let content =
270 toml::to_string_pretty(entry).map_err(|e| HeddleError::Config(e.to_string()))?;
271 Ok(write_file_atomic(&path, content.as_bytes())?)
272 }
273
274 fn load_entry_from_path(&self, path: &Path) -> Result<Option<AgentEntry>> {
275 if !path.exists() {
276 return Ok(None);
277 }
278
279 let content = std::fs::read_to_string(path)?;
280 let entry = toml::from_str(&content).map_err(|e| HeddleError::Config(e.to_string()))?;
281 Ok(Some(entry))
282 }
283
284 fn is_stale_terminal_entry(&self, entry: &AgentEntry) -> bool {
285 if matches!(entry.status, AgentStatus::Active) {
286 return false;
287 }
288
289 let terminal_at = entry.completed_at.unwrap_or(entry.started_at);
290 terminal_at <= Utc::now() - chrono::Duration::days(STALE_AGENT_TTL_DAYS)
291 }
292
293 fn prune_stale_entry_path(&self, path: &Path) -> Result<()> {
294 if path.exists() {
295 std::fs::remove_file(path)?;
296 }
297 Ok(())
298 }
299
300 pub fn liveness_for(entry: &AgentEntry) -> Liveness {
304 if entry.status != AgentStatus::Active {
305 return Liveness::Dead;
306 }
307 is_owner_alive(entry.pid, entry.boot_id.as_deref())
308 }
309
310 fn abandon_active_entry(&self, mut entry: AgentEntry) -> Result<AgentEntry> {
313 entry.status = AgentStatus::Abandoned;
314 entry.completed_at = Some(Utc::now());
315 self.write_entry_file(&entry)?;
316 Ok(entry)
317 }
318
319 fn reap_dead_locked(&self, thread_filter: Option<&str>) -> Result<usize> {
324 if !self.agents_dir.exists() {
325 return Ok(0);
326 }
327 let mut reaped = 0usize;
328 for dir_entry in std::fs::read_dir(&self.agents_dir)? {
329 let dir_entry = dir_entry?;
330 let path = dir_entry.path();
331 if !path.extension().map(|e| e == "toml").unwrap_or(false) {
332 continue;
333 }
334 let Some(entry) = self.load_entry_from_path(&path)? else {
335 continue;
336 };
337 if entry.status != AgentStatus::Active {
338 continue;
339 }
340 if let Some(name) = thread_filter
341 && entry.thread != name
342 {
343 continue;
344 }
345 if Self::liveness_for(&entry) == Liveness::Dead {
346 self.abandon_active_entry(entry)?;
347 reaped += 1;
348 }
349 }
350 Ok(reaped)
351 }
352
353 pub fn reap_dead_for_thread(&self, thread: &str) -> Result<usize> {
361 let _lock = self.write_lock()?;
362 self.reap_dead_locked(Some(thread))
363 }
364
365 pub fn reap_dead(&self) -> Result<usize> {
367 let _lock = self.write_lock()?;
368 self.reap_dead_locked(None)
369 }
370
371 fn create_generated_entry_with<F, G>(
372 &self,
373 mut generate_id: G,
374 mut build_entry: F,
375 ) -> Result<AgentEntry>
376 where
377 F: FnMut(&str) -> Result<AgentEntry>,
378 G: FnMut() -> String,
379 {
380 let _lock = self.write_lock()?;
381
382 loop {
383 let session_id = generate_id();
384 let path = self.entry_path(&session_id)?;
385 if path.exists() {
386 continue;
387 }
388
389 let entry = build_entry(&session_id)?;
390 self.write_entry_file(&entry)?;
391 return Ok(entry);
392 }
393 }
394
395 pub fn create_generated_entry<F>(&self, build_entry: F) -> Result<AgentEntry>
397 where
398 F: FnMut(&str) -> Result<AgentEntry>,
399 {
400 self.create_generated_entry_with(generate_agent_id, build_entry)
401 }
402
403 pub fn save(&self, entry: &AgentEntry) -> Result<()> {
408 let _lock = self.write_lock()?;
409 self.write_entry_file(entry)
410 }
411
412 pub fn load(&self, session_id: &str) -> Result<Option<AgentEntry>> {
414 let path = self.entry_path(session_id)?;
415 let Some(entry) = self.load_entry_from_path(&path)? else {
416 return Ok(None);
417 };
418
419 if self.is_stale_terminal_entry(&entry) {
420 let _lock = self.write_lock()?;
421 if let Some(latest) = self.load_entry_from_path(&path)?
422 && self.is_stale_terminal_entry(&latest)
423 {
424 self.prune_stale_entry_path(&path)?;
425 return Ok(None);
426 }
427 }
428
429 Ok(Some(entry))
430 }
431
432 pub fn list(&self) -> Result<Vec<AgentEntry>> {
434 if !self.agents_dir.exists() {
435 return Ok(Vec::new());
436 }
437
438 let mut stale_paths = Vec::new();
439 let mut entries = Vec::new();
440 for dir_entry in std::fs::read_dir(&self.agents_dir)? {
441 let dir_entry = dir_entry?;
442 let path = dir_entry.path();
443 if path.extension().map(|e| e == "toml").unwrap_or(false)
444 && let Ok(content) = std::fs::read_to_string(&path)
445 && let Ok(entry) = toml::from_str::<AgentEntry>(&content)
446 {
447 if self.is_stale_terminal_entry(&entry) {
448 stale_paths.push(path);
449 } else {
450 entries.push(entry);
451 }
452 }
453 }
454
455 if !stale_paths.is_empty() {
456 let _lock = self.write_lock()?;
457 for path in stale_paths {
458 if let Some(entry) = self.load_entry_from_path(&path)?
459 && self.is_stale_terminal_entry(&entry)
460 {
461 self.prune_stale_entry_path(&path)?;
462 }
463 }
464 }
465
466 entries.sort_by_key(|a| std::cmp::Reverse(a.started_at));
467 Ok(entries)
468 }
469
470 pub fn update_status(&self, session_id: &str, status: AgentStatus) -> Result<()> {
472 let _lock = self.write_lock()?;
473 let path = self.entry_path(session_id)?;
474 if let Some(mut entry) = self.load_entry_from_path(&path)? {
475 entry.status = status;
476 entry.completed_at = match entry.status {
477 AgentStatus::Active => None,
478 AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
479 Some(Utc::now())
480 }
481 };
482 self.write_entry_file(&entry)?;
483 }
484 Ok(())
485 }
486
487 pub fn update_entry<F>(&self, session_id: &str, mut update: F) -> Result<Option<AgentEntry>>
489 where
490 F: FnMut(&mut AgentEntry),
491 {
492 let _lock = self.write_lock()?;
493 let path = self.entry_path(session_id)?;
494 let Some(mut entry) = self.load_entry_from_path(&path)? else {
495 return Ok(None);
496 };
497 update(&mut entry);
498 self.write_entry_file(&entry)?;
499 Ok(Some(entry))
500 }
501
502 pub fn find_or_create_active_entry<FMatch, FUpdate, FBuild>(
505 &self,
506 mut matches: FMatch,
507 mut update_existing: FUpdate,
508 mut build_entry: FBuild,
509 ) -> Result<(AgentEntry, bool)>
510 where
511 FMatch: FnMut(&AgentEntry) -> bool,
512 FUpdate: FnMut(&mut AgentEntry),
513 FBuild: FnMut(&str) -> Result<AgentEntry>,
514 {
515 let _lock = self.write_lock()?;
516 std::fs::create_dir_all(&self.agents_dir)?;
517
518 for dir_entry in std::fs::read_dir(&self.agents_dir)? {
519 let dir_entry = dir_entry?;
520 let path = dir_entry.path();
521 if !path.extension().map(|e| e == "toml").unwrap_or(false) {
522 continue;
523 }
524 let Some(mut entry) = self.load_entry_from_path(&path)? else {
525 continue;
526 };
527 if self.is_stale_terminal_entry(&entry) {
528 self.prune_stale_entry_path(&path)?;
529 continue;
530 }
531 if entry.status == AgentStatus::Active && matches(&entry) {
532 update_existing(&mut entry);
533 self.write_entry_file(&entry)?;
534 return Ok((entry, false));
535 }
536 }
537
538 loop {
539 let session_id = generate_agent_id();
540 let path = self.entry_path(&session_id)?;
541 if path.exists() {
542 continue;
543 }
544
545 let entry = build_entry(&session_id)?;
546 self.write_entry_file(&entry)?;
547 return Ok((entry, true));
548 }
549 }
550
551 pub fn try_reserve_thread<F>(&self, thread: &str, build_entry: F) -> Result<ReserveOutcome>
566 where
567 F: FnMut(&str) -> Result<AgentEntry>,
568 {
569 let _lock = self.write_lock()?;
570 std::fs::create_dir_all(&self.agents_dir)?;
571
572 let mut live_owner: Option<AgentEntry> = None;
573 for dir_entry in std::fs::read_dir(&self.agents_dir)? {
574 let dir_entry = dir_entry?;
575 let path = dir_entry.path();
576 if !path.extension().map(|e| e == "toml").unwrap_or(false) {
577 continue;
578 }
579 let Some(entry) = self.load_entry_from_path(&path)? else {
580 continue;
581 };
582 if self.is_stale_terminal_entry(&entry) {
583 self.prune_stale_entry_path(&path)?;
584 continue;
585 }
586 if entry.status != AgentStatus::Active || entry.thread != thread {
587 continue;
588 }
589 match Self::liveness_for(&entry) {
590 Liveness::Dead => {
591 self.abandon_active_entry(entry)?;
592 }
593 Liveness::Alive | Liveness::Unknown => {
594 live_owner = Some(entry);
597 break;
598 }
599 }
600 }
601
602 if let Some(existing) = live_owner {
603 return Ok(ReserveOutcome::LiveOwner(existing));
604 }
605
606 let mut build_entry = build_entry;
607 loop {
608 let session_id = generate_agent_id();
609 let path = self.entry_path(&session_id)?;
610 if path.exists() {
611 continue;
612 }
613
614 let entry = build_entry(&session_id)?;
615 self.write_entry_file(&entry)?;
616 return Ok(ReserveOutcome::Reserved(entry));
617 }
618 }
619
620 pub fn create_generated_entry_for_thread<F>(
626 &self,
627 thread: &str,
628 build_entry: F,
629 ) -> Result<AgentEntry>
630 where
631 F: FnMut(&str) -> Result<AgentEntry>,
632 {
633 match self.try_reserve_thread(thread, build_entry)? {
634 ReserveOutcome::Reserved(entry) => Ok(entry),
635 ReserveOutcome::LiveOwner(existing) => Err(HeddleError::Config(format!(
636 "thread '{}' already has active reservation {}. Use `heddle thread show {}` to inspect it, or release the session before starting another writer.",
637 thread, existing.session_id, thread
638 ))),
639 }
640 }
641
642 pub fn find_active_by_path(&self, worktree_root: &Path) -> Result<Option<AgentEntry>> {
645 let canonical = worktree_root
646 .canonicalize()
647 .unwrap_or_else(|_| worktree_root.to_path_buf());
648 let entries = self.list()?;
649 Ok(entries
650 .into_iter()
651 .find(|e| e.status == AgentStatus::Active && entry_matches_root(e, &canonical)))
652 }
653
654 pub fn find_active_by_heddle_session_id(
656 &self,
657 heddle_session_id: &str,
658 ) -> Result<Option<AgentEntry>> {
659 let entries = self.list()?;
660 Ok(entries.into_iter().find(|entry| {
661 entry.status == AgentStatus::Active
662 && entry.heddle_session_id.as_deref() == Some(heddle_session_id)
663 }))
664 }
665
666 pub fn find_active_by_client_instance_id(
669 &self,
670 client_instance_id: &str,
671 ) -> Result<Option<AgentEntry>> {
672 let entries = self.list()?;
673 Ok(entries.into_iter().find(|entry| {
674 entry.status == AgentStatus::Active
675 && entry.client_instance_id.as_deref() == Some(client_instance_id)
676 }))
677 }
678
679 pub fn find_active_by_native_actor_key(
681 &self,
682 native_actor_key: &str,
683 ) -> Result<Option<AgentEntry>> {
684 let entries = self.list()?;
685 Ok(entries.into_iter().find(|entry| {
686 entry.status == AgentStatus::Active
687 && entry.native_actor_key.as_deref() == Some(native_actor_key)
688 }))
689 }
690
691 pub fn actor_chain_for_session(&self, session_id: &str) -> Result<Vec<ActorChainNode>> {
698 let entries = self.list()?;
699 let by_session: HashMap<&str, &AgentEntry> = entries
700 .iter()
701 .map(|entry| (entry.session_id.as_str(), entry))
702 .collect();
703 let by_native_key: HashMap<&str, &AgentEntry> = entries
704 .iter()
705 .filter_map(|entry| entry.native_actor_key.as_deref().map(|key| (key, entry)))
706 .collect();
707
708 let Some(mut current) = by_session.get(session_id).copied() else {
709 return Ok(Vec::new());
710 };
711 let mut leaf_to_root = vec![ActorChainNode::from(current)];
712 let mut seen = HashSet::from([current.session_id.as_str()]);
713
714 while let Some(parent_key) = current.native_parent_actor_key.as_deref() {
715 let Some(parent) = by_native_key.get(parent_key).copied() else {
716 break;
717 };
718 if !seen.insert(parent.session_id.as_str()) {
719 break;
720 }
721 leaf_to_root.push(ActorChainNode::from(parent));
722 current = parent;
723 }
724
725 leaf_to_root.reverse();
726 Ok(leaf_to_root)
727 }
728
729 pub fn find_active_by_native_instance_key_at_path(
732 &self,
733 native_instance_key: &str,
734 worktree_root: &Path,
735 ) -> Result<Option<AgentEntry>> {
736 let canonical = worktree_root
737 .canonicalize()
738 .unwrap_or_else(|_| worktree_root.to_path_buf());
739 let entries = self.list()?;
740 Ok(entries.into_iter().find(|entry| {
741 entry.status == AgentStatus::Active
742 && entry.native_instance_key.as_deref() == Some(native_instance_key)
743 && entry_matches_root(entry, &canonical)
744 }))
745 }
746
747 pub fn log_context_query(&self, session_id: &str, query: ContextQueryEntry) -> Result<()> {
751 let _lock = self.write_lock()?;
752 let path = self.entry_path(session_id)?;
753 if let Some(mut entry) = self.load_entry_from_path(&path)?
754 && entry.status == AgentStatus::Active
755 {
756 entry.context_queries.push(query);
757 self.write_entry_file(&entry)?;
758 }
759 Ok(())
760 }
761
762 pub fn delete(&self, session_id: &str) -> Result<()> {
764 let path = self.entry_path(session_id)?;
765 if path.exists() {
766 std::fs::remove_file(path)?;
767 }
768 Ok(())
769 }
770}
771
772pub fn generate_agent_id() -> String {
777 let random_bytes: [u8; 12] = rand::random();
778 format!(
779 "agent-{}",
780 base32::encode(base32::Alphabet::Rfc4648 { padding: false }, &random_bytes).to_lowercase()
781 )
782}
783
784fn entry_matches_root(entry: &AgentEntry, canonical: &Path) -> bool {
785 entry
786 .path
787 .as_ref()
788 .map(|p| p.canonicalize().unwrap_or_else(|_| p.clone()) == canonical)
789 .unwrap_or(false)
790}
791
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Creates a registry rooted in a fresh temp dir; keep the `TempDir` alive for the test.
    fn create_registry() -> (TempDir, AgentRegistry) {
        let temp_dir = TempDir::new().unwrap();
        let registry = AgentRegistry::new(&temp_dir.path().join(".heddle"));
        (temp_dir, registry)
    }

    /// Minimal entry with the given ID and status, on thread `agent/<session_id>`.
    fn entry(session_id: &str, status: AgentStatus) -> AgentEntry {
        AgentEntry {
            session_id: session_id.to_string(),
            client_instance_id: None,
            native_actor_key: None,
            native_parent_actor_key: None,
            native_instance_key: None,
            heddle_session_id: None,
            thread_id: None,
            thread: format!("agent/{session_id}"),
            pid: None,
            boot_id: None,
            liveness_path: None,
            heartbeat_at: None,
            anchor_state: None,
            anchor_root: None,
            reservation_token: None,
            path: None,
            base_state: "hd-base".to_string(),
            started_at: Utc::now(),
            provider: None,
            model: None,
            harness: None,
            thinking_level: None,
            usage_summary: AgentUsageSummary::default(),
            last_progress_at: None,
            report_flush_state: None,
            attach_reason: None,
            attach_precedence: vec![],
            winning_attach_rule: None,
            probe_source: None,
            probe_confidence: None,
            status,
            completed_at: None,
            context_queries: vec![],
        }
    }

    #[test]
    fn list_prunes_stale_completed_entries() {
        let (_temp, registry) = create_registry();
        let mut stale = entry("agent-stale", AgentStatus::Complete);
        stale.completed_at = Some(Utc::now() - chrono::Duration::days(8));
        let active = entry("agent-active", AgentStatus::Active);

        registry.save(&stale).unwrap();
        registry.save(&active).unwrap();

        let entries = registry.list().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].session_id, "agent-active");
        assert!(registry.load("agent-stale").unwrap().is_none());
    }

    #[test]
    fn load_returns_none_for_stale_completed_entry() {
        let (_temp, registry) = create_registry();
        let mut stale = entry("agent-stale", AgentStatus::Merged);
        stale.completed_at = Some(Utc::now() - chrono::Duration::days(8));
        registry.save(&stale).unwrap();

        assert!(registry.load("agent-stale").unwrap().is_none());
    }

    #[test]
    fn load_returns_none_for_unknown_session() {
        let (_temp, registry) = create_registry();
        assert!(registry.load("agent-never-saved").unwrap().is_none());
    }

    #[test]
    fn save_and_load_reject_invalid_session_ids() {
        let (_temp, registry) = create_registry();
        for bad in ["", "Agent-Upper", "agent/../escape", "agent_underscore"] {
            assert!(
                registry.save(&entry(bad, AgentStatus::Active)).is_err(),
                "{bad:?} should be rejected"
            );
        }
        assert!(registry.load("Agent-Upper").is_err());
        assert!(registry.list().unwrap().is_empty());
    }

    #[test]
    fn list_skips_unparseable_entry_files() {
        let (_temp, registry) = create_registry();
        registry
            .save(&entry("agent-good", AgentStatus::Active))
            .unwrap();
        std::fs::write(
            registry.agents_dir.join("agent-broken.toml"),
            "not = [valid",
        )
        .unwrap();

        let ids: Vec<String> = registry
            .list()
            .unwrap()
            .into_iter()
            .map(|e| e.session_id)
            .collect();
        assert_eq!(ids, vec!["agent-good".to_string()]);
    }

    #[test]
    fn delete_removes_entry_and_is_idempotent() {
        let (_temp, registry) = create_registry();
        registry
            .save(&entry("agent-gone", AgentStatus::Active))
            .unwrap();

        registry.delete("agent-gone").unwrap();
        assert!(registry.load("agent-gone").unwrap().is_none());

        // Deleting again (or deleting something that never existed) is not an error.
        registry.delete("agent-gone").unwrap();
        registry.delete("agent-never-saved").unwrap();
    }

    #[test]
    fn update_status_sets_and_clears_completed_at() {
        let (_temp, registry) = create_registry();
        registry
            .save(&entry("agent-status", AgentStatus::Active))
            .unwrap();

        registry
            .update_status("agent-status", AgentStatus::Complete)
            .unwrap();
        let done = registry.load("agent-status").unwrap().unwrap();
        assert_eq!(done.status, AgentStatus::Complete);
        assert!(done.completed_at.is_some());

        registry
            .update_status("agent-status", AgentStatus::Active)
            .unwrap();
        let reopened = registry.load("agent-status").unwrap().unwrap();
        assert_eq!(reopened.status, AgentStatus::Active);
        assert!(reopened.completed_at.is_none());

        // Unknown sessions are a silent no-op and are not created.
        registry
            .update_status("agent-missing", AgentStatus::Merged)
            .unwrap();
        assert!(registry.load("agent-missing").unwrap().is_none());
    }

    #[test]
    fn log_context_query_appends_to_active_session() {
        let (_temp, registry) = create_registry();
        let active = entry("agent-active", AgentStatus::Active);
        registry.save(&active).unwrap();

        let query = ContextQueryEntry {
            path: "src/auth/session.rs".to_string(),
            scope: Some("symbol:validate_token".to_string()),
            queried_at: Utc::now(),
        };
        registry.log_context_query("agent-active", query).unwrap();

        let loaded = registry.load("agent-active").unwrap().unwrap();
        assert_eq!(loaded.context_queries.len(), 1);
        assert_eq!(loaded.context_queries[0].path, "src/auth/session.rs");
        assert_eq!(
            loaded.context_queries[0].scope.as_deref(),
            Some("symbol:validate_token")
        );
    }

    #[test]
    fn log_context_query_no_op_for_complete_session() {
        let (_temp, registry) = create_registry();
        let mut complete = entry("agent-done", AgentStatus::Complete);
        complete.completed_at = Some(Utc::now());
        registry.save(&complete).unwrap();

        let query = ContextQueryEntry {
            path: "src/lib.rs".to_string(),
            scope: None,
            queried_at: Utc::now(),
        };
        registry.log_context_query("agent-done", query).unwrap();

        let loaded = registry.load("agent-done").unwrap().unwrap();
        assert_eq!(loaded.context_queries.len(), 0);
    }

    #[test]
    fn find_active_by_path_returns_matching_session() {
        let (temp, registry) = create_registry();
        let worktree = temp.path().join("checkout");
        std::fs::create_dir_all(&worktree).unwrap();

        let mut active = entry("agent-match", AgentStatus::Active);
        active.path = Some(worktree.clone());
        registry.save(&active).unwrap();

        let mut other = entry("agent-other", AgentStatus::Active);
        other.path = Some(temp.path().join("other-checkout"));
        registry.save(&other).unwrap();

        let found = registry.find_active_by_path(&worktree).unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().session_id, "agent-match");
    }

    #[test]
    fn create_generated_entry_retries_collisions_under_lock() {
        let (_temp, registry) = create_registry();
        registry
            .save(&entry("agent-existing", AgentStatus::Active))
            .unwrap();

        let mut ids = vec!["agent-existing".to_string(), "agent-new".to_string()].into_iter();
        let created = registry
            .create_generated_entry_with(
                move || ids.next().unwrap(),
                |session_id| {
                    let mut entry = entry(session_id, AgentStatus::Active);
                    entry.thread = format!("agent/{session_id}");
                    Ok(entry)
                },
            )
            .unwrap();

        assert_eq!(created.session_id, "agent-new");
        assert!(registry.load("agent-existing").unwrap().is_some());
        assert!(registry.load("agent-new").unwrap().is_some());
    }

    #[test]
    fn abandoning_active_entry_unblocks_subsequent_reserve_on_same_thread() {
        let (_temp, registry) = create_registry();

        // pid 1 on the current boot is treated as a live owner.
        let outcome = registry
            .try_reserve_thread("feature/leak-repro", |session_id| {
                let mut e = entry(session_id, AgentStatus::Active);
                e.thread = "feature/leak-repro".to_string();
                e.thread_id = Some("feature/leak-repro".to_string());
                e.pid = Some(1);
                e.boot_id = crate::store::liveness::current_boot_id();
                Ok(e)
            })
            .unwrap();
        let session_id = match outcome {
            ReserveOutcome::Reserved(entry) => entry.session_id,
            ReserveOutcome::LiveOwner(_) => panic!("first reserve must succeed"),
        };

        // Simulate a rollback that abandons the reservation.
        registry
            .update_entry(&session_id, |entry| {
                entry.status = AgentStatus::Abandoned;
                entry.completed_at = Some(Utc::now());
            })
            .unwrap();

        let next = registry
            .try_reserve_thread("feature/leak-repro", |session_id| {
                let mut e = entry(session_id, AgentStatus::Active);
                e.thread = "feature/leak-repro".to_string();
                e.thread_id = Some("feature/leak-repro".to_string());
                e.pid = Some(1);
                e.boot_id = crate::store::liveness::current_boot_id();
                Ok(e)
            })
            .unwrap();
        assert!(
            matches!(next, ReserveOutcome::Reserved(_)),
            "after the orphaned reservation is abandoned, the next reserve must succeed: {next:?}"
        );

        let active_count = registry
            .list()
            .unwrap()
            .into_iter()
            .filter(|e| e.thread == "feature/leak-repro" && e.status == AgentStatus::Active)
            .count();
        assert_eq!(
            active_count, 1,
            "exactly one Active reservation must own the thread after rollback + retry"
        );
    }

    #[test]
    fn update_entry_persists_harness_metadata() {
        let (_temp, registry) = create_registry();
        let active = entry("agent-active", AgentStatus::Active);
        registry.save(&active).unwrap();

        registry
            .update_entry("agent-active", |entry| {
                entry.heddle_session_id = Some("sess-123".to_string());
                entry.harness = Some("claude-code".to_string());
                entry.thinking_level = Some("deep".to_string());
                entry.report_flush_state = Some("pending-local".to_string());
                entry.attach_reason = Some("attached from test metadata update".to_string());
                entry.attach_precedence = vec!["matched-current-session".to_string()];
                entry.winning_attach_rule = Some("matched-current-session".to_string());
                entry.probe_source = Some("argv_env".to_string());
                entry.probe_confidence = Some(0.75);
                entry.last_progress_at = Some(Utc::now());
                entry.usage_summary.input_tokens = Some(42);
            })
            .unwrap();

        let loaded = registry.load("agent-active").unwrap().unwrap();
        assert_eq!(loaded.heddle_session_id.as_deref(), Some("sess-123"));
        assert_eq!(loaded.harness.as_deref(), Some("claude-code"));
        assert_eq!(loaded.thinking_level.as_deref(), Some("deep"));
        assert_eq!(loaded.report_flush_state.as_deref(), Some("pending-local"));
        assert_eq!(
            loaded.attach_reason.as_deref(),
            Some("attached from test metadata update")
        );
        assert_eq!(loaded.attach_precedence, vec!["matched-current-session"]);
        assert_eq!(
            loaded.winning_attach_rule.as_deref(),
            Some("matched-current-session")
        );
        assert_eq!(loaded.probe_source.as_deref(), Some("argv_env"));
        assert_eq!(loaded.probe_confidence, Some(0.75));
        assert_eq!(loaded.usage_summary.input_tokens, Some(42));
        assert!(loaded.last_progress_at.is_some());
    }

    #[test]
    fn find_active_by_client_instance_id_returns_matching_session() {
        let (_temp, registry) = create_registry();
        let mut active = entry("agent-client", AgentStatus::Active);
        active.client_instance_id = Some("client-a".to_string());
        registry.save(&active).unwrap();

        let mut other = entry("agent-other", AgentStatus::Active);
        other.client_instance_id = Some("client-b".to_string());
        registry.save(&other).unwrap();

        let found = registry
            .find_active_by_client_instance_id("client-a")
            .unwrap()
            .unwrap();
        assert_eq!(found.session_id, "agent-client");
    }

    #[test]
    fn find_active_by_native_actor_key_returns_matching_session() {
        let (_temp, registry) = create_registry();
        let mut active = entry("agent-native", AgentStatus::Active);
        active.native_actor_key = Some("codex:thread:thr_123".to_string());
        registry.save(&active).unwrap();

        let found = registry
            .find_active_by_native_actor_key("codex:thread:thr_123")
            .unwrap()
            .unwrap();
        assert_eq!(found.session_id, "agent-native");
    }

    #[test]
    fn actor_chain_follows_native_parent_keys_root_to_leaf() {
        let (_temp, registry) = create_registry();
        let mut root = entry("agent-root", AgentStatus::Active);
        root.native_actor_key = Some("human:lukethorne".to_string());
        root.provider = Some("human".to_string());

        let mut parent = entry("agent-parent", AgentStatus::Active);
        parent.native_actor_key = Some("codex:thread:parent".to_string());
        parent.native_parent_actor_key = Some("human:lukethorne".to_string());
        parent.provider = Some("openai".to_string());
        parent.model = Some("gpt-5".to_string());

        let mut child = entry("agent-child", AgentStatus::Active);
        child.native_actor_key = Some("codex:thread:child".to_string());
        child.native_parent_actor_key = Some("codex:thread:parent".to_string());
        child.provider = Some("openai".to_string());
        child.model = Some("gpt-5-mini".to_string());

        registry.save(&child).unwrap();
        registry.save(&root).unwrap();
        registry.save(&parent).unwrap();

        let chain = registry.actor_chain_for_session("agent-child").unwrap();
        let ids: Vec<_> = chain.iter().map(|node| node.session_id.as_str()).collect();
        assert_eq!(ids, vec!["agent-root", "agent-parent", "agent-child"]);
        assert_eq!(
            chain[2].native_parent_actor_key.as_deref(),
            Some("codex:thread:parent")
        );
    }

    #[test]
    fn try_reserve_thread_reaps_dead_active_entry_and_succeeds() {
        let (_temp, registry) = create_registry();
        let mut dead = entry("agent-dead", AgentStatus::Active);
        dead.thread = "feature/race".to_string();
        dead.pid = Some(0x7fff_ffff);
        // A foreign boot ID marks the owner as dead regardless of the pid.
        dead.boot_id = Some("not-the-current-boot".to_string());
        registry.save(&dead).unwrap();

        let outcome = registry
            .try_reserve_thread("feature/race", |session_id| {
                let mut new = entry(session_id, AgentStatus::Active);
                new.thread = "feature/race".to_string();
                new.pid = Some(std::process::id());
                new.boot_id = crate::store::liveness::current_boot_id();
                Ok(new)
            })
            .unwrap();

        match outcome {
            ReserveOutcome::Reserved(entry) => assert_ne!(entry.session_id, "agent-dead"),
            ReserveOutcome::LiveOwner(_) => panic!("dead owner should have been reaped"),
        }
        let abandoned = registry.load("agent-dead").unwrap().unwrap();
        assert_eq!(abandoned.status, AgentStatus::Abandoned);
        assert!(abandoned.completed_at.is_some());
    }

    #[test]
    fn try_reserve_thread_reports_live_owner_when_pid_is_alive() {
        let (_temp, registry) = create_registry();
        let mut alive = entry("agent-alive", AgentStatus::Active);
        alive.thread = "feature/busy".to_string();
        alive.pid = Some(std::process::id());
        alive.boot_id = crate::store::liveness::current_boot_id();
        registry.save(&alive).unwrap();

        let outcome = registry
            .try_reserve_thread("feature/busy", |session_id| {
                let mut new = entry(session_id, AgentStatus::Active);
                new.thread = "feature/busy".to_string();
                Ok(new)
            })
            .unwrap();

        match outcome {
            ReserveOutcome::Reserved(_) => panic!("live owner should have blocked reservation"),
            ReserveOutcome::LiveOwner(existing) => assert_eq!(existing.session_id, "agent-alive"),
        }
        let still_alive = registry.load("agent-alive").unwrap().unwrap();
        assert_eq!(still_alive.status, AgentStatus::Active);
    }

    #[test]
    fn reap_dead_for_thread_only_touches_named_thread() {
        let (_temp, registry) = create_registry();
        let mut dead_a = entry("agent-dead-a", AgentStatus::Active);
        dead_a.thread = "feature/a".to_string();
        dead_a.pid = Some(0x7fff_ffff);
        dead_a.boot_id = Some("stale".to_string());
        let mut dead_b = entry("agent-dead-b", AgentStatus::Active);
        dead_b.thread = "feature/b".to_string();
        dead_b.pid = Some(0x7fff_ffff);
        dead_b.boot_id = Some("stale".to_string());
        registry.save(&dead_a).unwrap();
        registry.save(&dead_b).unwrap();

        let reaped = registry.reap_dead_for_thread("feature/a").unwrap();
        assert_eq!(reaped, 1);
        assert_eq!(
            registry.load("agent-dead-a").unwrap().unwrap().status,
            AgentStatus::Abandoned
        );
        assert_eq!(
            registry.load("agent-dead-b").unwrap().unwrap().status,
            AgentStatus::Active,
            "untargeted thread should not be reaped"
        );

        let reaped_all = registry.reap_dead().unwrap();
        assert_eq!(reaped_all, 1);
        assert_eq!(
            registry.load("agent-dead-b").unwrap().unwrap().status,
            AgentStatus::Abandoned
        );
    }

    #[test]
    fn actor_chain_breaks_cycles_without_looping() {
        let (_temp, registry) = create_registry();
        let mut a = entry("agent-a", AgentStatus::Active);
        a.native_actor_key = Some("actor:a".to_string());
        a.native_parent_actor_key = Some("actor:b".to_string());
        let mut b = entry("agent-b", AgentStatus::Active);
        b.native_actor_key = Some("actor:b".to_string());
        b.native_parent_actor_key = Some("actor:a".to_string());
        registry.save(&a).unwrap();
        registry.save(&b).unwrap();

        let chain = registry.actor_chain_for_session("agent-a").unwrap();
        assert_eq!(chain.len(), 2);
        assert_eq!(chain.last().unwrap().session_id, "agent-a");
    }

    #[test]
    #[ignore = "soak: 100× concurrent reservation race"]
    fn try_reserve_thread_under_concurrent_load_is_race_free() {
        use std::sync::{Arc, Barrier};

        const ROUNDS: usize = 100;
        const RACERS: usize = 8;

        for round in 0..ROUNDS {
            let (_temp, registry) = create_registry();
            let registry = Arc::new(registry);
            let barrier = Arc::new(Barrier::new(RACERS));
            let thread_name = format!("feature/race-{round}");

            let handles: Vec<_> = (0..RACERS)
                .map(|racer_idx| {
                    let registry = Arc::clone(&registry);
                    let barrier = Arc::clone(&barrier);
                    let thread_name = thread_name.clone();
                    std::thread::spawn(move || {
                        // Release every racer at once to maximize contention on the lock.
                        barrier.wait();
                        let outcome = registry.try_reserve_thread(&thread_name, |session_id| {
                            let mut entry = entry(
                                &format!("agent-{racer_idx}-{session_id}"),
                                AgentStatus::Active,
                            );
                            entry.thread = thread_name.clone();
                            entry.pid = Some(std::process::id());
                            entry.boot_id = crate::store::liveness::current_boot_id();
                            Ok(entry)
                        });
                        outcome.expect("reservation call must not error")
                    })
                })
                .collect();

            let outcomes: Vec<ReserveOutcome> = handles
                .into_iter()
                .map(|h| h.join().expect("racer panic"))
                .collect();

            let reserved_count = outcomes
                .iter()
                .filter(|o| matches!(o, ReserveOutcome::Reserved(_)))
                .count();
            let live_owner_count = outcomes
                .iter()
                .filter(|o| matches!(o, ReserveOutcome::LiveOwner(_)))
                .count();

            assert_eq!(
                reserved_count, 1,
                "round {round}: exactly one racer must win the reservation; got {reserved_count}"
            );
            assert_eq!(
                live_owner_count,
                RACERS - 1,
                "round {round}: every loser must get a LiveOwner outcome; \
                 reserved={reserved_count} live_owner={live_owner_count}"
            );

            // Every loser must have been pointed at the actual winner's entry.
            let winner_session = outcomes
                .iter()
                .find_map(|o| match o {
                    ReserveOutcome::Reserved(entry) => Some(entry.session_id.clone()),
                    _ => None,
                })
                .expect("a winner must exist");
            for outcome in &outcomes {
                if let ReserveOutcome::LiveOwner(existing) = outcome {
                    assert_eq!(
                        existing.session_id, winner_session,
                        "round {round}: LiveOwner conflicts must point at the actual winner; \
                         got {} expected {winner_session}",
                        existing.session_id
                    );
                }
            }
        }
    }
}