use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use harn_vm::agent_events::{AgentEvent, AgentEventSink, WildcardSinkHandle, WorkerEvent};
/// First thread id the subagent tracker hands out: `SubagentTrackerInner::new`
/// seeds `next_thread_id` with this value and ids count upward from it.
// NOTE(review): presumably chosen to stay clear of ids used for non-subagent
// threads — confirm against whatever allocates those ids.
pub(crate) const SUBAGENT_THREAD_ID_BASE: u64 = 100;
/// Bookkeeping record for one subagent worker; the tracker stores these in a
/// map keyed by worker id.
#[derive(Clone, Debug)]
pub(crate) struct SubagentThread {
/// Numeric id taken from the tracker's `next_thread_id` counter the first
/// time `upsert_thread` sees the worker; later upserts leave it unchanged.
pub(crate) thread_id: u64,
/// Worker name; a later upsert overwrites it only with a non-empty name.
pub(crate) name: String,
/// Worker id of the parent, if one was supplied; a later upsert overwrites
/// it only when it passes `Some(..)`.
pub(crate) parent_worker_id: Option<String>,
/// Most recent status string: the caller's value on upsert, `"suspended"`
/// after `mark_suspended`, `"running"` after `mark_resumed`.
pub(crate) last_status: String,
/// Reason stored by `mark_suspended`; cleared by `mark_resumed` and
/// `mark_exited`.
pub(crate) suspend_reason: Option<String>,
/// Set by `mark_exited`. `snapshot_threads` removes exited records right
/// after including them in the snapshot it returns.
pub(crate) exited: bool,
}
/// One worker update captured by `SubagentEventSink::handle_event` and queued
/// on the tracker until `SubagentTracker::drain` collects it.
#[derive(Clone, Debug)]
pub(crate) struct SubagentObservation {
/// `worker_id` copied from the `WorkerUpdate` event.
pub(crate) worker_id: String,
/// `worker_name` copied from the `WorkerUpdate` event.
pub(crate) worker_name: String,
/// The worker event carried by the update.
pub(crate) event: WorkerEvent,
/// `status` string copied from the `WorkerUpdate` event.
pub(crate) status: String,
/// Value of the `parent_worker_id` metadata key when it is a non-empty
/// string; `None` otherwise.
pub(crate) parent_worker_id: Option<String>,
/// Value of `suspension.reason` in the metadata when it is a string;
/// `None` otherwise.
pub(crate) suspend_reason: Option<String>,
}
/// Mutable state shared by every clone of a `SubagentTracker`; always accessed
/// through the tracker's mutex.
///
/// `Default` is implemented by hand rather than derived: a derived impl would
/// start `next_thread_id` at 0 instead of `SUBAGENT_THREAD_ID_BASE`, so
/// `default()` and `new()` would disagree about which ids get allocated.
#[derive(Debug)]
pub(crate) struct SubagentTrackerInner {
    /// Observations queued by the event sink, waiting for `drain`.
    pub(crate) pending: Vec<SubagentObservation>,
    /// Thread records keyed by worker id.
    pub(crate) threads: BTreeMap<String, SubagentThread>,
    /// Id that the next newly seen worker will receive.
    pub(crate) next_thread_id: u64,
}

impl SubagentTrackerInner {
    /// Creates empty state whose id counter starts at
    /// `SUBAGENT_THREAD_ID_BASE`.
    fn new() -> Self {
        Self {
            pending: Vec::new(),
            threads: BTreeMap::new(),
            next_thread_id: SUBAGENT_THREAD_ID_BASE,
        }
    }
}

impl Default for SubagentTrackerInner {
    /// Same as [`SubagentTrackerInner::new`], so the id counter never starts
    /// below the base.
    fn default() -> Self {
        Self::new()
    }
}
/// Cloneable handle to the subagent bookkeeping state. Cloning copies the
/// `Arc`, so every clone locks and mutates the same `SubagentTrackerInner`.
#[derive(Clone, Debug)]
pub(crate) struct SubagentTracker {
// Shared state; each method takes this lock for the duration of the call.
inner: Arc<Mutex<SubagentTrackerInner>>,
}
impl SubagentTracker {
    /// Creates a tracker with an empty queue and no known threads.
    pub(crate) fn new() -> Self {
        let state = SubagentTrackerInner::new();
        Self {
            inner: Arc::new(Mutex::new(state)),
        }
    }

