// harn_cli/commands/trigger/replay.rs
// Implementation of `harn trigger replay` (single-event and bulk paths).

1use std::collections::BTreeMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use serde::Serialize;
6use serde_json::{json, Value as JsonValue};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10use harn_vm::corrections::{append_correction_record, CorrectionRecord, CorrectionScope};
11use harn_vm::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
12
13use crate::cli::TriggerReplayArgs;
14use crate::commands::trigger::ops::{
15    build_operation_audit, install_trigger_runtime, load_bulk_targets,
16    workspace_root_and_event_log, BulkTriggerTarget, ProgressReporter, RateLimiter,
17    TriggerEventRecord,
18};
19use crate::package;
20
// Event-log topic holding recorded trigger events (originals and replay markers).
// NOTE(review): plural "triggers." here vs singular "trigger." below — confirm
// this asymmetry is intentional before normalizing.
const TRIGGER_EVENTS_TOPIC: &str = "triggers.events";
// Event-log topic with dispatch results ("dispatch_succeeded"/"dispatch_failed").
const TRIGGER_OUTBOX_TOPIC: &str = "trigger.outbox";
// Event-log topic with dead-letter-queue moves ("dlq_moved").
const TRIGGER_DLQ_TOPIC: &str = "trigger.dlq";
// Event-log topic carrying observability action-graph updates.
const ACTION_GRAPH_TOPIC: &str = "observability.action_graph";
25
/// Serializable snapshot of a single dispatch outcome (original or replayed)
/// as surfaced in replay reports.
#[derive(Clone, Debug, Serialize)]
pub struct DispatchOutcomeSummary {
    /// Terminal status string, e.g. "succeeded"/"failed"/"dlq"/"skipped"
    /// (see `summarize_dispatch_outcome` and `failure_status`).
    pub status: String,
    /// Number of dispatch attempts recorded for this outcome.
    pub attempt_count: u32,
    /// Handler kind reported by the dispatcher; empty when not recorded.
    pub handler_kind: String,
    /// Target URI the handler dispatched to, when available.
    pub target_uri: Option<String>,
    /// Handler result payload on success, if any.
    pub result: Option<JsonValue>,
    /// Error message on failure/dlq, if any.
    pub error: Option<String>,
}

/// One field that differs between the original and replayed outcomes.
#[derive(Clone, Debug, Serialize)]
pub struct DriftField {
    /// Value observed in the original dispatch.
    pub original: JsonValue,
    /// Value observed in the replay.
    pub replayed: JsonValue,
}

/// Field-by-field comparison between original and replayed outcomes.
#[derive(Clone, Debug, Serialize)]
pub struct DriftReport {
    /// True when at least one field differs.
    pub changed: bool,
    /// Differing fields keyed by field name (BTreeMap keeps output sorted).
    pub fields: BTreeMap<String, DriftField>,
}
47
/// Top-level JSON report for a single-event replay.
#[derive(Clone, Debug, Serialize)]
pub struct TriggerReplayReport {
    /// Id of the replayed trigger event.
    pub event_id: String,
    /// Binding the event was dispatched against.
    pub binding_id: String,
    pub binding_version: u32,
    /// RFC3339 rendering of the `--as-of` instant, when one was given.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub as_of: Option<String>,
    /// Outcome of the replayed dispatch.
    pub replay: DispatchOutcomeSummary,
    /// Stored original outcome; present only with `--diff`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub original: Option<DispatchOutcomeSummary>,
    /// Original-vs-replay comparison; present only with `--diff`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub drift: Option<DriftReport>,
    /// Correction appended when `--steer-from` was supplied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correction: Option<CorrectionRecord>,
}
63
/// Operator-supplied steering for a replay: overrides the decision at `step`
/// with `to_decision` and records a correction describing the change.
#[derive(Clone, Debug)]
pub struct ReplaySteering {
    /// Step to steer: "event"/"trigger", "outcome"/"dispatch"/"terminal",
    /// or an action-graph node id (see `resolve_steering_from_decision`).
    pub step: String,
    /// Replacement decision value.
    pub to_decision: JsonValue,
    /// Why the correction was applied.
    pub reason: String,
    /// Who applied it (falls back to `HARN_APPLIED_BY`, then "operator").
    pub applied_by: String,
    /// Scope of the resulting correction record.
    pub scope: CorrectionScope,
}
72
73impl ReplaySteering {
74    pub fn new(
75        step: impl Into<String>,
76        to_decision: JsonValue,
77        reason: Option<String>,
78        applied_by: Option<String>,
79        scope: Option<&str>,
80    ) -> Result<Self, String> {
81        Ok(Self {
82            step: step.into(),
83            to_decision,
84            reason: reason.unwrap_or_else(default_correction_reason),
85            applied_by: applied_by.unwrap_or_else(default_correction_applied_by),
86            scope: scope
87                .map(CorrectionScope::parse)
88                .transpose()?
89                .unwrap_or_default(),
90        })
91    }
92
93    fn from_cli_decision(
94        step: impl Into<String>,
95        raw_to_decision: &str,
96        reason: Option<String>,
97        applied_by: Option<String>,
98        scope: Option<&str>,
99    ) -> Result<Self, String> {
100        Self::new(
101            step,
102            parse_decision_value(raw_to_decision),
103            reason,
104            applied_by,
105            scope,
106        )
107    }
108}
109
/// Per-target entry in a bulk replay report.
#[derive(Clone, Debug, Serialize)]
struct BulkReplayItem {
    event_id: String,
    binding_id: String,
    binding_version: u32,
    binding_key: String,
    /// Latest known status for this target before the bulk operation ran.
    latest_status: String,
    /// Status of this operation: "dry_run", or the replay's outcome status.
    status: String,
    /// Full single-event replay report; absent for dry runs.
    #[serde(skip_serializing_if = "Option::is_none")]
    report: Option<TriggerReplayReport>,
}

