Skip to main content

harn_vm/orchestration/
pipeline_lifecycle.rs

1//! Pipeline-finish lifecycle state.
2//!
3//! The pipeline DSL accepts a single `on_finish` callback that runs after the
4//! pipeline's declared steps complete but before the pipeline returns. The
5//! callback receives `(harness, return_value)` and may transform the value.
6//! Storage is a thread-local one-shot slot: `Vm::execute` consumes the
7//! registered closure with `take_pipeline_on_finish` exactly once, so a stale
8//! registration cannot leak across consecutive runs.
9//!
10//! `unsettled_state_snapshot` exposes the pipeline-finish harness view of
11//! work that can outlive the main pipeline body.
12//!
//! Beyond the snapshot, drain callbacks write through
//! `record_lifecycle_audit` (which `harness.emit_audit` routes to) and
//! `record_partial_handoff` (which `harness.handoff_to` routes to);
//! `acknowledge_partial_handoff` settles a recorded envelope and
//! `finalize_pipeline_disposition` stores the run's final disposition. All of
//! this state is thread-local because pipeline execution is single-threaded
//! per run and we want deterministic ordering for replay/conformance.
18
19use std::cell::RefCell;
20use std::collections::{BTreeMap, BTreeSet};
21use std::rc::Rc;
22
23use serde_json::Value;
24
25use crate::event_log::{EventLog, LogEvent, Topic};
26use crate::value::VmClosure;
27
/// Event-log topic that `persist_lifecycle_audit_entry` appends
/// `lifecycle_audit` events to.
pub const LIFECYCLE_AUDIT_TOPIC: &'static str = "pipeline.lifecycle.audit";
29
// Per-thread lifecycle state. Every slot below is reset by
// `clear_pipeline_on_finish`.
thread_local! {
    // Pending `on_finish` callback: written by `set_pipeline_on_finish`,
    // consumed (left empty) by `take_pipeline_on_finish`.
    static PIPELINE_ON_FINISH: RefCell<Option<Rc<VmClosure>>> = const { RefCell::new(None) };
    // Entries appended by `record_lifecycle_audit`, in recording order.
    static LIFECYCLE_AUDIT_LOG: RefCell<Vec<LifecycleAuditEntry>> = const { RefCell::new(Vec::new()) };
    // Envelopes from `record_partial_handoff` that `acknowledge_partial_handoff`
    // has not yet removed.
    static PARTIAL_HANDOFF_REGISTRY: RefCell<Vec<PartialHandoffEnvelope>> = const { RefCell::new(Vec::new()) };
    // Last value passed to `finalize_pipeline_disposition`.
    static PIPELINE_DISPOSITION: RefCell<Option<Value>> = const { RefCell::new(None) };
    // Counter shared by audit entries and handoff envelopes; `next_seq`
    // pre-increments it, so the first issued seq is 1.
    static LIFECYCLE_SEQ: RefCell<u64> = const { RefCell::new(0) };
}
37
38/// Register the callback `Vm::execute` will invoke after the pipeline's
39/// declared steps complete. Last-write-wins.
40pub fn set_pipeline_on_finish(callback: Rc<VmClosure>) {
41    PIPELINE_ON_FINISH.with(|slot| *slot.borrow_mut() = Some(callback));
42}
43
44/// Consume the pending callback, leaving the slot empty. Returns `None` when
45/// no callback was registered.
46pub fn take_pipeline_on_finish() -> Option<Rc<VmClosure>> {
47    PIPELINE_ON_FINISH.with(|slot| slot.borrow_mut().take())
48}
49
50/// Drop any pending callback and every captured lifecycle audit entry,
51/// partial-handoff envelope, and seq counter. Called from
52/// `reset_thread_local_state` so test harnesses don't carry registrations
53/// across runs, and from `Vm::execute` on the error exit path so a
54/// failed pipeline doesn't leak in-progress lifecycle state into the
55/// next run.
56pub fn clear_pipeline_on_finish() {
57    PIPELINE_ON_FINISH.with(|slot| *slot.borrow_mut() = None);
58    LIFECYCLE_AUDIT_LOG.with(|log| log.borrow_mut().clear());
59    PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow_mut().clear());
60    PIPELINE_DISPOSITION.with(|slot| *slot.borrow_mut() = None);
61    LIFECYCLE_SEQ.with(|seq| *seq.borrow_mut() = 0);
62}
63
/// Snapshot of unsettled work that the pipeline `on_finish` harness exposes.
///
/// Buckets intentionally stay JSON-shaped at this boundary: each producer
/// owns its richer Rust types, while callbacks need a stable Harn dict/list
/// contract. Producers without a durable per-item registry yet return a typed
/// empty list rather than inventing storage in the lifecycle layer.
#[derive(Debug, Default, Clone)]
pub struct UnsettledStateSnapshot {
    /// Rows from `crate::stdlib::agents::snapshot_suspended_subagents()`.
    pub suspended_subagents: Vec<Value>,
    /// Trigger-inbox and worker-queue rows read from the active event log.
    /// This bucket is always empty in the synchronous `unsettled_state_snapshot`.
    pub queued_triggers: Vec<Value>,
    /// Recorded handoff envelopes that have not been acknowledged.
    pub partial_handoffs: Vec<Value>,
    /// Rows from `crate::llm::snapshot_in_flight_llm_calls()`.
    pub in_flight_llm_calls: Vec<Value>,
    /// Rows from `crate::stdlib::pool::snapshot_pending_tasks()`.
    pub pool_pending_tasks: Vec<Value>,
}
78
79impl UnsettledStateSnapshot {
80    pub fn is_empty(&self) -> bool {
81        self.suspended_subagents.is_empty()
82            && self.queued_triggers.is_empty()
83            && self.partial_handoffs.is_empty()
84            && self.in_flight_llm_calls.is_empty()
85            && self.pool_pending_tasks.is_empty()
86    }
87
88    pub fn to_json(&self) -> Value {
89        serde_json::json!({
90            "suspended_subagents": self.suspended_subagents,
91            "queued_triggers": self.queued_triggers,
92            "partial_handoffs": self.partial_handoffs,
93            "in_flight_llm_calls": self.in_flight_llm_calls,
94            "pool_pending_tasks": self.pool_pending_tasks,
95        })
96    }
97
98    pub fn counts_json(&self) -> Value {
99        serde_json::json!({
100            "suspended": self.suspended_subagents.len(),
101            "queued": self.queued_triggers.len(),
102            "partial": self.partial_handoffs.len(),
103            "in_flight": self.in_flight_llm_calls.len(),
104            "pool_pending": self.pool_pending_tasks.len(),
105        })
106    }
107
108    pub fn summary(&self) -> String {
109        let suspended = self.suspended_subagents.len();
110        let queued = self.queued_triggers.len();
111        let partial = self.partial_handoffs.len();
112        let in_flight = self.in_flight_llm_calls.len();
113        let pool_pending = self.pool_pending_tasks.len();
114        if suspended == 0 && queued == 0 && partial == 0 && in_flight == 0 && pool_pending == 0 {
115            "no unsettled work".to_string()
116        } else {
117            format!(
118                "unsettled work: {suspended} suspended subagents, {queued} queued triggers, {partial} partial handoffs, {in_flight} in-flight llm calls, {pool_pending} pool pending tasks"
119            )
120        }
121    }
122}
123
124/// Return the current unsettled-state snapshot. This synchronous variant only
125/// reads in-memory registries; VM lifecycle code should prefer
126/// `unsettled_state_snapshot_async` so event-log-backed trigger queues are
127/// included when available.
128pub fn unsettled_state_snapshot() -> UnsettledStateSnapshot {
129    unsettled_state_snapshot_base(Vec::new())
130}
131
132/// Return the current unsettled-state snapshot, including event-log-backed
133/// trigger queues when an active event log is installed for this thread.
134pub async fn unsettled_state_snapshot_async() -> UnsettledStateSnapshot {
135    unsettled_state_snapshot_base(queued_trigger_snapshot_json().await)
136}
137
138fn unsettled_state_snapshot_base(queued_triggers: Vec<Value>) -> UnsettledStateSnapshot {
139    UnsettledStateSnapshot {
140        suspended_subagents: crate::stdlib::agents::snapshot_suspended_subagents(),
141        queued_triggers,
142        partial_handoffs: partial_handoff_snapshot_json(),
143        in_flight_llm_calls: crate::llm::snapshot_in_flight_llm_calls(),
144        pool_pending_tasks: crate::stdlib::pool::snapshot_pending_tasks(),
145    }
146}
147
/// One recorded `harness.emit_audit` call. `seq` is a per-pipeline-run
/// monotonic counter so conformance fixtures and replay can match entries by
/// shape rather than wall-clock time.
#[derive(Debug, Clone)]
pub struct LifecycleAuditEntry {
    /// Value issued by `next_seq`. The counter is shared with partial-handoff
    /// envelopes, so audit seqs increase but need not be contiguous.
    pub seq: u64,
    /// Caller-chosen tag, e.g. `handoff_acknowledged` or `pipeline_finalized`.
    pub kind: String,
    /// Free-form JSON supplied by the caller.
    pub payload: Value,
    /// The current mutation session's `run_id`, falling back to its
    /// `session_id`. `None` when no mutation session is active.
    pub pipeline_id: Option<String>,
}
158
159impl LifecycleAuditEntry {
160    pub fn to_json(&self) -> Value {
161        serde_json::json!({
162            "seq": self.seq,
163            "kind": self.kind,
164            "payload": self.payload,
165            "pipeline_id": self.pipeline_id,
166        })
167    }
168}
169
170/// Record an audit entry from a lifecycle callback. Returns the entry's seq
171/// number so the caller can echo it back as a receipt.
172pub fn record_lifecycle_audit(kind: impl Into<String>, payload: Value) -> LifecycleAuditEntry {
173    let entry = LifecycleAuditEntry {
174        seq: next_seq(),
175        kind: kind.into(),
176        payload,
177        pipeline_id: crate::orchestration::current_mutation_session()
178            .and_then(|session| session.run_id.or(Some(session.session_id))),
179    };
180    LIFECYCLE_AUDIT_LOG.with(|log| log.borrow_mut().push(entry.clone()));
181    persist_lifecycle_audit_entry(&entry);
182    entry
183}
184
185/// Drain the audit log, returning all entries recorded since the last drain.
186/// Used by conformance fixtures and replay oracles.
187pub fn take_lifecycle_audit_log() -> Vec<LifecycleAuditEntry> {
188    LIFECYCLE_AUDIT_LOG.with(|log| std::mem::take(&mut *log.borrow_mut()))
189}
190
191/// Non-destructive read of the audit log for introspection.
192pub fn lifecycle_audit_log_snapshot() -> Vec<LifecycleAuditEntry> {
193    LIFECYCLE_AUDIT_LOG.with(|log| log.borrow().clone())
194}
195
/// One recorded `harness.handoff_to` call. The runtime stores envelopes in
/// the thread-local registry so a follow-on pipeline (or the conformance
/// suite) can inspect them via `unsettled_state_snapshot()`.
#[derive(Debug, Clone)]
pub struct PartialHandoffEnvelope {
    /// `envelope_{seq}`, derived from `seq` rather than a uuid.
    pub envelope_id: String,
    /// Pipeline the handoff is addressed to.
    pub target_pipeline: String,
    /// The recording mutation session's `run_id` (or its `session_id`).
    /// `None` when no session was active.
    pub origin_pipeline: Option<String>,
    /// Free-form JSON supplied by the caller.
    pub payload: Value,
    /// Value issued by `next_seq` at record time.
    pub seq: u64,
    /// `crate::stdlib::clock::now_wall_ms()` at record time.
    pub queued_at_ms: i64,
}
208
209impl PartialHandoffEnvelope {
210    pub fn to_json(&self) -> Value {
211        self.to_json_at(crate::stdlib::clock::now_wall_ms())
212    }
213
214    pub fn to_json_at(&self, now_ms: i64) -> Value {
215        serde_json::json!({
216            "envelope_id": self.envelope_id,
217            "from": self.origin_pipeline,
218            "to": self.target_pipeline,
219            "payload_summary": payload_summary(&self.payload),
220            "queued_at_ms": self.queued_at_ms,
221            "age_ms": now_ms.saturating_sub(self.queued_at_ms).max(0),
222            "target_pipeline": self.target_pipeline,
223            "origin_pipeline": self.origin_pipeline,
224            "payload": self.payload,
225            "seq": self.seq,
226        })
227    }
228}
229
230/// Record a partial-handoff envelope and return the entry. Allocates a
231/// deterministic envelope id derived from the lifecycle seq counter so test
232/// fixtures don't depend on wall-clock time or uuids.
233pub fn record_partial_handoff(
234    target_pipeline: impl Into<String>,
235    payload: Value,
236) -> PartialHandoffEnvelope {
237    let seq = next_seq();
238    let envelope = PartialHandoffEnvelope {
239        envelope_id: format!("envelope_{seq}"),
240        target_pipeline: target_pipeline.into(),
241        origin_pipeline: crate::orchestration::current_mutation_session()
242            .and_then(|session| session.run_id.or(Some(session.session_id))),
243        payload,
244        seq,
245        queued_at_ms: crate::stdlib::clock::now_wall_ms(),
246    };
247    PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow_mut().push(envelope.clone()));
248    envelope
249}
250
251/// Acknowledge a partial handoff envelope by removing it from the unsettled
252/// registry and recording the settlement decision in the lifecycle audit log.
253pub fn acknowledge_partial_handoff(
254    envelope_id: &str,
255    decision: Value,
256) -> Option<PartialHandoffEnvelope> {
257    let removed = PARTIAL_HANDOFF_REGISTRY.with(|reg| {
258        let mut reg = reg.borrow_mut();
259        let index = reg
260            .iter()
261            .position(|entry| entry.envelope_id == envelope_id)?;
262        Some(reg.remove(index))
263    })?;
264    record_lifecycle_audit(
265        "handoff_acknowledged",
266        serde_json::json!({
267            "envelope_id": envelope_id,
268            "decision": decision,
269        }),
270    );
271    Some(removed)
272}
273
274pub fn finalize_pipeline_disposition(disposition: Value) -> Value {
275    PIPELINE_DISPOSITION.with(|slot| *slot.borrow_mut() = Some(disposition.clone()));
276    let entry = record_lifecycle_audit(
277        "pipeline_finalized",
278        serde_json::json!({
279            "disposition": disposition,
280        }),
281    );
282    serde_json::json!({
283        "status": "finalized",
284        "method": "finalize",
285        "entry": entry.to_json(),
286    })
287}
288
289pub fn pipeline_disposition_snapshot() -> Option<Value> {
290    PIPELINE_DISPOSITION.with(|slot| slot.borrow().clone())
291}
292
293fn partial_handoff_snapshot_json() -> Vec<Value> {
294    let now_ms = crate::stdlib::clock::now_wall_ms();
295    PARTIAL_HANDOFF_REGISTRY.with(|reg| reg.borrow().iter().map(|e| e.to_json_at(now_ms)).collect())
296}
297
298fn next_seq() -> u64 {
299    LIFECYCLE_SEQ.with(|seq| {
300        let mut slot = seq.borrow_mut();
301        *slot += 1;
302        *slot
303    })
304}
305
306fn persist_lifecycle_audit_entry(entry: &LifecycleAuditEntry) {
307    let Some(log) = crate::event_log::active_event_log() else {
308        return;
309    };
310    let Ok(topic) = Topic::new(LIFECYCLE_AUDIT_TOPIC) else {
311        return;
312    };
313    let mut headers = BTreeMap::new();
314    headers.insert("kind".to_string(), entry.kind.clone());
315    headers.insert("seq".to_string(), entry.seq.to_string());
316    if let Some(pipeline_id) = entry.pipeline_id.as_ref() {
317        headers.insert("pipeline_id".to_string(), pipeline_id.clone());
318    }
319    let _ = futures::executor::block_on(log.append(
320        &topic,
321        LogEvent::new("lifecycle_audit", entry.to_json()).with_headers(headers),
322    ));
323}
324
325async fn queued_trigger_snapshot_json() -> Vec<Value> {
326    let Some(log) = crate::event_log::active_event_log() else {
327        return Vec::new();
328    };
329    let now_ms = lifecycle_now_ms();
330    let mut out = Vec::new();
331    out.extend(snapshot_inbox_triggers(log.as_ref(), now_ms).await);
332    out.extend(snapshot_worker_queue_triggers(log, now_ms).await);
333    out.sort_by(|left, right| {
334        let left_key = (
335            left.get("queued_at_ms")
336                .and_then(Value::as_i64)
337                .unwrap_or(i64::MAX),
338            left.get("id").and_then(Value::as_str).unwrap_or_default(),
339        );
340        let right_key = (
341            right
342                .get("queued_at_ms")
343                .and_then(Value::as_i64)
344                .unwrap_or(i64::MAX),
345            right.get("id").and_then(Value::as_str).unwrap_or_default(),
346        );
347        left_key.cmp(&right_key)
348    });
349    out
350}
351
352fn lifecycle_now_ms() -> i64 {
353    crate::clock_mock::now_ms()
354}
355
/// List trigger-inbox events that have not yet been settled.
///
/// Reads three topics in full: the inbox envelopes, the outbox, and the
/// cancel requests. Every `event_ingested` inbox event is reported unless a
/// matching record exists in either:
/// - the outbox (completed dispatch), or
/// - a `dispatch_cancel_requested` cancel record.
///
/// Matching is done by `event_is_settled` on `(binding_key, event_id)`.
/// Topic-construction failures yield an empty list, and read failures are
/// treated as empty topics.
// NOTE(review): each call reads every topic from the start (`usize::MAX`
// limit), so cost grows with log size; confirm that is acceptable for the
// callers of this snapshot.
async fn snapshot_inbox_triggers(log: &crate::event_log::AnyEventLog, now_ms: i64) -> Vec<Value> {
    let Ok(inbox_topic) = Topic::new(crate::triggers::TRIGGER_INBOX_ENVELOPES_TOPIC) else {
        return Vec::new();
    };
    let Ok(outbox_topic) = Topic::new(crate::triggers::TRIGGER_OUTBOX_TOPIC) else {
        return Vec::new();
    };
    let Ok(cancel_topic) = Topic::new(crate::triggers::TRIGGER_CANCEL_REQUESTS_TOPIC) else {
        return Vec::new();
    };
    let inbox = log
        .read_range(&inbox_topic, None, usize::MAX)
        .await
        .unwrap_or_default();
    let outbox = log
        .read_range(&outbox_topic, None, usize::MAX)
        .await
        .unwrap_or_default();
    let cancels = log
        .read_range(&cancel_topic, None, usize::MAX)
        .await
        .unwrap_or_default();

    // (binding_key, event_id) pairs that reached the outbox. Headers take
    // precedence over payload fields; a missing binding key becomes "", and
    // records with no event id are ignored.
    let completed_events = outbox
        .into_iter()
        .filter_map(|(_, event)| {
            let event_id = event
                .headers
                .get("event_id")
                .cloned()
                .or_else(|| json_string(&event.payload, &["event_id"]))?;
            let binding_key = event
                .headers
                .get("binding_key")
                .cloned()
                .or_else(|| json_string(&event.payload, &["binding_key"]))
                .unwrap_or_default();
            Some((binding_key, event_id))
        })
        .collect::<BTreeSet<_>>();
    // (binding_key, event_id) pairs with a cancel request; read from the
    // payload only.
    let cancelled_events = cancels
        .into_iter()
        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
        .filter_map(|(_, event)| {
            let event_id = json_string(&event.payload, &["event_id"])?;
            let binding_key = json_string(&event.payload, &["binding_key"]).unwrap_or_default();
            Some((binding_key, event_id))
        })
        .collect::<BTreeSet<_>>();

    inbox
        .into_iter()
        .filter(|(_, event)| event.kind == "event_ingested")
        .filter_map(|(_, event)| {
            // The trigger event is nested under `payload.event` when present;
            // otherwise the whole payload is used.
            let trigger = event
                .payload
                .get("event")
                .cloned()
                .unwrap_or_else(|| event.payload.clone());
            let event_id = json_string(&trigger, &["id"])?;
            // Binding key: explicit header, else reconstructed as
            // `{trigger_id}@v{binding_version}` when both parts are present.
            let binding_key = event
                .headers
                .get("binding_key")
                .cloned()
                .or_else(|| {
                    let trigger_id = event
                        .headers
                        .get("trigger_id")
                        .cloned()
                        .or_else(|| json_string(&event.payload, &["trigger_id"]))?;
                    let version = json_u64(&event.payload, &["binding_version"])?;
                    Some(format!("{trigger_id}@v{version}"))
                });
            // NOTE(review): when no binding key can be resolved, only settled
            // records with an empty binding key match. An outbox or cancel
            // record that carries a binding key will not settle this event;
            // confirm this is intended.
            if event_is_settled(&completed_events, binding_key.as_deref(), &event_id)
                || event_is_settled(&cancelled_events, binding_key.as_deref(), &event_id)
            {
                return None;
            }
            let trigger_id = event
                .headers
                .get("trigger_id")
                .cloned()
                .or_else(|| json_string(&event.payload, &["trigger_id"]));
            let queued_at_ms = event.occurred_at_ms;
            let provider = json_string(&trigger, &["provider"]).unwrap_or_default();
            let kind = json_string(&trigger, &["kind"]).unwrap_or_default();
            // Row id: `trigger://{binding_key}/{event_id}`, or
            // `trigger://{event_id}` when unbound.
            let id = binding_key
                .as_ref()
                .map(|key| format!("trigger://{key}/{event_id}"))
                .unwrap_or_else(|| format!("trigger://{event_id}"));
            Some(serde_json::json!({
                "id": id,
                "event_id": event_id,
                "trigger_id": trigger_id,
                "binding_key": binding_key,
                "spec_summary": trigger_spec_summary(provider.as_str(), kind.as_str(), trigger_id.as_deref()),
                "queued_at_ms": queued_at_ms,
                "age_ms": now_ms.saturating_sub(queued_at_ms).max(0),
                "source": "trigger_inbox",
            }))
        })
        .collect()
}
459
/// Whether `event_id` appears in `settled_events`.
///
/// An entry matches if it is scoped to `binding_key` (`""` when `None`) or
/// recorded with an empty binding key, which matches any binding.
fn event_is_settled(
    settled_events: &BTreeSet<(String, String)>,
    binding_key: Option<&str>,
    event_id: &str,
) -> bool {
    let recorded_under =
        |scope: &str| settled_events.contains(&(scope.to_string(), event_id.to_string()));
    recorded_under(binding_key.unwrap_or_default()) || recorded_under("")
}
469
470async fn snapshot_worker_queue_triggers(
471    log: std::sync::Arc<crate::event_log::AnyEventLog>,
472    now_ms: i64,
473) -> Vec<Value> {
474    let queue = crate::triggers::WorkerQueue::new(log);
475    let Ok(queues) = queue.known_queues().await else {
476        return Vec::new();
477    };
478    let mut out = Vec::new();
479    for queue_name in queues {
480        let Ok(state) = queue.queue_state(&queue_name).await else {
481            continue;
482        };
483        let queue_label = state.queue.clone();
484        for job in state.jobs {
485            if job.acked || job.purged {
486                continue;
487            }
488            let event_id = job.job.event.id.0.clone();
489            let id = format!("worker://{}/{}", queue_label, job.job_event_id);
490            out.push(serde_json::json!({
491                "id": id,
492                "event_id": event_id,
493                "trigger_id": job.job.trigger_id,
494                "binding_key": job.job.binding_key,
495                "spec_summary": trigger_spec_summary(
496                    job.job.event.provider.0.as_str(),
497                    job.job.event.kind.as_str(),
498                    Some(job.job.trigger_id.as_str())
499                ),
500                "queued_at_ms": job.enqueued_at_ms,
501                "age_ms": now_ms.saturating_sub(job.enqueued_at_ms).max(0),
502                "source": "worker_queue",
503                "queue": queue_label.clone(),
504                "job_event_id": job.job_event_id,
505                "claimed": job.active_claim.is_some(),
506            }));
507        }
508    }
509    out
510}
511
/// Short label for a trigger, e.g. `"t1: github.push"`.
///
/// An empty `trigger_id` counts as absent. When an id is present, it is
/// shown alone unless both `provider` and `kind` are non-empty. Without an
/// id, whichever of `provider`/`kind` is non-empty is used. With nothing at
/// all, the label is `"trigger event"`.
fn trigger_spec_summary(provider: &str, kind: &str, trigger_id: Option<&str>) -> String {
    let has_provider = !provider.is_empty();
    let has_kind = !kind.is_empty();
    if let Some(id) = trigger_id.filter(|id| !id.is_empty()) {
        return if has_provider && has_kind {
            format!("{id}: {provider}.{kind}")
        } else {
            id.to_string()
        };
    }
    match (has_provider, has_kind) {
        (true, true) => format!("{provider}.{kind}"),
        (true, false) => provider.to_string(),
        (false, true) => kind.to_string(),
        (false, false) => "trigger event".to_string(),
    }
}
526
527fn json_string(value: &Value, path: &[&str]) -> Option<String> {
528    let mut cursor = value;
529    for key in path {
530        cursor = cursor.get(*key)?;
531    }
532    cursor.as_str().map(ToString::to_string)
533}
534
535fn json_u64(value: &Value, path: &[&str]) -> Option<u64> {
536    let mut cursor = value;
537    for key in path {
538        cursor = cursor.get(*key)?;
539    }
540    cursor.as_u64()
541}
542
543fn payload_summary(payload: &Value) -> String {
544    match payload {
545        Value::Null => "nil".to_string(),
546        Value::Bool(value) => value.to_string(),
547        Value::Number(value) => value.to_string(),
548        Value::String(value) => {
549            let mut chars = value.chars();
550            let preview: String = chars.by_ref().take(80).collect();
551            if chars.next().is_some() {
552                format!("{preview}...")
553            } else {
554                preview
555            }
556        }
557        Value::Array(items) => format!("list(len={})", items.len()),
558        Value::Object(map) => {
559            let keys = map.keys().take(6).cloned().collect::<Vec<_>>().join(",");
560            if map.len() > 6 {
561                format!("object(keys={keys},...)")
562            } else {
563                format!("object(keys={keys})")
564            }
565        }
566    }
567}