1use std::cell::RefCell;
20use std::collections::{BTreeMap, BTreeSet};
21use std::rc::Rc;
22
23use serde_json::Value;
24
25use crate::event_log::{EventLog, LogEvent, Topic};
26use crate::value::VmClosure;
27
/// Name of the event-log topic that lifecycle audit entries are appended to
/// when an active event log is available.
pub const LIFECYCLE_AUDIT_TOPIC: &str = "pipeline.lifecycle.audit";
29
// Per-thread lifecycle state. Every slot below is reset by
// `clear_pipeline_on_finish`.
thread_local! {
    // Closure stored by `set_pipeline_on_finish`; `None` until one is set.
    static PIPELINE_ON_FINISH: RefCell<Option<Rc<VmClosure>>> = const { RefCell::new(None) };
    // In-memory list of audit entries, appended to by `record_lifecycle_audit`.
    static LIFECYCLE_AUDIT_LOG: RefCell<Vec<LifecycleAuditEntry>> = const { RefCell::new(Vec::new()) };
    // Envelopes recorded by `record_partial_handoff` and not yet acknowledged.
    static PARTIAL_HANDOFF_REGISTRY: RefCell<Vec<PartialHandoffEnvelope>> = const { RefCell::new(Vec::new()) };
    // Last value passed to `finalize_pipeline_disposition`, if any.
    static PIPELINE_DISPOSITION: RefCell<Option<Value>> = const { RefCell::new(None) };
    // Counter behind `next_seq`; shared by audit entries and handoff envelopes.
    static LIFECYCLE_SEQ: RefCell<u64> = const { RefCell::new(0) };
}
37
38pub fn set_pipeline_on_finish(callback: Rc<VmClosure>) {
41 PIPELINE_ON_FINISH.with(|slot| *slot.borrow_mut() = Some(callback));
42}
43
44pub fn take_pipeline_on_finish() -> Option<Rc<VmClosure>> {
47 PIPELINE_ON_FINISH.with(|slot| slot.borrow_mut().take())
48}
49
50pub fn clear_pipeline_on_finish() {
57 PIPELINE_ON_FINISH.with(|slot| *slot.borrow_mut() = None);
58 LIFECYCLE_AUDIT_LOG.with(|log| log.borrow_mut().clear());
59 PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow_mut().clear());
60 PIPELINE_DISPOSITION.with(|slot| *slot.borrow_mut() = None);
61 LIFECYCLE_SEQ.with(|seq| *seq.borrow_mut() = 0);
62}
63
64#[derive(Debug, Default, Clone)]
71pub struct UnsettledStateSnapshot {
72 pub suspended_subagents: Vec<Value>,
73 pub queued_triggers: Vec<Value>,
74 pub partial_handoffs: Vec<Value>,
75 pub in_flight_llm_calls: Vec<Value>,
76 pub pool_pending_tasks: Vec<Value>,
77}
78
79impl UnsettledStateSnapshot {
80 pub fn is_empty(&self) -> bool {
81 self.suspended_subagents.is_empty()
82 && self.queued_triggers.is_empty()
83 && self.partial_handoffs.is_empty()
84 && self.in_flight_llm_calls.is_empty()
85 && self.pool_pending_tasks.is_empty()
86 }
87
88 pub fn to_json(&self) -> Value {
89 serde_json::json!({
90 "suspended_subagents": self.suspended_subagents,
91 "queued_triggers": self.queued_triggers,
92 "partial_handoffs": self.partial_handoffs,
93 "in_flight_llm_calls": self.in_flight_llm_calls,
94 "pool_pending_tasks": self.pool_pending_tasks,
95 })
96 }
97
98 pub fn counts_json(&self) -> Value {
99 serde_json::json!({
100 "suspended": self.suspended_subagents.len(),
101 "queued": self.queued_triggers.len(),
102 "partial": self.partial_handoffs.len(),
103 "in_flight": self.in_flight_llm_calls.len(),
104 "pool_pending": self.pool_pending_tasks.len(),
105 })
106 }
107
108 pub fn summary(&self) -> String {
109 let suspended = self.suspended_subagents.len();
110 let queued = self.queued_triggers.len();
111 let partial = self.partial_handoffs.len();
112 let in_flight = self.in_flight_llm_calls.len();
113 let pool_pending = self.pool_pending_tasks.len();
114 if suspended == 0 && queued == 0 && partial == 0 && in_flight == 0 && pool_pending == 0 {
115 "no unsettled work".to_string()
116 } else {
117 format!(
118 "unsettled work: {suspended} suspended subagents, {queued} queued triggers, {partial} partial handoffs, {in_flight} in-flight llm calls, {pool_pending} pool pending tasks"
119 )
120 }
121 }
122}
123
124pub fn unsettled_state_snapshot() -> UnsettledStateSnapshot {
129 unsettled_state_snapshot_base(Vec::new())
130}
131
132pub async fn unsettled_state_snapshot_async() -> UnsettledStateSnapshot {
135 unsettled_state_snapshot_base(queued_trigger_snapshot_json().await)
136}
137
138fn unsettled_state_snapshot_base(queued_triggers: Vec<Value>) -> UnsettledStateSnapshot {
139 UnsettledStateSnapshot {
140 suspended_subagents: crate::stdlib::agents::snapshot_suspended_subagents(),
141 queued_triggers,
142 partial_handoffs: partial_handoff_snapshot_json(),
143 in_flight_llm_calls: crate::llm::snapshot_in_flight_llm_calls(),
144 pool_pending_tasks: crate::stdlib::pool::snapshot_pending_tasks(),
145 }
146}
147
148#[derive(Debug, Clone)]
152pub struct LifecycleAuditEntry {
153 pub seq: u64,
154 pub kind: String,
155 pub payload: Value,
156 pub pipeline_id: Option<String>,
157}
158
159impl LifecycleAuditEntry {
160 pub fn to_json(&self) -> Value {
161 serde_json::json!({
162 "seq": self.seq,
163 "kind": self.kind,
164 "payload": self.payload,
165 "pipeline_id": self.pipeline_id,
166 })
167 }
168}
169
170pub fn record_lifecycle_audit(kind: impl Into<String>, payload: Value) -> LifecycleAuditEntry {
173 let entry = LifecycleAuditEntry {
174 seq: next_seq(),
175 kind: kind.into(),
176 payload,
177 pipeline_id: crate::orchestration::current_mutation_session()
178 .and_then(|session| session.run_id.or(Some(session.session_id))),
179 };
180 LIFECYCLE_AUDIT_LOG.with(|log| log.borrow_mut().push(entry.clone()));
181 persist_lifecycle_audit_entry(&entry);
182 entry
183}
184
185pub fn take_lifecycle_audit_log() -> Vec<LifecycleAuditEntry> {
188 LIFECYCLE_AUDIT_LOG.with(|log| std::mem::take(&mut *log.borrow_mut()))
189}
190
191pub fn lifecycle_audit_log_snapshot() -> Vec<LifecycleAuditEntry> {
193 LIFECYCLE_AUDIT_LOG.with(|log| log.borrow().clone())
194}
195
196#[derive(Debug, Clone)]
200pub struct PartialHandoffEnvelope {
201 pub envelope_id: String,
202 pub target_pipeline: String,
203 pub origin_pipeline: Option<String>,
204 pub payload: Value,
205 pub seq: u64,
206 pub queued_at_ms: i64,
207}
208
209impl PartialHandoffEnvelope {
210 pub fn to_json(&self) -> Value {
211 self.to_json_at(crate::stdlib::clock::now_wall_ms())
212 }
213
214 pub fn to_json_at(&self, now_ms: i64) -> Value {
215 serde_json::json!({
216 "envelope_id": self.envelope_id,
217 "from": self.origin_pipeline,
218 "to": self.target_pipeline,
219 "payload_summary": payload_summary(&self.payload),
220 "queued_at_ms": self.queued_at_ms,
221 "age_ms": now_ms.saturating_sub(self.queued_at_ms).max(0),
222 "target_pipeline": self.target_pipeline,
223 "origin_pipeline": self.origin_pipeline,
224 "payload": self.payload,
225 "seq": self.seq,
226 })
227 }
228}
229
230pub fn record_partial_handoff(
234 target_pipeline: impl Into<String>,
235 payload: Value,
236) -> PartialHandoffEnvelope {
237 let seq = next_seq();
238 let envelope = PartialHandoffEnvelope {
239 envelope_id: format!("envelope_{seq}"),
240 target_pipeline: target_pipeline.into(),
241 origin_pipeline: crate::orchestration::current_mutation_session()
242 .and_then(|session| session.run_id.or(Some(session.session_id))),
243 payload,
244 seq,
245 queued_at_ms: crate::stdlib::clock::now_wall_ms(),
246 };
247 PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow_mut().push(envelope.clone()));
248 envelope
249}
250
251pub fn acknowledge_partial_handoff(
254 envelope_id: &str,
255 decision: Value,
256) -> Option<PartialHandoffEnvelope> {
257 let removed = PARTIAL_HANDOFF_REGISTRY.with(|reg| {
258 let mut reg = reg.borrow_mut();
259 let index = reg
260 .iter()
261 .position(|entry| entry.envelope_id == envelope_id)?;
262 Some(reg.remove(index))
263 })?;
264 record_lifecycle_audit(
265 "handoff_acknowledged",
266 serde_json::json!({
267 "envelope_id": envelope_id,
268 "decision": decision,
269 }),
270 );
271 Some(removed)
272}
273
274pub fn finalize_pipeline_disposition(disposition: Value) -> Value {
275 PIPELINE_DISPOSITION.with(|slot| *slot.borrow_mut() = Some(disposition.clone()));
276 let entry = record_lifecycle_audit(
277 "pipeline_finalized",
278 serde_json::json!({
279 "disposition": disposition,
280 }),
281 );
282 serde_json::json!({
283 "status": "finalized",
284 "method": "finalize",
285 "entry": entry.to_json(),
286 })
287}
288
289pub fn pipeline_disposition_snapshot() -> Option<Value> {
290 PIPELINE_DISPOSITION.with(|slot| slot.borrow().clone())
291}
292
293fn partial_handoff_snapshot_json() -> Vec<Value> {
294 let now_ms = crate::stdlib::clock::now_wall_ms();
295 PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow().iter().map(|e| e.to_json_at(now_ms)).collect())
296}
297
298fn next_seq() -> u64 {
299 LIFECYCLE_SEQ.with(|seq| {
300 let mut slot = seq.borrow_mut();
301 *slot += 1;
302 *slot
303 })
304}
305
306fn persist_lifecycle_audit_entry(entry: &LifecycleAuditEntry) {
307 let Some(log) = crate::event_log::active_event_log() else {
308 return;
309 };
310 let Ok(topic) = Topic::new(LIFECYCLE_AUDIT_TOPIC) else {
311 return;
312 };
313 let mut headers = BTreeMap::new();
314 headers.insert("kind".to_string(), entry.kind.clone());
315 headers.insert("seq".to_string(), entry.seq.to_string());
316 if let Some(pipeline_id) = entry.pipeline_id.as_ref() {
317 headers.insert("pipeline_id".to_string(), pipeline_id.clone());
318 }
319 let _ = futures::executor::block_on(log.append(
320 &topic,
321 LogEvent::new("lifecycle_audit", entry.to_json()).with_headers(headers),
322 ));
323}
324
325async fn queued_trigger_snapshot_json() -> Vec<Value> {
326 let Some(log) = crate::event_log::active_event_log() else {
327 return Vec::new();
328 };
329 let now_ms = lifecycle_now_ms();
330 let mut out = Vec::new();
331 out.extend(snapshot_inbox_triggers(log.as_ref(), now_ms).await);
332 out.extend(snapshot_worker_queue_triggers(log, now_ms).await);
333 out.sort_by(|left, right| {
334 let left_key = (
335 left.get("queued_at_ms")
336 .and_then(Value::as_i64)
337 .unwrap_or(i64::MAX),
338 left.get("id").and_then(Value::as_str).unwrap_or_default(),
339 );
340 let right_key = (
341 right
342 .get("queued_at_ms")
343 .and_then(Value::as_i64)
344 .unwrap_or(i64::MAX),
345 right.get("id").and_then(Value::as_str).unwrap_or_default(),
346 );
347 left_key.cmp(&right_key)
348 });
349 out
350}
351
/// Current time in milliseconds as reported by `crate::clock_mock::now_ms`;
/// used as the reference time when computing trigger ages.
fn lifecycle_now_ms() -> i64 {
    crate::clock_mock::now_ms()
}
355
356async fn snapshot_inbox_triggers(log: &crate::event_log::AnyEventLog, now_ms: i64) -> Vec<Value> {
357 let Ok(inbox_topic) = Topic::new(crate::triggers::TRIGGER_INBOX_ENVELOPES_TOPIC) else {
358 return Vec::new();
359 };
360 let Ok(outbox_topic) = Topic::new(crate::triggers::TRIGGER_OUTBOX_TOPIC) else {
361 return Vec::new();
362 };
363 let Ok(cancel_topic) = Topic::new(crate::triggers::TRIGGER_CANCEL_REQUESTS_TOPIC) else {
364 return Vec::new();
365 };
366 let inbox = log
367 .read_range(&inbox_topic, None, usize::MAX)
368 .await
369 .unwrap_or_default();
370 let outbox = log
371 .read_range(&outbox_topic, None, usize::MAX)
372 .await
373 .unwrap_or_default();
374 let cancels = log
375 .read_range(&cancel_topic, None, usize::MAX)
376 .await
377 .unwrap_or_default();
378
379 let completed_events = outbox
380 .into_iter()
381 .filter_map(|(_, event)| {
382 let event_id = event
383 .headers
384 .get("event_id")
385 .cloned()
386 .or_else(|| json_string(&event.payload, &["event_id"]))?;
387 let binding_key = event
388 .headers
389 .get("binding_key")
390 .cloned()
391 .or_else(|| json_string(&event.payload, &["binding_key"]))
392 .unwrap_or_default();
393 Some((binding_key, event_id))
394 })
395 .collect::<BTreeSet<_>>();
396 let cancelled_events = cancels
397 .into_iter()
398 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
399 .filter_map(|(_, event)| {
400 let event_id = json_string(&event.payload, &["event_id"])?;
401 let binding_key = json_string(&event.payload, &["binding_key"]).unwrap_or_default();
402 Some((binding_key, event_id))
403 })
404 .collect::<BTreeSet<_>>();
405
406 inbox
407 .into_iter()
408 .filter(|(_, event)| event.kind == "event_ingested")
409 .filter_map(|(_, event)| {
410 let trigger = event
411 .payload
412 .get("event")
413 .cloned()
414 .unwrap_or_else(|| event.payload.clone());
415 let event_id = json_string(&trigger, &["id"])?;
416 let binding_key = event
417 .headers
418 .get("binding_key")
419 .cloned()
420 .or_else(|| {
421 let trigger_id = event
422 .headers
423 .get("trigger_id")
424 .cloned()
425 .or_else(|| json_string(&event.payload, &["trigger_id"]))?;
426 let version = json_u64(&event.payload, &["binding_version"])?;
427 Some(format!("{trigger_id}@v{version}"))
428 });
429 if event_is_settled(&completed_events, binding_key.as_deref(), &event_id)
430 || event_is_settled(&cancelled_events, binding_key.as_deref(), &event_id)
431 {
432 return None;
433 }
434 let trigger_id = event
435 .headers
436 .get("trigger_id")
437 .cloned()
438 .or_else(|| json_string(&event.payload, &["trigger_id"]));
439 let queued_at_ms = event.occurred_at_ms;
440 let provider = json_string(&trigger, &["provider"]).unwrap_or_default();
441 let kind = json_string(&trigger, &["kind"]).unwrap_or_default();
442 let id = binding_key
443 .as_ref()
444 .map(|key| format!("trigger://{key}/{event_id}"))
445 .unwrap_or_else(|| format!("trigger://{event_id}"));
446 Some(serde_json::json!({
447 "id": id,
448 "event_id": event_id,
449 "trigger_id": trigger_id,
450 "binding_key": binding_key,
451 "spec_summary": trigger_spec_summary(provider.as_str(), kind.as_str(), trigger_id.as_deref()),
452 "queued_at_ms": queued_at_ms,
453 "age_ms": now_ms.saturating_sub(queued_at_ms).max(0),
454 "source": "trigger_inbox",
455 }))
456 })
457 .collect()
458}
459
/// Returns `true` when `settled_events` contains `event_id` either under the
/// given `binding_key` or under the empty (unscoped) binding key.
///
/// A `binding_key` of `None` is looked up as the empty binding key.
fn event_is_settled(
    settled_events: &BTreeSet<(String, String)>,
    binding_key: Option<&str>,
    event_id: &str,
) -> bool {
    let mut probe = (
        binding_key.unwrap_or_default().to_string(),
        event_id.to_string(),
    );
    if settled_events.contains(&probe) {
        return true;
    }
    // Fall back to a record that was stored without a binding key.
    probe.0.clear();
    settled_events.contains(&probe)
}
469
470async fn snapshot_worker_queue_triggers(
471 log: std::sync::Arc<crate::event_log::AnyEventLog>,
472 now_ms: i64,
473) -> Vec<Value> {
474 let queue = crate::triggers::WorkerQueue::new(log);
475 let Ok(queues) = queue.known_queues().await else {
476 return Vec::new();
477 };
478 let mut out = Vec::new();
479 for queue_name in queues {
480 let Ok(state) = queue.queue_state(&queue_name).await else {
481 continue;
482 };
483 let queue_label = state.queue.clone();
484 for job in state.jobs {
485 if job.acked || job.purged {
486 continue;
487 }
488 let event_id = job.job.event.id.0.clone();
489 let id = format!("worker://{}/{}", queue_label, job.job_event_id);
490 out.push(serde_json::json!({
491 "id": id,
492 "event_id": event_id,
493 "trigger_id": job.job.trigger_id,
494 "binding_key": job.job.binding_key,
495 "spec_summary": trigger_spec_summary(
496 job.job.event.provider.0.as_str(),
497 job.job.event.kind.as_str(),
498 Some(job.job.trigger_id.as_str())
499 ),
500 "queued_at_ms": job.enqueued_at_ms,
501 "age_ms": now_ms.saturating_sub(job.enqueued_at_ms).max(0),
502 "source": "worker_queue",
503 "queue": queue_label.clone(),
504 "job_event_id": job.job_event_id,
505 "claimed": job.active_claim.is_some(),
506 }));
507 }
508 }
509 out
510}
511
/// Builds a short human-readable label for a trigger.
///
/// * With a non-empty `trigger_id`: `"<id>: <provider>.<kind>"` when both
///   `provider` and `kind` are non-empty, otherwise just the id.
/// * Without one (`None` or empty): `"<provider>.<kind>"`, whichever of the
///   two is non-empty, or `"trigger event"` when both are empty.
fn trigger_spec_summary(provider: &str, kind: &str, trigger_id: Option<&str>) -> String {
    let has_provider = !provider.is_empty();
    let has_kind = !kind.is_empty();

    if let Some(id) = trigger_id.filter(|id| !id.is_empty()) {
        return if has_provider && has_kind {
            format!("{id}: {provider}.{kind}")
        } else {
            id.to_string()
        };
    }

    if has_provider && has_kind {
        format!("{provider}.{kind}")
    } else if has_provider {
        provider.to_string()
    } else if has_kind {
        kind.to_string()
    } else {
        "trigger event".to_string()
    }
}
526
527fn json_string(value: &Value, path: &[&str]) -> Option<String> {
528 let mut cursor = value;
529 for key in path {
530 cursor = cursor.get(*key)?;
531 }
532 cursor.as_str().map(ToString::to_string)
533}
534
535fn json_u64(value: &Value, path: &[&str]) -> Option<u64> {
536 let mut cursor = value;
537 for key in path {
538 cursor = cursor.get(*key)?;
539 }
540 cursor.as_u64()
541}
542
543fn payload_summary(payload: &Value) -> String {
544 match payload {
545 Value::Null => "nil".to_string(),
546 Value::Bool(value) => value.to_string(),
547 Value::Number(value) => value.to_string(),
548 Value::String(value) => {
549 let mut chars = value.chars();
550 let preview: String = chars.by_ref().take(80).collect();
551 if chars.next().is_some() {
552 format!("{preview}...")
553 } else {
554 preview
555 }
556 }
557 Value::Array(items) => format!("list(len={})", items.len()),
558 Value::Object(map) => {
559 let keys = map.keys().take(6).cloned().collect::<Vec<_>>().join(",");
560 if map.len() > 6 {
561 format!("object(keys={keys},...)")
562 } else {
563 format!("object(keys={keys})")
564 }
565 }
566 }
567}