use std::cell::RefCell;
use std::rc::Rc;
use serde_json::Value;
use crate::value::VmClosure;
thread_local! {
static PIPELINE_ON_FINISH: RefCell<Option<Rc<VmClosure>>> = const { RefCell::new(None) };
static LIFECYCLE_AUDIT_LOG: RefCell<Vec<LifecycleAuditEntry>> = const { RefCell::new(Vec::new()) };
static PARTIAL_HANDOFF_REGISTRY: RefCell<Vec<PartialHandoffEnvelope>> = const { RefCell::new(Vec::new()) };
static LIFECYCLE_SEQ: RefCell<u64> = const { RefCell::new(0) };
}
pub fn set_pipeline_on_finish(callback: Rc<VmClosure>) {
PIPELINE_ON_FINISH.with(|slot| *slot.borrow_mut() = Some(callback));
}
pub fn take_pipeline_on_finish() -> Option<Rc<VmClosure>> {
PIPELINE_ON_FINISH.with(|slot| slot.borrow_mut().take())
}
pub fn clear_pipeline_on_finish() {
PIPELINE_ON_FINISH.with(|slot| *slot.borrow_mut() = None);
LIFECYCLE_AUDIT_LOG.with(|log| log.borrow_mut().clear());
PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow_mut().clear());
LIFECYCLE_SEQ.with(|seq| *seq.borrow_mut() = 0);
}
#[derive(Debug, Default, Clone)]
pub struct UnsettledStateSnapshot {
pub suspended_subagents: Vec<Value>,
pub queued_triggers: Vec<Value>,
pub partial_handoffs: Vec<Value>,
pub in_flight_llm_calls: Vec<Value>,
}
impl UnsettledStateSnapshot {
pub fn is_empty(&self) -> bool {
self.suspended_subagents.is_empty()
&& self.queued_triggers.is_empty()
&& self.partial_handoffs.is_empty()
&& self.in_flight_llm_calls.is_empty()
}
pub fn to_json(&self) -> Value {
serde_json::json!({
"suspended_subagents": self.suspended_subagents,
"queued_triggers": self.queued_triggers,
"partial_handoffs": self.partial_handoffs,
"in_flight_llm_calls": self.in_flight_llm_calls,
})
}
pub fn counts_json(&self) -> Value {
serde_json::json!({
"suspended": self.suspended_subagents.len(),
"queued": self.queued_triggers.len(),
"partial": self.partial_handoffs.len(),
"in_flight": self.in_flight_llm_calls.len(),
})
}
pub fn summary(&self) -> String {
let suspended = self.suspended_subagents.len();
let queued = self.queued_triggers.len();
let partial = self.partial_handoffs.len();
let in_flight = self.in_flight_llm_calls.len();
if suspended == 0 && queued == 0 && partial == 0 && in_flight == 0 {
"no unsettled work".to_string()
} else {
format!(
"unsettled work: {suspended} suspended subagents, {queued} queued triggers, {partial} partial handoffs, {in_flight} in-flight llm calls"
)
}
}
}
pub fn unsettled_state_snapshot() -> UnsettledStateSnapshot {
UnsettledStateSnapshot {
suspended_subagents: crate::stdlib::agents::snapshot_suspended_subagents(),
queued_triggers: Vec::new(),
partial_handoffs: partial_handoff_snapshot_json(),
in_flight_llm_calls: crate::llm::snapshot_in_flight_llm_calls(),
}
}
#[derive(Debug, Clone)]
pub struct LifecycleAuditEntry {
pub seq: u64,
pub kind: String,
pub payload: Value,
pub pipeline_id: Option<String>,
}
impl LifecycleAuditEntry {
pub fn to_json(&self) -> Value {
serde_json::json!({
"seq": self.seq,
"kind": self.kind,
"payload": self.payload,
"pipeline_id": self.pipeline_id,
})
}
}
pub fn record_lifecycle_audit(kind: impl Into<String>, payload: Value) -> LifecycleAuditEntry {
let entry = LifecycleAuditEntry {
seq: next_seq(),
kind: kind.into(),
payload,
pipeline_id: crate::orchestration::current_mutation_session()
.and_then(|session| session.run_id.or(Some(session.session_id))),
};
LIFECYCLE_AUDIT_LOG.with(|log| log.borrow_mut().push(entry.clone()));
entry
}
pub fn take_lifecycle_audit_log() -> Vec<LifecycleAuditEntry> {
LIFECYCLE_AUDIT_LOG.with(|log| std::mem::take(&mut *log.borrow_mut()))
}
pub fn lifecycle_audit_log_snapshot() -> Vec<LifecycleAuditEntry> {
LIFECYCLE_AUDIT_LOG.with(|log| log.borrow().clone())
}
#[derive(Debug, Clone)]
pub struct PartialHandoffEnvelope {
pub envelope_id: String,
pub target_pipeline: String,
pub origin_pipeline: Option<String>,
pub payload: Value,
pub seq: u64,
}
impl PartialHandoffEnvelope {
pub fn to_json(&self) -> Value {
serde_json::json!({
"envelope_id": self.envelope_id,
"target_pipeline": self.target_pipeline,
"origin_pipeline": self.origin_pipeline,
"payload": self.payload,
"seq": self.seq,
})
}
}
pub fn record_partial_handoff(
target_pipeline: impl Into<String>,
payload: Value,
) -> PartialHandoffEnvelope {
let seq = next_seq();
let envelope = PartialHandoffEnvelope {
envelope_id: format!("envelope_{seq}"),
target_pipeline: target_pipeline.into(),
origin_pipeline: crate::orchestration::current_mutation_session()
.and_then(|session| session.run_id.or(Some(session.session_id))),
payload,
seq,
};
PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow_mut().push(envelope.clone()));
envelope
}
fn partial_handoff_snapshot_json() -> Vec<Value> {
PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow().iter().map(|e| e.to_json()).collect())
}
fn next_seq() -> u64 {
LIFECYCLE_SEQ.with(|seq| {
let mut slot = seq.borrow_mut();
*slot += 1;
*slot
})
}