/// Aggregate report for `harn trigger replay --where ...`.
#[derive(Clone, Debug, Serialize)]
struct BulkReplayReport {
    /// Always "replay".
    operation: String,
    dry_run: bool,
    /// Normalized `--where` filter expression that selected the targets.
    filter: String,
    matched_count: usize,
    executed_count: usize,
    skipped_count: usize,
    /// Id of the appended operation-audit entry, when one was written.
    #[serde(skip_serializing_if = "Option::is_none")]
    audit_id: Option<String>,
    items: Vec<BulkReplayItem>,
}

/// Knobs for one bulk replay pass.
struct BulkReplayOptions<'a> {
    /// Include original-vs-replay drift in each item's report.
    diff: bool,
    /// Report matches without executing any replays.
    dry_run: bool,
    /// Emit per-item progress updates via `ProgressReporter`.
    progress: bool,
    // Throttle forwarded to `RateLimiter` — presumably operations/second;
    // confirm against RateLimiter's definition.
    rate_limit: Option<f64>,
    /// Raw `--as-of` value forwarded to each single-event replay.
    as_of: Option<&'a str>,
}
142
143pub(crate) async fn run(args: TriggerReplayArgs) -> Result<(), String> {
144    let (workspace_root, event_log) = workspace_root_and_event_log()?;
145    let steering = replay_steering_from_args(&args)?;
146
147    if args.where_expr.is_none() {
148        let event_id = args
149            .event_id
150            .as_deref()
151            .ok_or_else(|| "missing trigger event id".to_string())?;
152        let report = replay_report_for_event_log(
153            event_log,
154            &workspace_root,
155            event_id,
156            args.as_of.as_deref(),
157            args.diff,
158            steering.as_ref(),
159        )
160        .await?;
161
162        println!(
163            "{}",
164            serde_json::to_string_pretty(&report)
165                .map_err(|error| format!("failed to encode replay report: {error}"))?
166        );
167        return Ok(());
168    }
169
170    if steering.is_some() {
171        return Err("--steer-from is only supported for single-event replay".to_string());
172    }
173
174    install_trigger_runtime(&workspace_root).await?;
175    let as_of = args.as_of.as_deref().map(parse_timestamp).transpose()?;
176    let where_expr = args.where_expr.as_deref().unwrap_or_default();
177    let (targets, normalized_filter) = load_bulk_targets(&event_log, where_expr, as_of).await?;
178    let report = replay_bulk_targets(
179        &event_log,
180        &workspace_root,
181        targets,
182        normalized_filter,
183        BulkReplayOptions {
184            diff: args.diff,
185            dry_run: args.dry_run,
186            progress: args.progress,
187            rate_limit: args.rate_limit,
188            as_of: args.as_of.as_deref(),
189        },
190    )
191    .await?;
192    println!(
193        "{}",
194        serde_json::to_string_pretty(&report)
195            .map_err(|error| format!("failed to encode replay report: {error}"))?
196    );
197    Ok(())
198}
199
200/// In-process entry point for `harn trigger replay --where ... [--diff] [--dry-run]`.
201/// Used by integration tests to drive the bulk replay path without spawning
202/// the `harn` binary; the binary entry calls the same internals via `run`.
203pub async fn replay_bulk_in_process(
204    event_log: Arc<AnyEventLog>,
205    workspace_root: &Path,
206    where_expr: &str,
207    diff: bool,
208    dry_run: bool,
209    rate_limit: Option<f64>,
210    as_of: Option<&str>,
211) -> Result<JsonValue, String> {
212    install_trigger_runtime(workspace_root).await?;
213    let as_of_dt = as_of.map(parse_timestamp).transpose()?;
214    let (targets, normalized_filter) = load_bulk_targets(&event_log, where_expr, as_of_dt).await?;
215    let report = replay_bulk_targets(
216        &event_log,
217        workspace_root,
218        targets,
219        normalized_filter,
220        BulkReplayOptions {
221            diff,
222            dry_run,
223            progress: false,
224            rate_limit,
225            as_of,
226        },
227    )
228    .await?;
229    serde_json::to_value(report).map_err(|error| format!("failed to encode replay report: {error}"))
230}
231
232pub async fn replay_report_for_event_log(
233    event_log: Arc<AnyEventLog>,
234    workspace_root: &Path,
235    event_id: &str,
236    as_of: Option<&str>,
237    diff: bool,
238    steering: Option<&ReplaySteering>,
239) -> Result<TriggerReplayReport, String> {
240    let recorded = load_recorded_event(&event_log, event_id).await?;
241    replay_report_for_record(event_log, workspace_root, recorded, as_of, diff, steering).await
242}
243
/// Replays one recorded trigger event and assembles its report.
///
/// Sequencing is deliberate:
/// 1. build the VM and install runtime extensions + manifest triggers,
/// 2. load the stored original outcome (only when `diff` is set),
/// 3. resolve the binding (recorded/live, or `--as-of`),
/// 4. resolve the steering "from" value — fails before any replay record
///    is written if the step id is unknown,
/// 5. append the replay marker record, then dispatch,
/// 6. append the correction (when steering) and compute drift.
async fn replay_report_for_record(
    event_log: Arc<AnyEventLog>,
    workspace_root: &Path,
    recorded: TriggerEventRecord,
    as_of: Option<&str>,
    diff: bool,
    steering: Option<&ReplaySteering>,
) -> Result<TriggerReplayReport, String> {
    let mut vm = build_replay_vm(workspace_root);
    let extensions = package::load_runtime_extensions(workspace_root);
    package::install_runtime_extensions(&extensions);
    package::install_manifest_triggers(&mut vm, &extensions)
        .await
        .map_err(|error| format!("failed to install manifest triggers: {error}"))?;

    // Fetch the stored original outcome only when a diff was requested.
    let original = if diff {
        Some(load_original_outcome(&event_log, &recorded).await?)
    } else {
        None
    };
    let as_of = as_of.map(parse_timestamp).transpose()?;
    let binding = resolve_binding(&recorded, as_of)?;
    // Resolve the correction's "from" value up front so an unknown step id
    // errors out before a replay record is appended or dispatch runs.
    let steering_from_decision = match steering {
        Some(steering) => Some(
            resolve_steering_from_decision(&event_log, &recorded, &binding, &steering.step).await?,
        ),
        None => None,
    };

    // Mark this dispatch as a replay of the original event in the log.
    append_replay_record(&event_log, &binding, &recorded.event).await?;
    let dispatcher = harn_vm::Dispatcher::with_event_log(vm, event_log.clone());
    let replay = dispatcher
        .dispatch_replay(
            &binding,
            recorded.event.clone(),
            recorded.event.id.0.clone(),
        )
        .await
        .map_err(|error| format!("trigger replay failed: {error}"))?;
    let replay_summary = summarize_dispatch_outcome(&replay);
    // A correction is recorded only when steering was requested; by
    // construction both tuple slots are Some together.
    let correction = match (steering, steering_from_decision) {
        (Some(steering), Some(from_decision)) => Some(
            append_replay_correction(
                &event_log,
                &recorded,
                &binding,
                &replay_summary,
                steering,
                from_decision,
            )
            .await?,
        ),
        _ => None,
    };

    // Drift is only computable when the original outcome was loaded (`diff`).
    let drift = original
        .as_ref()
        .map(|original| diff_outcomes(original, &replay_summary));
    Ok(TriggerReplayReport {
        event_id: recorded.event.id.0,
        binding_id: binding.id.as_str().to_string(),
        binding_version: binding.version,
        as_of: as_of.map(format_timestamp),
        replay: replay_summary,
        original,
        drift,
        correction,
    })
}
313
314async fn replay_bulk_targets(
315    event_log: &Arc<AnyEventLog>,
316    workspace_root: &Path,
317    targets: Vec<BulkTriggerTarget>,
318    normalized_filter: String,
319    options: BulkReplayOptions<'_>,
320) -> Result<BulkReplayReport, String> {
321    let matched_count = targets.len();
322    let mut items = Vec::new();
323    let mut executed_count = 0;
324    let mut skipped_count = 0;
325    let mut limiter = RateLimiter::new(options.rate_limit);
326    let mut progress_reporter = ProgressReporter::new(options.progress, "replay", matched_count);
327
328    for target in &targets {
329        if options.dry_run {
330            skipped_count += 1;
331            progress_reporter.update("dry_run");
332            items.push(BulkReplayItem {
333                event_id: target.event_id.clone(),
334                binding_id: target.binding_id.clone(),
335                binding_version: target.binding_version,
336                binding_key: target.binding_key.clone(),
337                latest_status: target.latest_status.clone(),
338                status: "dry_run".to_string(),
339                report: None,
340            });
341            continue;
342        }
343
344        limiter.wait().await;
345        let report = replay_report_for_record(
346            event_log.clone(),
347            workspace_root,
348            target.record.clone(),
349            options.as_of,
350            options.diff,
351            None,
352        )
353        .await?;
354        executed_count += 1;
355        progress_reporter.update(report.replay.status.as_str());
356        items.push(BulkReplayItem {
357            event_id: target.event_id.clone(),
358            binding_id: target.binding_id.clone(),
359            binding_version: target.binding_version,
360            binding_key: target.binding_key.clone(),
361            latest_status: target.latest_status.clone(),
362            status: report.replay.status.clone(),
363            report: Some(report),
364        });
365    }
366
367    let audit = build_operation_audit(
368        "replay",
369        options.dry_run,
370        Some(normalized_filter.clone()),
371        options.rate_limit,
372        matched_count,
373        executed_count,
374        skipped_count,
375        &targets,
376    );
377    let audit_id = append_replay_audit(event_log, &audit).await?;
378
379    Ok(BulkReplayReport {
380        operation: "replay".to_string(),
381        dry_run: options.dry_run,
382        filter: normalized_filter,
383        matched_count,
384        executed_count,
385        skipped_count,
386        audit_id: Some(audit_id),
387        items,
388    })
389}
390
391async fn append_replay_audit(
392    event_log: &Arc<AnyEventLog>,
393    audit: &crate::commands::trigger::ops::TriggerOperationAuditEntry,
394) -> Result<String, String> {
395    crate::commands::trigger::ops::append_operation_audit(event_log, audit).await?;
396    Ok(audit.id.clone())
397}
398
/// Constructs a fresh VM configured for trigger replay: VM stdlib, default
/// host builtins, and workspace-scoped store/metadata/checkpoint builtins,
/// with project root and source dir pointed at the workspace.
pub fn build_replay_vm(workspace_root: &Path) -> harn_vm::Vm {
    let mut vm = harn_vm::Vm::new();
    harn_vm::register_vm_stdlib(&mut vm);
    crate::install_default_hostlib(&mut vm);
    harn_vm::register_store_builtins(&mut vm, workspace_root);
    harn_vm::register_metadata_builtins(&mut vm, workspace_root);
    // "trigger-replay" tags checkpoints created on this path — presumably a
    // namespace; confirm against register_checkpoint_builtins.
    harn_vm::register_checkpoint_builtins(&mut vm, workspace_root, "trigger-replay");
    vm.set_project_root(workspace_root);
    vm.set_source_dir(workspace_root);
    vm
}
410
411fn replay_steering_from_args(args: &TriggerReplayArgs) -> Result<Option<ReplaySteering>, String> {
412    let Some(step) = args.steer_from.as_ref() else {
413        return Ok(None);
414    };
415    let raw_to_decision = args
416        .to_decision
417        .as_deref()
418        .ok_or_else(|| "--steer-from requires --to-decision".to_string())?;
419    ReplaySteering::from_cli_decision(
420        step.clone(),
421        raw_to_decision,
422        args.reason.clone(),
423        args.applied_by.clone(),
424        args.scope.as_deref(),
425    )
426    .map(Some)
427}
428
/// Reason recorded on a correction when the operator does not supply one.
fn default_correction_reason() -> String {
    String::from("manual replay steering")
}
432
/// Identity recorded on a correction: the `HARN_APPLIED_BY` environment
/// variable when set (and valid unicode), otherwise the literal "operator".
fn default_correction_applied_by() -> String {
    match std::env::var("HARN_APPLIED_BY") {
        Ok(value) => value,
        Err(_) => "operator".to_string(),
    }
}
436
437fn parse_decision_value(raw: &str) -> JsonValue {
438    serde_json::from_str(raw).unwrap_or_else(|_| JsonValue::String(raw.to_string()))
439}
440
441async fn resolve_steering_from_decision(
442    event_log: &Arc<AnyEventLog>,
443    recorded: &TriggerEventRecord,
444    binding: &harn_vm::triggers::registry::TriggerBinding,
445    step: &str,
446) -> Result<JsonValue, String> {
447    match step {
448        "event" | "trigger" => serde_json::to_value(&recorded.event)
449            .map_err(|error| format!("failed to encode trigger event for correction: {error}")),
450        "outcome" | "dispatch" | "terminal" => serde_json::to_value(
451            load_original_outcome(event_log, recorded).await?,
452        )
453        .map_err(|error| {
454            format!("failed to encode original dispatch outcome for correction: {error}")
455        }),
456        other => {
457            let binding_key = binding.binding_key();
458            load_action_graph_node(event_log, &recorded.event.id.0, &binding_key, other)
459                .await?
460                .ok_or_else(|| {
461                    format!(
462                        "unknown replay step '{other}'; expected event, outcome, or an action graph node id"
463                    )
464                })
465        }
466    }
467}
468
469async fn load_action_graph_node(
470    event_log: &Arc<AnyEventLog>,
471    event_id: &str,
472    binding_key: &str,
473    step: &str,
474) -> Result<Option<JsonValue>, String> {
475    let topic = Topic::new(ACTION_GRAPH_TOPIC)
476        .map_err(|error| format!("invalid action graph topic: {error}"))?;
477    let events = event_log
478        .read_range(&topic, None, usize::MAX)
479        .await
480        .map_err(|error| format!("failed to read action graph updates: {error}"))?;
481    for (_, event) in events {
482        let context = event.payload.get("context");
483        if context
484            .and_then(|value| value.get("event_id"))
485            .and_then(|value| value.as_str())
486            .is_some_and(|candidate| candidate != event_id)
487        {
488            continue;
489        }
490        if context
491            .and_then(|value| value.get("binding_key"))
492            .and_then(|value| value.as_str())
493            .is_some_and(|candidate| candidate != binding_key)
494        {
495            continue;
496        }
497        let Some(nodes) = event.payload["observability"]["action_graph_nodes"].as_array() else {
498            continue;
499        };
500        if let Some(node) = nodes.iter().find(|node| {
501            node.get("id").and_then(|value| value.as_str()) == Some(step)
502                || node.get("node_id").and_then(|value| value.as_str()) == Some(step)
503        }) {
504            return Ok(Some(node.clone()));
505        }
506    }
507    Ok(None)
508}
509
510async fn append_replay_correction(
511    event_log: &Arc<AnyEventLog>,
512    recorded: &TriggerEventRecord,
513    binding: &harn_vm::triggers::registry::TriggerBinding,
514    replay: &DispatchOutcomeSummary,
515    steering: &ReplaySteering,
516    from_decision: JsonValue,
517) -> Result<CorrectionRecord, String> {
518    let mut record = CorrectionRecord::new(
519        from_decision,
520        steering.to_decision.clone(),
521        steering.reason.clone(),
522        steering.applied_by.clone(),
523        steering.scope,
524    );
525    record.actor_id = Some(binding.id.as_str().to_string());
526    record.action = Some(format!(
527        "{}.{}",
528        recorded.event.provider.as_str(),
529        recorded.event.kind
530    ));
531    record.trace_id = Some(recorded.event.trace_id.0.clone());
532    record.step = Some(steering.step.clone());
533    record.metadata.insert(
534        "event_id".to_string(),
535        serde_json::json!(recorded.event.id.0),
536    );
537    record.metadata.insert(
538        "binding_key".to_string(),
539        serde_json::json!(binding.binding_key()),
540    );
541    record.metadata.insert(
542        "binding_version".to_string(),
543        serde_json::json!(binding.version),
544    );
545    record.metadata.insert(
546        "replay_status".to_string(),
547        serde_json::json!(replay.status),
548    );
549    append_correction_record(event_log, &record)
550        .await
551        .map_err(|error| format!("failed to append correction record: {error}"))
552}
553
554pub(crate) fn parse_timestamp(raw: &str) -> Result<OffsetDateTime, String> {
555    if let Ok(parsed) = OffsetDateTime::parse(raw, &Rfc3339) {
556        return Ok(parsed);
557    }
558    if let Ok(unix) = raw.parse::<i64>() {
559        let parsed = if raw.len() > 10 {
560            OffsetDateTime::from_unix_timestamp_nanos(unix as i128 * 1_000_000)
561        } else {
562            OffsetDateTime::from_unix_timestamp(unix)
563        };
564        return parsed.map_err(|error| format!("invalid --as-of timestamp '{raw}': {error}"));
565    }
566    Err(format!(
567        "invalid --as-of timestamp '{raw}': expected RFC3339 or unix seconds/milliseconds"
568    ))
569}
570
571use crate::format::format_timestamp_rfc3339 as format_timestamp;
572
573async fn load_recorded_event(
574    event_log: &Arc<AnyEventLog>,
575    event_id: &str,
576) -> Result<TriggerEventRecord, String> {
577    let topic = Topic::new(TRIGGER_EVENTS_TOPIC)
578        .map_err(|error| format!("invalid trigger events topic: {error}"))?;
579    let events = event_log
580        .read_range(&topic, None, usize::MAX)
581        .await
582        .map_err(|error| format!("failed to read trigger events: {error}"))?;
583
584    let mut replay_match = None;
585    for (_, event) in events {
586        let Ok(record) = serde_json::from_value::<TriggerEventRecord>(event.payload) else {
587            continue;
588        };
589        if record.event.id.0 != event_id {
590            continue;
591        }
592        if record.replay_of_event_id.is_none() {
593            return Ok(record);
594        }
595        replay_match.get_or_insert(record);
596    }
597
598    if let Some(record) = replay_match {
599        return Ok(record);
600    }
601
602    load_ingested_event(event_log, event_id).await
603}
604
605async fn load_ingested_event(
606    event_log: &Arc<AnyEventLog>,
607    event_id: &str,
608) -> Result<TriggerEventRecord, String> {
609    let envelopes_topic = Topic::new(harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC)
610        .map_err(|error| format!("invalid trigger inbox topic: {error}"))?;
611    let legacy_topic = Topic::new(harn_vm::TRIGGER_INBOX_LEGACY_TOPIC)
612        .map_err(|error| format!("invalid trigger inbox legacy topic: {error}"))?;
613    let mut envelopes = event_log
614        .read_range(&envelopes_topic, None, usize::MAX)
615        .await
616        .map_err(|error| format!("failed to read trigger inbox envelopes: {error}"))?;
617    let legacy = event_log
618        .read_range(&legacy_topic, None, usize::MAX)
619        .await
620        .map_err(|error| format!("failed to read legacy trigger inbox envelopes: {error}"))?;
621    envelopes.extend(legacy);
622    for (_, event) in envelopes {
623        if event.kind != "event_ingested" {
624            continue;
625        }
626        let Ok(envelope) =
627            serde_json::from_value::<harn_vm::triggers::dispatcher::InboxEnvelope>(event.payload)
628        else {
629            continue;
630        };
631        let (Some(binding_id), Some(binding_version)) =
632            (envelope.trigger_id, envelope.binding_version)
633        else {
634            continue;
635        };
636        if envelope.event.id.0 != event_id {
637            continue;
638        }
639        return Ok(TriggerEventRecord {
640            binding_id,
641            binding_version,
642            replay_of_event_id: None,
643            event: envelope.event,
644        });
645    }
646
647    Err(format!("unknown trigger event id '{event_id}'"))
648}
649
650fn resolve_binding(
651    recorded: &TriggerEventRecord,
652    as_of: Option<OffsetDateTime>,
653) -> Result<Arc<harn_vm::triggers::registry::TriggerBinding>, String> {
654    if let Some(as_of) = as_of {
655        return harn_vm::resolve_trigger_binding_as_of(&recorded.binding_id, as_of).map_err(
656            |error| {
657                format!(
658                    "failed to resolve binding '{}' as of {}: {}",
659                    recorded.binding_id,
660                    format_timestamp(as_of),
661                    error
662                )
663            },
664        );
665    }
666
667    harn_vm::resolve_live_or_as_of(
668        &recorded.binding_id,
669        harn_vm::RecordedTriggerBinding {
670            version: recorded.binding_version,
671            received_at: recorded.event.received_at,
672        },
673    )
674    .map_err(|error| {
675        format!(
676            "failed to resolve recorded binding '{}@v{}' for replay: {}",
677            recorded.binding_id, recorded.binding_version, error
678        )
679    })
680}
681
682async fn append_replay_record(
683    event_log: &Arc<AnyEventLog>,
684    binding: &harn_vm::triggers::registry::TriggerBinding,
685    event: &harn_vm::TriggerEvent,
686) -> Result<(), String> {
687    let topic = Topic::new(TRIGGER_EVENTS_TOPIC)
688        .map_err(|error| format!("invalid trigger events topic: {error}"))?;
689    event_log
690        .append(
691            &topic,
692            LogEvent::new(
693                "trigger_event",
694                serde_json::to_value(TriggerEventRecord {
695                    binding_id: binding.id.as_str().to_string(),
696                    binding_version: binding.version,
697                    replay_of_event_id: Some(event.id.0.clone()),
698                    event: event.clone(),
699                })
700                .unwrap_or(JsonValue::Null),
701            ),
702        )
703        .await
704        .map(|_| ())
705        .map_err(|error| format!("failed to append replay record: {error}"))
706}
707
708async fn load_original_outcome(
709    event_log: &Arc<AnyEventLog>,
710    recorded: &TriggerEventRecord,
711) -> Result<DispatchOutcomeSummary, String> {
712    let binding_key = format!("{}@v{}", recorded.binding_id, recorded.binding_version);
713    if let Some(outcome) =
714        load_original_terminal_outcome(event_log, &recorded.event.id.0, &binding_key).await?
715    {
716        return Ok(outcome);
717    }
718
719    load_skipped_outcome(event_log, &recorded.event.id.0, &binding_key)
720        .await?
721        .ok_or_else(|| {
722            format!(
723                "no stored original outcome found for '{}@v{}' event '{}'",
724                recorded.binding_id, recorded.binding_version, recorded.event.id.0
725            )
726        })
727}
728
/// Looks up the terminal outcome of the ORIGINAL dispatch for
/// (`event_id`, `binding_key`) from the outbox and DLQ topics.
///
/// Precedence: a matching `dlq_moved` record wins outright; otherwise the
/// last `dispatch_succeeded` seen, else the last `dispatch_failed`. Replayed
/// dispatches are excluded via `matches_original_dispatch`.
async fn load_original_terminal_outcome(
    event_log: &Arc<AnyEventLog>,
    event_id: &str,
    binding_key: &str,
) -> Result<Option<DispatchOutcomeSummary>, String> {
    let outbox_topic = Topic::new(TRIGGER_OUTBOX_TOPIC)
        .map_err(|error| format!("invalid trigger outbox topic: {error}"))?;
    let dlq_topic = Topic::new(TRIGGER_DLQ_TOPIC)
        .map_err(|error| format!("invalid trigger dlq topic: {error}"))?;

    let outbox_events = event_log
        .read_range(&outbox_topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read trigger outbox: {error}"))?;
    let dlq_events = event_log
        .read_range(&dlq_topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read trigger dlq: {error}"))?;

    // Later records overwrite earlier ones, so the most recent success and
    // the most recent failure survive the scan.
    let mut success = None;
    let mut failure = None;
    for (_, event) in outbox_events {
        if !matches_original_dispatch(&event, event_id, binding_key) {
            continue;
        }
        let attempt = header_u32(&event, "attempt").unwrap_or(0);
        let handler_kind = header_text(&event, "handler_kind").unwrap_or_default();
        let target_uri = event
            .payload
            .get("target_uri")
            .cloned()
            .and_then(|value| value.as_str().map(str::to_string));
        match event.kind.as_str() {
            "dispatch_succeeded" => {
                success = Some(DispatchOutcomeSummary {
                    status: "succeeded".to_string(),
                    attempt_count: attempt,
                    handler_kind,
                    target_uri,
                    result: event.payload.get("result").cloned(),
                    error: None,
                });
            }
            "dispatch_failed" => {
                let error = event
                    .payload
                    .get("error")
                    .and_then(|value| value.as_str())
                    .map(str::to_string);
                failure = Some(DispatchOutcomeSummary {
                    // "cancelled" vs "failed", derived from the error text.
                    status: failure_status(error.as_deref()),
                    attempt_count: attempt,
                    handler_kind,
                    target_uri,
                    result: None,
                    error,
                });
            }
            _ => {}
        }
    }

    // A DLQ move is the strongest terminal signal: return it immediately.
    for (_, event) in dlq_events {
        if !matches_original_dispatch(&event, event_id, binding_key) || event.kind != "dlq_moved" {
            continue;
        }
        let attempt_count = event
            .payload
            .get("attempt_count")
            .and_then(|value| value.as_u64())
            .unwrap_or(0) as u32;
        return Ok(Some(DispatchOutcomeSummary {
            status: "dlq".to_string(),
            attempt_count,
            handler_kind: header_text(&event, "handler_kind").unwrap_or_default(),
            target_uri: None,
            result: None,
            error: event
                .payload
                .get("final_error")
                .and_then(|value| value.as_str())
                .map(str::to_string),
        }));
    }

    Ok(success.or(failure))
}
816
/// Detects an original run that was skipped by a false predicate, using
/// action-graph updates (no outbox record exists in that case).
///
/// Only updates for exactly this event/binding are considered, replay runs
/// are excluded (their context carries `replay_of_event_id`), and a
/// synthetic "skipped" outcome is built from the first predicate node whose
/// outcome is "false".
async fn load_skipped_outcome(
    event_log: &Arc<AnyEventLog>,
    event_id: &str,
    binding_key: &str,
) -> Result<Option<DispatchOutcomeSummary>, String> {
    let topic = Topic::new(ACTION_GRAPH_TOPIC)
        .map_err(|error| format!("invalid action graph topic: {error}"))?;
    let events = event_log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read action graph updates: {error}"))?;

    for (_, event) in events {
        let Some(context) = event.payload.get("context") else {
            continue;
        };
        if context.get("event_id").and_then(|value| value.as_str()) != Some(event_id) {
            continue;
        }
        if context.get("binding_key").and_then(|value| value.as_str()) != Some(binding_key) {
            continue;
        }
        // Replay runs write their own action-graph updates; ignore them so
        // only the original run's predicate result is reported.
        if context
            .get("replay_of_event_id")
            .and_then(|value| value.as_str())
            .is_some()
        {
            continue;
        }
        let Some(nodes) = event.payload["observability"]["action_graph_nodes"].as_array() else {
            continue;
        };
        let predicate = nodes.iter().find(|node| {
            node.get("kind").and_then(|value| value.as_str()) == Some("predicate")
                && node.get("outcome").and_then(|value| value.as_str()) == Some("false")
        });
        if let Some(predicate) = predicate {
            return Ok(Some(DispatchOutcomeSummary {
                status: "skipped".to_string(),
                attempt_count: 0,
                handler_kind: String::new(),
                target_uri: None,
                result: Some(json!({
                    "skipped": true,
                    "predicate": predicate.get("label").cloned().unwrap_or(JsonValue::Null),
                })),
                error: None,
            }));
        }
    }

    Ok(None)
}
870
871fn matches_original_dispatch(event: &LogEvent, event_id: &str, binding_key: &str) -> bool {
872    header_text(event, "event_id") == Some(event_id.to_string())
873        && header_text(event, "binding_key") == Some(binding_key.to_string())
874        && header_text(event, "replay_of_event_id").is_none()
875}
876
877fn header_text(event: &LogEvent, key: &str) -> Option<String> {
878    event.headers.get(key).cloned()
879}
880
881fn header_u32(event: &LogEvent, key: &str) -> Option<u32> {
882    event.headers.get(key).and_then(|value| value.parse().ok())
883}
884
/// Maps a dispatch error message to a status string: messages containing
/// "cancelled" report as "cancelled", everything else as "failed".
fn failure_status(error: Option<&str>) -> String {
    let cancelled = matches!(error, Some(message) if message.contains("cancelled"));
    if cancelled { "cancelled" } else { "failed" }.to_string()
}
892
893fn summarize_dispatch_outcome(outcome: &harn_vm::DispatchOutcome) -> DispatchOutcomeSummary {
894    DispatchOutcomeSummary {
895        status: match outcome.status {
896            harn_vm::DispatchStatus::Succeeded => "succeeded".to_string(),
897            harn_vm::DispatchStatus::Failed => "failed".to_string(),
898            harn_vm::DispatchStatus::Dlq => "dlq".to_string(),
899            harn_vm::DispatchStatus::Skipped => "skipped".to_string(),
900            harn_vm::DispatchStatus::Waiting => "waiting".to_string(),
901            harn_vm::DispatchStatus::Cancelled => "cancelled".to_string(),
902        },
903        attempt_count: outcome.attempt_count,
904        handler_kind: outcome.handler_kind.clone(),
905        target_uri: Some(outcome.target_uri.clone()),
906        result: outcome.result.clone(),
907        error: outcome.error.clone(),
908    }
909}
910
911fn diff_outcomes(
912    original: &DispatchOutcomeSummary,
913    replayed: &DispatchOutcomeSummary,
914) -> DriftReport {
915    let original = serde_json::to_value(original).unwrap_or(JsonValue::Null);
916    let replayed = serde_json::to_value(replayed).unwrap_or(JsonValue::Null);
917    let mut fields = BTreeMap::new();
918
919    let original = original.as_object().cloned().unwrap_or_default();
920    let replayed = replayed.as_object().cloned().unwrap_or_default();
921    let mut keys = original.keys().cloned().collect::<Vec<_>>();
922    for key in replayed.keys() {
923        if !keys.iter().any(|existing| existing == key) {
924            keys.push(key.clone());
925        }
926    }
927    keys.sort();
928    keys.dedup();
929
930    for key in keys {
931        let left = original.get(&key).cloned().unwrap_or(JsonValue::Null);
932        let right = replayed.get(&key).cloned().unwrap_or(JsonValue::Null);
933        if left != right {
934            fields.insert(
935                key,
936                DriftField {
937                    original: left,
938                    replayed: right,
939                },
940            );
941        }
942    }
943
944    DriftReport {
945        changed: !fields.is_empty(),
946        fields,
947    }
948}
949
#[cfg(test)]
// Tests below hold the shared `lock_harn_state` guard across `.await`
// points; the guard is dropped when each `#[tokio::test]` future resolves
// so this is safe in practice, matching the pattern already in
// `mcp/serve.rs`.
#[allow(clippy::await_holding_lock)]
mod tests {
    use std::collections::BTreeMap;
    use std::fs;
    use std::path::Path;
    use std::rc::Rc;
    use std::sync::Arc;

    use harn_vm::event_log::{
        install_default_for_base_dir, AnyEventLog, EventLog, LogEvent, Topic,
    };
    use harn_vm::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
    use harn_vm::triggers::event::{CronEventPayload, KnownProviderPayload};
    use time::OffsetDateTime;

    use super::{
        append_replay_record, build_replay_vm, load_recorded_event, replay_report_for_record,
        resolve_binding, summarize_dispatch_outcome, ReplaySteering, TriggerEventRecord,
        TRIGGER_EVENTS_TOPIC,
    };
    use crate::package;

    // Trigger binding id shared by every fixture in this module.
    const TEST_TRIGGER_ID: &str = "replay-cron";

    // Records an event against binding version 1, then advances the manifest
    // to version 3 so that version 1 can no longer be resolved. The replay
    // must fall back to the currently active binding and emit a warn-level
    // fallback log rather than fail.
    #[tokio::test(flavor = "current_thread")]
    async fn replay_falls_back_to_recorded_timestamp_when_version_lookup_is_stale() {
        let _guard = crate::tests::common::harn_state_lock::lock_harn_state();
        harn_vm::reset_thread_local_state();
        // Collector sink captures the fallback warning asserted at the end.
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());

        let tempdir = tempfile::tempdir().expect("tempdir");
        let workspace_root = tempdir.path();
        let event_log = install_default_for_base_dir(workspace_root).expect("install event log");

        // Install three successive manifest versions so the active binding
        // version reaches 3 while version 1 becomes unresolvable.
        install_local_manifest(workspace_root, "on_tick_v1");
        install_workspace_manifest(workspace_root).await;
        install_local_manifest(workspace_root, "on_tick_v2");
        install_workspace_manifest(workspace_root).await;
        install_local_manifest(workspace_root, "on_tick_v3");
        install_workspace_manifest(workspace_root).await;

        let current = harn_vm::resolve_live_trigger_binding(TEST_TRIGGER_ID, None)
            .expect("resolve active binding");
        assert_eq!(current.version, 3);
        // Precondition: version 1 must be gone so the fallback path is taken.
        assert!(matches!(
            harn_vm::resolve_live_trigger_binding(TEST_TRIGGER_ID, Some(1)),
            Err(harn_vm::TriggerRegistryError::UnknownBindingVersion { .. })
        ));

        // Record the original event against the now-stale version 1.
        append_trigger_event(
            &event_log,
            TriggerEventRecord {
                binding_id: TEST_TRIGGER_ID.to_string(),
                binding_version: 1,
                replay_of_event_id: None,
                event: recorded_cron_event("evt-stale", OffsetDateTime::now_utc()),
            },
        )
        .await;

        let recorded = load_recorded_event(&event_log, "evt-stale")
            .await
            .expect("load recorded event");
        let binding = resolve_binding(&recorded, None).expect("resolve fallback binding");
        append_replay_record(&event_log, &binding, &recorded.event)
            .await
            .expect("append replay record");

        let dispatcher =
            harn_vm::Dispatcher::with_event_log(build_replay_vm(workspace_root), event_log.clone());
        let outcome = dispatcher
            .dispatch_replay(
                &binding,
                recorded.event.clone(),
                recorded.event.id.0.clone(),
            )
            .await
            .expect("dispatch replay succeeds");
        let replay = summarize_dispatch_outcome(&outcome);
        assert_eq!(replay.status, "succeeded");

        // The replay record must reference the original event id and carry
        // the fallback (active) binding version, not the recorded one.
        let topic = Topic::new(TRIGGER_EVENTS_TOPIC).expect("valid trigger events topic");
        let records: Vec<TriggerEventRecord> = event_log
            .read_range(&topic, None, usize::MAX)
            .await
            .expect("read trigger events")
            .into_iter()
            .map(|(_, event)| serde_json::from_value(event.payload).expect("decode trigger event"))
            .collect();

        assert!(records.iter().any(|record| {
            record.replay_of_event_id.as_deref() == Some("evt-stale")
                && record.binding_id == TEST_TRIGGER_ID
                && record.binding_version == 3
        }));
        // The fallback must be surfaced as a warning with both versions.
        assert!(sink.logs.borrow().iter().any(|log| {
            log.level == EventLevel::Warn
                && log.category == "replay.binding_version_gc_fallback"
                && log.metadata.get("trigger_id") == Some(&serde_json::json!(TEST_TRIGGER_ID))
                && log.metadata.get("recorded_version") == Some(&serde_json::json!(1))
                && log.metadata.get("resolved_version") == Some(&serde_json::json!(3))
        }));

        harn_vm::reset_thread_local_state();
    }

    // Replaying with steering must append a correction record attributed to
    // the trigger's actor and adapt the agent policy derived from it.
    #[tokio::test(flavor = "current_thread")]
    async fn replay_steering_appends_correction_and_adapts_policy() {
        let _guard = crate::tests::common::harn_state_lock::lock_harn_state();
        harn_vm::reset_thread_local_state();

        let tempdir = tempfile::tempdir().expect("tempdir");
        let workspace_root = tempdir.path();
        let event_log = install_default_for_base_dir(workspace_root).expect("install event log");

        install_local_manifest(workspace_root, "on_tick_v1");
        append_trigger_event(
            &event_log,
            TriggerEventRecord {
                binding_id: TEST_TRIGGER_ID.to_string(),
                binding_version: 1,
                replay_of_event_id: None,
                event: recorded_cron_event("evt-steer", OffsetDateTime::now_utc()),
            },
        )
        .await;

        let recorded = load_recorded_event(&event_log, "evt-steer")
            .await
            .expect("load recorded event");
        let steering = ReplaySteering {
            step: "event".to_string(),
            to_decision: serde_json::json!({"kind": "cron.steered"}),
            reason: "human corrected replay routing".to_string(),
            applied_by: "alice".to_string(),
            scope: harn_vm::CorrectionScope::ThisPersona,
        };

        let report = replay_report_for_record(
            event_log.clone(),
            workspace_root,
            recorded,
            None,
            false,
            Some(&steering),
        )
        .await
        .expect("replay report");

        // The report must echo the steering details back as a correction.
        let correction = report.correction.expect("correction record");
        assert_eq!(correction.actor_id.as_deref(), Some(TEST_TRIGGER_ID));
        assert_eq!(correction.step.as_deref(), Some("event"));
        assert_eq!(correction.applied_by, "alice");
        assert_eq!(correction.scope, harn_vm::CorrectionScope::ThisPersona);

        // Exactly one correction record should be queryable for this actor.
        let corrections = harn_vm::query_correction_records(
            &event_log,
            &harn_vm::CorrectionQueryFilters {
                actor_id: Some(TEST_TRIGGER_ID.to_string()),
                ..harn_vm::CorrectionQueryFilters::default()
            },
        )
        .await
        .expect("query corrections");
        assert_eq!(corrections.len(), 1);

        let policy = harn_vm::policy_for_agent(&event_log, TEST_TRIGGER_ID)
            .await
            .expect("policy for agent");
        assert_eq!(policy.side_effect_level.as_deref(), Some("read_only"));

        harn_vm::reset_thread_local_state();
    }

    // Writes a minimal workspace rooted at `root`: a manifest declaring a
    // single cron trigger bound to `handlers::{handler_name}`, the handler
    // library, and a stub main pipeline. Re-invoking with a new handler name
    // simulates publishing a new binding version.
    fn install_local_manifest(root: &Path, handler_name: &str) {
        std::fs::create_dir_all(root.join(".git")).expect("create .git");
        fs::write(
            root.join("harn.toml"),
            format!(
                r#"
[package]
name = "workspace"

[exports]
handlers = "lib.harn"

[[triggers]]
id = "{TEST_TRIGGER_ID}"
kind = "cron"
provider = "cron"
match = {{ events = ["cron.tick"] }}
schedule = "* * * * *"
timezone = "UTC"
handler = "handlers::{handler_name}"
"#
            ),
        )
        .expect("write manifest");
        fs::write(
            root.join("lib.harn"),
            format!(
                r#"
import "std/triggers"

pub fn {handler_name}(event: TriggerEvent) -> string {{
  return event.kind
}}
"#
            ),
        )
        .expect("write lib");
        fs::write(root.join("main.harn"), "pipeline main() {}\n").expect("write main");
    }

    // Registers the on-disk manifest's triggers with a fresh replay VM,
    // bumping the active binding version for the trigger registry.
    async fn install_workspace_manifest(root: &Path) {
        let mut vm = super::build_replay_vm(root);
        let extensions = package::load_runtime_extensions(&root.join("main.harn"));
        package::install_manifest_triggers(&mut vm, &extensions)
            .await
            .expect("install manifest triggers");
    }

    // Builds a verified cron tick event with deterministic ids derived from
    // `event_id`, suitable for recording and replaying in these tests.
    fn recorded_cron_event(event_id: &str, received_at: OffsetDateTime) -> harn_vm::TriggerEvent {
        harn_vm::TriggerEvent {
            id: harn_vm::TriggerEventId(event_id.to_string()),
            provider: harn_vm::ProviderId::from("cron"),
            kind: "cron.tick".to_string(),
            received_at,
            occurred_at: None,
            dedupe_key: format!("delivery-{event_id}"),
            trace_id: harn_vm::TraceId(format!("trace-{event_id}")),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            raw_body: None,
            provider_payload: harn_vm::ProviderPayload::Known(KnownProviderPayload::Cron(
                CronEventPayload {
                    cron_id: Some("test-cron".to_string()),
                    schedule: Some("* * * * *".to_string()),
                    tick_at: received_at,
                    raw: serde_json::json!({ "event_id": event_id }),
                },
            )),
            signature_status: harn_vm::SignatureStatus::Verified,
            dedupe_claimed: false,
        }
    }

    // Appends `record` to the trigger events topic as the "original"
    // dispatch record that replays later resolve against.
    async fn append_trigger_event(event_log: &Arc<AnyEventLog>, record: TriggerEventRecord) {
        let topic = Topic::new(TRIGGER_EVENTS_TOPIC).expect("valid trigger events topic");
        event_log
            .append(
                &topic,
                LogEvent::new(
                    "trigger_event",
                    serde_json::to_value(record).expect("encode trigger event"),
                ),
            )
            .await
            .expect("append trigger event");
    }
}