    /// Hands back every queued observation in arrival order and leaves the
    /// queue empty.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    pub(crate) fn drain(&self) -> Vec<SubagentObservation> {
        let mut state = self.inner.lock().expect("subagent tracker poisoned");
        std::mem::take(&mut state.pending)
    }

    /// Creates or refreshes the record for `worker_id` and returns a copy.
    ///
    /// A known worker keeps its thread id; its status is always replaced,
    /// its name only by a non-empty `worker_name`, and its parent only when
    /// `parent_worker_id` is `Some`. An unknown worker gets the next free
    /// thread id and a fresh, non-exited record.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    pub(crate) fn upsert_thread(
        &self,
        worker_id: &str,
        worker_name: &str,
        parent_worker_id: Option<&str>,
        status: &str,
    ) -> SubagentThread {
        let mut guard = self.inner.lock().expect("subagent tracker poisoned");
        // Reborrow the guarded struct once so the map and the id counter
        // can be borrowed independently below.
        let state = &mut *guard;

        if let Some(known) = state.threads.get_mut(worker_id) {
            known.last_status = status.to_owned();
            if !worker_name.is_empty() {
                known.name = worker_name.to_owned();
            }
            if let Some(parent) = parent_worker_id {
                known.parent_worker_id = Some(parent.to_owned());
            }
            return known.clone();
        }

        let thread_id = state.next_thread_id;
        state.next_thread_id += 1;
        let created = SubagentThread {
            thread_id,
            name: worker_name.to_owned(),
            parent_worker_id: parent_worker_id.map(str::to_owned),
            last_status: status.to_owned(),
            suspend_reason: None,
            exited: false,
        };
        state.threads.insert(worker_id.to_owned(), created.clone());
        created
    }

    /// Flags a known worker as `"suspended"` and stores `reason` (which may
    /// be `None`). Unknown workers are ignored.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    pub(crate) fn mark_suspended(&self, worker_id: &str, reason: Option<&str>) {
        let mut state = self.inner.lock().expect("subagent tracker poisoned");
        let Some(thread) = state.threads.get_mut(worker_id) else {
            return;
        };
        thread.last_status = String::from("suspended");
        thread.suspend_reason = reason.map(String::from);
    }

    /// Flags a known worker as `"running"` and clears any suspend reason.
    /// Unknown workers are ignored.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    pub(crate) fn mark_resumed(&self, worker_id: &str) {
        let mut state = self.inner.lock().expect("subagent tracker poisoned");
        let Some(thread) = state.threads.get_mut(worker_id) else {
            return;
        };
        thread.last_status = String::from("running");
        thread.suspend_reason = None;
    }

    /// Marks a known worker as exited and clears any suspend reason; the
    /// status string is left as it was. Unknown workers are ignored.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    pub(crate) fn mark_exited(&self, worker_id: &str) {
        let mut state = self.inner.lock().expect("subagent tracker poisoned");
        let Some(thread) = state.threads.get_mut(worker_id) else {
            return;
        };
        thread.exited = true;
        thread.suspend_reason = None;
    }

    /// Returns a copy of every record (ordered by worker id) and then drops
    /// the exited ones, so an exited worker appears in exactly one snapshot
    /// after `mark_exited`.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    pub(crate) fn snapshot_threads(&self) -> Vec<(String, SubagentThread)> {
        let mut state = self.inner.lock().expect("subagent tracker poisoned");
        let mut rows = Vec::with_capacity(state.threads.len());
        for (worker_id, thread) in &state.threads {
            rows.push((worker_id.clone(), thread.clone()));
        }
        // Sweep only after copying, so the caller still sees the exit.
        state.threads.retain(|_, thread| !thread.exited);
        rows
    }

    /// Looks up the thread id currently recorded for `worker_id`.
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    #[allow(dead_code)]
    pub(crate) fn thread_id_for_worker(&self, worker_id: &str) -> Option<u64> {
        let state = self.inner.lock().expect("subagent tracker poisoned");
        let thread = state.threads.get(worker_id)?;
        Some(thread.thread_id)
    }

    /// Test helper: number of observations waiting to be drained.
    #[cfg(test)]
    pub(crate) fn pending_len(&self) -> usize {
        let state = self.inner.lock().unwrap();
        state.pending.len()
    }

    /// Test helper: direct access to the locked inner state.
    #[cfg(test)]
    pub(crate) fn inner_for_test(&self) -> std::sync::MutexGuard<'_, SubagentTrackerInner> {
        let locked = self.inner.lock();
        locked.expect("subagent tracker poisoned")
    }
}
/// Event sink that records worker updates on a `SubagentTracker`: its
/// `AgentEventSink` impl pushes one `SubagentObservation` per `WorkerUpdate`
/// event onto the tracker's pending queue.
pub(crate) struct SubagentEventSink {
// Tracker whose pending queue receives the observations.
tracker: SubagentTracker,
}
impl SubagentEventSink {
pub(crate) fn new(tracker: SubagentTracker) -> Self {
Self { tracker }
}
}
impl AgentEventSink for SubagentEventSink {
    /// Queues one `SubagentObservation` per `WorkerUpdate` event on the
    /// tracker; every other event kind is ignored.
    ///
    /// The parent worker id is taken from the `parent_worker_id` metadata
    /// key when it is a non-empty string, and the suspend reason from
    /// `suspension.reason` when it is a string (an empty reason is kept).
    ///
    /// # Panics
    /// Panics if the tracker mutex is poisoned.
    fn handle_event(&self, event: &AgentEvent) {
        if let AgentEvent::WorkerUpdate {
            worker_id,
            worker_name,
            event: worker_event,
            status,
            metadata,
            ..
        } = event
        {
            let parent_worker_id = match metadata
                .get("parent_worker_id")
                .and_then(|value| value.as_str())
            {
                Some(parent) if !parent.is_empty() => Some(parent.to_string()),
                _ => None,
            };
            let suspend_reason = metadata
                .get("suspension")
                .and_then(|suspension| suspension.get("reason"))
                .and_then(|reason| reason.as_str())
                .map(String::from);

            // Build the observation before locking so the critical section
            // is just the push.
            let observation = SubagentObservation {
                worker_id: worker_id.clone(),
                worker_name: worker_name.clone(),
                event: *worker_event,
                status: status.clone(),
                parent_worker_id,
                suspend_reason,
            };

            let mut state = self
                .tracker
                .inner
                .lock()
                .expect("subagent tracker poisoned");
            state.pending.push(observation);
        }
    }
}
/// Guard for a wildcard sink registration: `install` registers the sink and
/// keeps the returned handle, and the `Drop` impl unregisters it again.
pub(crate) struct SubagentSinkRegistration {
// Handle returned by `register_wildcard_sink`; passed back on drop.
handle: WildcardSinkHandle,
}
impl SubagentSinkRegistration {
    /// Registers a `SubagentEventSink` backed by `tracker` as a wildcard
    /// sink and returns a guard holding the registration handle; dropping
    /// the guard unregisters the sink.
    pub(crate) fn install(tracker: SubagentTracker) -> Self {
        let sink = Arc::new(SubagentEventSink::new(tracker)) as Arc<dyn AgentEventSink>;
        Self {
            handle: harn_vm::agent_events::register_wildcard_sink(sink),
        }
    }
}
impl Drop for SubagentSinkRegistration {
    /// Unregisters the wildcard sink that `install` registered.
    fn drop(&mut self) {
        let handle = self.handle;
        harn_vm::agent_events::unregister_wildcard_sink(handle);
    }
}
// Unit tests for the tracker and the event sink. They drive the sink directly
// through `handle_event` and never install a wildcard sink registration.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// Builds a `WorkerUpdate` event for the tests. `parent` and `suspend_reason`,
// when given, are written into the metadata object as `parent_worker_id` and
// `suspension.reason`; `status` is derived from `event.as_status()`.
fn make_update(
worker_id: &str,
worker_name: &str,
event: WorkerEvent,
parent: Option<&str>,
suspend_reason: Option<&str>,
) -> AgentEvent {
let mut metadata = serde_json::Map::new();
if let Some(p) = parent {
metadata.insert("parent_worker_id".to_string(), json!(p));
}
if let Some(r) = suspend_reason {
metadata.insert("suspension".to_string(), json!({ "reason": r }));
}
AgentEvent::WorkerUpdate {
session_id: "session-test".to_string(),
worker_id: worker_id.to_string(),
worker_name: worker_name.to_string(),
worker_task: "task".to_string(),
worker_mode: "mode".to_string(),
event,
status: event.as_status().to_string(),
metadata: serde_json::Value::Object(metadata),
audit: None,
}
}
// Not a test and never called (hence the `dead_code` allow); it only has to
// compile.
// NOTE(review): judging by the name, this guards the `worker_mode` field in
// the `WorkerUpdate` literal above — confirm the intent.
#[allow(dead_code)]
fn _construct_with_mode_field_compiles() {
let _ = make_update("w", "n", WorkerEvent::WorkerSpawned, None, None);
}
// Events pushed through the sink show up in `drain` in arrival order, and
// draining leaves the queue empty.
#[test]
fn tracker_drain_returns_pending_then_clears() {
let tracker = SubagentTracker::new();
let sink = SubagentEventSink::new(tracker.clone());
sink.handle_event(&make_update(
"w1",
"alpha",
WorkerEvent::WorkerSpawned,
None,
None,
));
sink.handle_event(&make_update(
"w1",
"alpha",
WorkerEvent::WorkerSuspended,
None,
Some("budget"),
));
assert_eq!(tracker.pending_len(), 2);
let drained = tracker.drain();
assert_eq!(drained.len(), 2);
assert_eq!(tracker.pending_len(), 0);
assert_eq!(drained[0].event, WorkerEvent::WorkerSpawned);
assert_eq!(drained[1].event, WorkerEvent::WorkerSuspended);
assert_eq!(drained[1].suspend_reason.as_deref(), Some("budget"));
}
// Distinct workers get distinct ids at or above the base; upserting a known
// worker keeps its id and updates the status.
#[test]
fn upsert_thread_allocates_unique_ids_per_worker() {
let tracker = SubagentTracker::new();
let a = tracker.upsert_thread("w1", "alpha", None, "running");
let b = tracker.upsert_thread("w2", "beta", Some("w1"), "running");
assert_ne!(a.thread_id, b.thread_id);
assert!(a.thread_id >= SUBAGENT_THREAD_ID_BASE);
assert!(b.thread_id >= SUBAGENT_THREAD_ID_BASE);
assert_eq!(b.parent_worker_id.as_deref(), Some("w1"));
let a2 = tracker.upsert_thread("w1", "alpha", None, "suspended");
assert_eq!(a.thread_id, a2.thread_id);
assert_eq!(a2.last_status, "suspended");
}
// `mark_suspended` stores the reason; `mark_resumed` clears it and sets the
// status back to "running".
#[test]
fn mark_resumed_clears_suspend_reason() {
let tracker = SubagentTracker::new();
tracker.upsert_thread("w1", "alpha", None, "running");
tracker.mark_suspended("w1", Some("backoff"));
let snapshot = tracker.snapshot_threads();
assert_eq!(snapshot[0].1.suspend_reason.as_deref(), Some("backoff"));
tracker.mark_resumed("w1");
let snapshot = tracker.snapshot_threads();
assert_eq!(snapshot[0].1.suspend_reason, None);
assert_eq!(snapshot[0].1.last_status, "running");
}
// An exited worker is still reported by the first snapshot after
// `mark_exited` and is gone from the next one.
#[test]
fn snapshot_threads_sweeps_exited_after_emission() {
let tracker = SubagentTracker::new();
tracker.upsert_thread("w1", "alpha", None, "running");
tracker.upsert_thread("w2", "beta", None, "running");
tracker.mark_exited("w1");
let first = tracker.snapshot_threads();
assert_eq!(first.len(), 2);
let second = tracker.snapshot_threads();
assert_eq!(second.len(), 1, "exited row swept after first snapshot");
assert_eq!(second[0].1.name, "beta");
}
// Worker ids resolve to the thread ids returned by `upsert_thread`; unknown
// ids resolve to `None`.
#[test]
fn thread_id_for_worker_resolves_parent_lineage() {
let tracker = SubagentTracker::new();
let parent = tracker.upsert_thread("w1", "alpha", None, "running");
let child = tracker.upsert_thread("w2", "beta", Some("w1"), "running");
assert_eq!(tracker.thread_id_for_worker("w1"), Some(parent.thread_id));
assert_eq!(tracker.thread_id_for_worker("w2"), Some(child.thread_id));
assert_eq!(tracker.thread_id_for_worker("nonexistent"), None);
}
// Events other than `WorkerUpdate` leave the pending queue untouched.
#[test]
fn sink_skips_non_worker_events() {
let tracker = SubagentTracker::new();
let sink = SubagentEventSink::new(tracker.clone());
sink.handle_event(&AgentEvent::TurnStart {
session_id: "s".to_string(),
iteration: 1,
});
assert_eq!(tracker.pending_len(), 0);
}
}