1use std::collections::BTreeMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use serde::Serialize;
6use serde_json::{json, Value as JsonValue};
7use time::format_description::well_known::Rfc3339;
8use time::OffsetDateTime;
9
10use harn_vm::corrections::{append_correction_record, CorrectionRecord, CorrectionScope};
11use harn_vm::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
12
13use crate::cli::TriggerReplayArgs;
14use crate::commands::trigger::ops::{
15 build_operation_audit, install_trigger_runtime, load_bulk_targets,
16 workspace_root_and_event_log, BulkTriggerTarget, ProgressReporter, RateLimiter,
17 TriggerEventRecord,
18};
19use crate::package;
20
/// Event-log topic holding recorded trigger events (originals and replays).
const TRIGGER_EVENTS_TOPIC: &str = "triggers.events";
/// Event-log topic holding dispatch attempt results (succeeded/failed).
// NOTE(review): singular "trigger." here vs plural "triggers." above — looks
// intentional but worth confirming against the producer side.
const TRIGGER_OUTBOX_TOPIC: &str = "trigger.outbox";
/// Event-log topic holding dead-lettered dispatches.
const TRIGGER_DLQ_TOPIC: &str = "trigger.dlq";
/// Event-log topic holding action-graph observability updates.
const ACTION_GRAPH_TOPIC: &str = "observability.action_graph";
25
/// Serializable summary of one dispatch outcome (original or replayed),
/// used both for reporting and for field-level drift comparison.
#[derive(Clone, Debug, Serialize)]
pub struct DispatchOutcomeSummary {
    /// Terminal status string, e.g. "succeeded", "failed", "dlq", "skipped".
    pub status: String,
    /// Number of dispatch attempts recorded for this outcome.
    pub attempt_count: u32,
    /// Kind of handler that serviced the dispatch (empty when unknown).
    pub handler_kind: String,
    /// Target URI the handler dispatched to, when recorded.
    pub target_uri: Option<String>,
    /// Handler result payload on success (or skip details).
    pub result: Option<JsonValue>,
    /// Error message on failure/DLQ outcomes.
    pub error: Option<String>,
}
35
/// One differing field between the original and replayed outcome summaries.
#[derive(Clone, Debug, Serialize)]
pub struct DriftField {
    /// Value from the original outcome (JSON null when absent).
    pub original: JsonValue,
    /// Value from the replayed outcome (JSON null when absent).
    pub replayed: JsonValue,
}
41
/// Field-by-field comparison between an original dispatch outcome and its
/// replay, produced by `diff_outcomes`.
#[derive(Clone, Debug, Serialize)]
pub struct DriftReport {
    /// True when at least one field differs.
    pub changed: bool,
    /// Differing fields keyed by field name (BTreeMap keeps output sorted).
    pub fields: BTreeMap<String, DriftField>,
}
47
/// Full report for a single-event replay: the replay outcome plus optional
/// original outcome, drift comparison, and steering correction record.
#[derive(Clone, Debug, Serialize)]
pub struct TriggerReplayReport {
    /// Id of the trigger event that was replayed.
    pub event_id: String,
    /// Binding the event was dispatched against.
    pub binding_id: String,
    pub binding_version: u32,
    /// RFC3339 rendering of the `--as-of` instant, when one was supplied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub as_of: Option<String>,
    /// Outcome of the replayed dispatch.
    pub replay: DispatchOutcomeSummary,
    /// Stored original outcome (present only when `--diff` was requested).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub original: Option<DispatchOutcomeSummary>,
    /// Drift between original and replay (present only with `--diff`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub drift: Option<DriftReport>,
    /// Correction appended when replay steering was applied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correction: Option<CorrectionRecord>,
}
63
/// Operator-supplied steering for a single-event replay: replaces the
/// decision produced at `step` with `to_decision` and records a correction.
#[derive(Clone, Debug)]
pub struct ReplaySteering {
    /// Replay step to steer: "event"/"trigger", "outcome"/"dispatch"/
    /// "terminal", or an action-graph node id.
    pub step: String,
    /// The corrected decision value.
    pub to_decision: JsonValue,
    /// Human-readable justification stored on the correction record.
    pub reason: String,
    /// Identity of whoever applied the correction.
    pub applied_by: String,
    /// Scope at which the correction applies.
    pub scope: CorrectionScope,
}
72
73impl ReplaySteering {
74 pub fn new(
75 step: impl Into<String>,
76 to_decision: JsonValue,
77 reason: Option<String>,
78 applied_by: Option<String>,
79 scope: Option<&str>,
80 ) -> Result<Self, String> {
81 Ok(Self {
82 step: step.into(),
83 to_decision,
84 reason: reason.unwrap_or_else(default_correction_reason),
85 applied_by: applied_by.unwrap_or_else(default_correction_applied_by),
86 scope: scope
87 .map(CorrectionScope::parse)
88 .transpose()?
89 .unwrap_or_default(),
90 })
91 }
92
93 fn from_cli_decision(
94 step: impl Into<String>,
95 raw_to_decision: &str,
96 reason: Option<String>,
97 applied_by: Option<String>,
98 scope: Option<&str>,
99 ) -> Result<Self, String> {
100 Self::new(
101 step,
102 parse_decision_value(raw_to_decision),
103 reason,
104 applied_by,
105 scope,
106 )
107 }
108}
109
/// Per-target entry in a bulk replay report.
#[derive(Clone, Debug, Serialize)]
struct BulkReplayItem {
    event_id: String,
    binding_id: String,
    binding_version: u32,
    binding_key: String,
    /// Latest known status for the target before this operation ran.
    latest_status: String,
    /// Status after this operation: "dry_run" or the replay outcome status.
    status: String,
    /// Full replay report; absent for dry-run entries.
    #[serde(skip_serializing_if = "Option::is_none")]
    report: Option<TriggerReplayReport>,
}
121
/// Aggregate report for a bulk (`--where` filtered) replay operation.
#[derive(Clone, Debug, Serialize)]
struct BulkReplayReport {
    /// Always "replay" for this command.
    operation: String,
    dry_run: bool,
    /// Normalized filter expression the targets were selected with.
    filter: String,
    matched_count: usize,
    executed_count: usize,
    skipped_count: usize,
    /// Id of the appended operation-audit entry, when one was written.
    #[serde(skip_serializing_if = "Option::is_none")]
    audit_id: Option<String>,
    items: Vec<BulkReplayItem>,
}
134
/// Knobs shared by the CLI and in-process bulk replay entry points.
struct BulkReplayOptions<'a> {
    /// Load and diff the original outcome for each replayed event.
    diff: bool,
    /// Report what would run without dispatching anything.
    dry_run: bool,
    /// Emit progress updates while iterating targets.
    progress: bool,
    /// Maximum replays per second, when set.
    rate_limit: Option<f64>,
    /// Raw `--as-of` string, forwarded to per-event binding resolution.
    as_of: Option<&'a str>,
}
142
143pub(crate) async fn run(args: TriggerReplayArgs) -> Result<(), String> {
144 let (workspace_root, event_log) = workspace_root_and_event_log()?;
145 let steering = replay_steering_from_args(&args)?;
146
147 if args.where_expr.is_none() {
148 let event_id = args
149 .event_id
150 .as_deref()
151 .ok_or_else(|| "missing trigger event id".to_string())?;
152 let report = replay_report_for_event_log(
153 event_log,
154 &workspace_root,
155 event_id,
156 args.as_of.as_deref(),
157 args.diff,
158 steering.as_ref(),
159 )
160 .await?;
161
162 println!(
163 "{}",
164 serde_json::to_string_pretty(&report)
165 .map_err(|error| format!("failed to encode replay report: {error}"))?
166 );
167 return Ok(());
168 }
169
170 if steering.is_some() {
171 return Err("--steer-from is only supported for single-event replay".to_string());
172 }
173
174 install_trigger_runtime(&workspace_root).await?;
175 let as_of = args.as_of.as_deref().map(parse_timestamp).transpose()?;
176 let where_expr = args.where_expr.as_deref().unwrap_or_default();
177 let (targets, normalized_filter) = load_bulk_targets(&event_log, where_expr, as_of).await?;
178 let report = replay_bulk_targets(
179 &event_log,
180 &workspace_root,
181 targets,
182 normalized_filter,
183 BulkReplayOptions {
184 diff: args.diff,
185 dry_run: args.dry_run,
186 progress: args.progress,
187 rate_limit: args.rate_limit,
188 as_of: args.as_of.as_deref(),
189 },
190 )
191 .await?;
192 println!(
193 "{}",
194 serde_json::to_string_pretty(&report)
195 .map_err(|error| format!("failed to encode replay report: {error}"))?
196 );
197 Ok(())
198}
199
/// Programmatic (non-CLI) bulk replay: selects recorded trigger events with
/// `where_expr`, replays each match, and returns the report as JSON.
/// Progress output is always disabled on this path.
pub async fn replay_bulk_in_process(
    event_log: Arc<AnyEventLog>,
    workspace_root: &Path,
    where_expr: &str,
    diff: bool,
    dry_run: bool,
    rate_limit: Option<f64>,
    as_of: Option<&str>,
) -> Result<JsonValue, String> {
    install_trigger_runtime(workspace_root).await?;
    // `as_of` is parsed once for target selection, but the raw string is also
    // forwarded so each replay can re-resolve its binding at that instant.
    let as_of_dt = as_of.map(parse_timestamp).transpose()?;
    let (targets, normalized_filter) = load_bulk_targets(&event_log, where_expr, as_of_dt).await?;
    let report = replay_bulk_targets(
        &event_log,
        workspace_root,
        targets,
        normalized_filter,
        BulkReplayOptions {
            diff,
            dry_run,
            progress: false,
            rate_limit,
            as_of,
        },
    )
    .await?;
    serde_json::to_value(report).map_err(|error| format!("failed to encode replay report: {error}"))
}
231
/// Looks up the recorded trigger event with `event_id` in the event log and
/// replays it; see `replay_report_for_record` for the actual replay logic.
pub async fn replay_report_for_event_log(
    event_log: Arc<AnyEventLog>,
    workspace_root: &Path,
    event_id: &str,
    as_of: Option<&str>,
    diff: bool,
    steering: Option<&ReplaySteering>,
) -> Result<TriggerReplayReport, String> {
    let recorded = load_recorded_event(&event_log, event_id).await?;
    replay_report_for_record(event_log, workspace_root, recorded, as_of, diff, steering).await
}
243
/// Replays one recorded trigger event: builds a fresh VM with manifest
/// triggers installed, resolves the binding (optionally as of a timestamp),
/// dispatches the event again, and assembles the replay report with optional
/// diff and steering-correction output.
async fn replay_report_for_record(
    event_log: Arc<AnyEventLog>,
    workspace_root: &Path,
    recorded: TriggerEventRecord,
    as_of: Option<&str>,
    diff: bool,
    steering: Option<&ReplaySteering>,
) -> Result<TriggerReplayReport, String> {
    let mut vm = build_replay_vm(workspace_root);
    let extensions = package::load_runtime_extensions(workspace_root);
    package::install_runtime_extensions(&extensions);
    package::install_manifest_triggers(&mut vm, &extensions)
        .await
        .map_err(|error| format!("failed to install manifest triggers: {error}"))?;

    // Load the stored original outcome BEFORE replaying, so the replay's own
    // records cannot contaminate the comparison.
    let original = if diff {
        Some(load_original_outcome(&event_log, &recorded).await?)
    } else {
        None
    };
    let as_of = as_of.map(parse_timestamp).transpose()?;
    let binding = resolve_binding(&recorded, as_of)?;
    // Resolve the "from" decision for steering up front; failures here abort
    // the replay before anything is dispatched.
    let steering_from_decision = match steering {
        Some(steering) => Some(
            resolve_steering_from_decision(&event_log, &recorded, &binding, &steering.step).await?,
        ),
        None => None,
    };

    // A replay marker record is appended before dispatching so the event log
    // reflects the attempt even if dispatch itself fails.
    append_replay_record(&event_log, &binding, &recorded.event).await?;
    let dispatcher = harn_vm::Dispatcher::with_event_log(vm, event_log.clone());
    let replay = dispatcher
        .dispatch_replay(
            &binding,
            recorded.event.clone(),
            recorded.event.id.0.clone(),
        )
        .await
        .map_err(|error| format!("trigger replay failed: {error}"))?;
    let replay_summary = summarize_dispatch_outcome(&replay);
    // Only append a correction when steering was requested AND its source
    // decision resolved (both are Some together by construction above).
    let correction = match (steering, steering_from_decision) {
        (Some(steering), Some(from_decision)) => Some(
            append_replay_correction(
                &event_log,
                &recorded,
                &binding,
                &replay_summary,
                steering,
                from_decision,
            )
            .await?,
        ),
        _ => None,
    };

    let drift = original
        .as_ref()
        .map(|original| diff_outcomes(original, &replay_summary));
    Ok(TriggerReplayReport {
        event_id: recorded.event.id.0,
        binding_id: binding.id.as_str().to_string(),
        binding_version: binding.version,
        as_of: as_of.map(format_timestamp),
        replay: replay_summary,
        original,
        drift,
        correction,
    })
}
313
/// Replays every selected target in order, honoring dry-run and rate
/// limiting, then appends an operation-audit entry and builds the bulk
/// report. A failing individual replay aborts the whole operation.
async fn replay_bulk_targets(
    event_log: &Arc<AnyEventLog>,
    workspace_root: &Path,
    targets: Vec<BulkTriggerTarget>,
    normalized_filter: String,
    options: BulkReplayOptions<'_>,
) -> Result<BulkReplayReport, String> {
    let matched_count = targets.len();
    let mut items = Vec::new();
    let mut executed_count = 0;
    let mut skipped_count = 0;
    let mut limiter = RateLimiter::new(options.rate_limit);
    let mut progress_reporter = ProgressReporter::new(options.progress, "replay", matched_count);

    for target in &targets {
        // Dry-run: record the target without dispatching anything.
        if options.dry_run {
            skipped_count += 1;
            progress_reporter.update("dry_run");
            items.push(BulkReplayItem {
                event_id: target.event_id.clone(),
                binding_id: target.binding_id.clone(),
                binding_version: target.binding_version,
                binding_key: target.binding_key.clone(),
                latest_status: target.latest_status.clone(),
                status: "dry_run".to_string(),
                report: None,
            });
            continue;
        }

        // Throttle before each real replay; steering is never applied in bulk.
        limiter.wait().await;
        let report = replay_report_for_record(
            event_log.clone(),
            workspace_root,
            target.record.clone(),
            options.as_of,
            options.diff,
            None,
        )
        .await?;
        executed_count += 1;
        progress_reporter.update(report.replay.status.as_str());
        items.push(BulkReplayItem {
            event_id: target.event_id.clone(),
            binding_id: target.binding_id.clone(),
            binding_version: target.binding_version,
            binding_key: target.binding_key.clone(),
            latest_status: target.latest_status.clone(),
            status: report.replay.status.clone(),
            report: Some(report),
        });
    }

    // Audit entries are written even for dry runs, capturing the filter and
    // counts of what was (or would have been) executed.
    let audit = build_operation_audit(
        "replay",
        options.dry_run,
        Some(normalized_filter.clone()),
        options.rate_limit,
        matched_count,
        executed_count,
        skipped_count,
        &targets,
    );
    let audit_id = append_replay_audit(event_log, &audit).await?;

    Ok(BulkReplayReport {
        operation: "replay".to_string(),
        dry_run: options.dry_run,
        filter: normalized_filter,
        matched_count,
        executed_count,
        skipped_count,
        audit_id: Some(audit_id),
        items,
    })
}
390
391async fn append_replay_audit(
392 event_log: &Arc<AnyEventLog>,
393 audit: &crate::commands::trigger::ops::TriggerOperationAuditEntry,
394) -> Result<String, String> {
395 crate::commands::trigger::ops::append_operation_audit(event_log, audit).await?;
396 Ok(audit.id.clone())
397}
398
/// Constructs the VM used for replay dispatch: stdlib + default hostlib +
/// store/metadata/checkpoint builtins rooted at `workspace_root`.
/// Checkpoints are namespaced under "trigger-replay".
pub fn build_replay_vm(workspace_root: &Path) -> harn_vm::Vm {
    let mut vm = harn_vm::Vm::new();
    harn_vm::register_vm_stdlib(&mut vm);
    crate::install_default_hostlib(&mut vm);
    harn_vm::register_store_builtins(&mut vm, workspace_root);
    harn_vm::register_metadata_builtins(&mut vm, workspace_root);
    harn_vm::register_checkpoint_builtins(&mut vm, workspace_root, "trigger-replay");
    vm.set_project_root(workspace_root);
    vm.set_source_dir(workspace_root);
    vm
}
410
411fn replay_steering_from_args(args: &TriggerReplayArgs) -> Result<Option<ReplaySteering>, String> {
412 let Some(step) = args.steer_from.as_ref() else {
413 return Ok(None);
414 };
415 let raw_to_decision = args
416 .to_decision
417 .as_deref()
418 .ok_or_else(|| "--steer-from requires --to-decision".to_string())?;
419 ReplaySteering::from_cli_decision(
420 step.clone(),
421 raw_to_decision,
422 args.reason.clone(),
423 args.applied_by.clone(),
424 args.scope.as_deref(),
425 )
426 .map(Some)
427}
428
/// Default `reason` recorded on a steering correction when none is given.
fn default_correction_reason() -> String {
    String::from("manual replay steering")
}
432
/// Default `applied_by` identity: the `HARN_APPLIED_BY` environment
/// variable when set, otherwise "operator".
fn default_correction_applied_by() -> String {
    match std::env::var("HARN_APPLIED_BY") {
        Ok(identity) => identity,
        Err(_) => "operator".to_string(),
    }
}
436
437fn parse_decision_value(raw: &str) -> JsonValue {
438 serde_json::from_str(raw).unwrap_or_else(|_| JsonValue::String(raw.to_string()))
439}
440
/// Resolves the "from" decision a steering correction replaces, based on the
/// requested step:
/// - "event"/"trigger": the recorded trigger event itself;
/// - "outcome"/"dispatch"/"terminal": the stored original dispatch outcome;
/// - anything else: looked up as an action-graph node id for this event.
async fn resolve_steering_from_decision(
    event_log: &Arc<AnyEventLog>,
    recorded: &TriggerEventRecord,
    binding: &harn_vm::triggers::registry::TriggerBinding,
    step: &str,
) -> Result<JsonValue, String> {
    match step {
        "event" | "trigger" => serde_json::to_value(&recorded.event)
            .map_err(|error| format!("failed to encode trigger event for correction: {error}")),
        "outcome" | "dispatch" | "terminal" => serde_json::to_value(
            load_original_outcome(event_log, recorded).await?,
        )
        .map_err(|error| {
            format!("failed to encode original dispatch outcome for correction: {error}")
        }),
        other => {
            let binding_key = binding.binding_key();
            load_action_graph_node(event_log, &recorded.event.id.0, &binding_key, other)
                .await?
                .ok_or_else(|| {
                    format!(
                        "unknown replay step '{other}'; expected event, outcome, or an action graph node id"
                    )
                })
        }
    }
}
468
/// Scans action-graph updates for a node whose `id` or `node_id` equals
/// `step`, restricted to updates whose context matches this event/binding.
/// Returns `Ok(None)` when no such node exists.
async fn load_action_graph_node(
    event_log: &Arc<AnyEventLog>,
    event_id: &str,
    binding_key: &str,
    step: &str,
) -> Result<Option<JsonValue>, String> {
    let topic = Topic::new(ACTION_GRAPH_TOPIC)
        .map_err(|error| format!("invalid action graph topic: {error}"))?;
    let events = event_log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read action graph updates: {error}"))?;
    for (_, event) in events {
        let context = event.payload.get("context");
        // NOTE(review): updates with no context (or no event_id/binding_key
        // in it) are NOT skipped here — only explicit mismatches are.
        // Presumably intentional for older/partial records; confirm.
        if context
            .and_then(|value| value.get("event_id"))
            .and_then(|value| value.as_str())
            .is_some_and(|candidate| candidate != event_id)
        {
            continue;
        }
        if context
            .and_then(|value| value.get("binding_key"))
            .and_then(|value| value.as_str())
            .is_some_and(|candidate| candidate != binding_key)
        {
            continue;
        }
        let Some(nodes) = event.payload["observability"]["action_graph_nodes"].as_array() else {
            continue;
        };
        if let Some(node) = nodes.iter().find(|node| {
            node.get("id").and_then(|value| value.as_str()) == Some(step)
                || node.get("node_id").and_then(|value| value.as_str()) == Some(step)
        }) {
            return Ok(Some(node.clone()));
        }
    }
    Ok(None)
}
509
/// Builds and persists the correction record for a steered replay, tagging
/// it with the event, binding, and replay status for later querying.
async fn append_replay_correction(
    event_log: &Arc<AnyEventLog>,
    recorded: &TriggerEventRecord,
    binding: &harn_vm::triggers::registry::TriggerBinding,
    replay: &DispatchOutcomeSummary,
    steering: &ReplaySteering,
    from_decision: JsonValue,
) -> Result<CorrectionRecord, String> {
    let mut record = CorrectionRecord::new(
        from_decision,
        steering.to_decision.clone(),
        steering.reason.clone(),
        steering.applied_by.clone(),
        steering.scope,
    );
    record.actor_id = Some(binding.id.as_str().to_string());
    // Action is "<provider>.<kind>", e.g. "cron.tick".
    record.action = Some(format!(
        "{}.{}",
        recorded.event.provider.as_str(),
        recorded.event.kind
    ));
    record.trace_id = Some(recorded.event.trace_id.0.clone());
    record.step = Some(steering.step.clone());
    record.metadata.insert(
        "event_id".to_string(),
        serde_json::json!(recorded.event.id.0),
    );
    record.metadata.insert(
        "binding_key".to_string(),
        serde_json::json!(binding.binding_key()),
    );
    record.metadata.insert(
        "binding_version".to_string(),
        serde_json::json!(binding.version),
    );
    record.metadata.insert(
        "replay_status".to_string(),
        serde_json::json!(replay.status),
    );
    append_correction_record(event_log, &record)
        .await
        .map_err(|error| format!("failed to append correction record: {error}"))
}
553
554pub(crate) fn parse_timestamp(raw: &str) -> Result<OffsetDateTime, String> {
555 if let Ok(parsed) = OffsetDateTime::parse(raw, &Rfc3339) {
556 return Ok(parsed);
557 }
558 if let Ok(unix) = raw.parse::<i64>() {
559 let parsed = if raw.len() > 10 {
560 OffsetDateTime::from_unix_timestamp_nanos(unix as i128 * 1_000_000)
561 } else {
562 OffsetDateTime::from_unix_timestamp(unix)
563 };
564 return parsed.map_err(|error| format!("invalid --as-of timestamp '{raw}': {error}"));
565 }
566 Err(format!(
567 "invalid --as-of timestamp '{raw}': expected RFC3339 or unix seconds/milliseconds"
568 ))
569}
570
571use crate::format::format_timestamp_rfc3339 as format_timestamp;
572
/// Finds the recorded trigger event with `event_id`, preferring an original
/// (non-replay) record over replay records with the same id; falls back to
/// the ingestion inbox when nothing is found on the events topic.
async fn load_recorded_event(
    event_log: &Arc<AnyEventLog>,
    event_id: &str,
) -> Result<TriggerEventRecord, String> {
    let topic = Topic::new(TRIGGER_EVENTS_TOPIC)
        .map_err(|error| format!("invalid trigger events topic: {error}"))?;
    let events = event_log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read trigger events: {error}"))?;

    // Remember the first replay record seen, but keep scanning for an
    // original; undecodable payloads are skipped silently.
    let mut replay_match = None;
    for (_, event) in events {
        let Ok(record) = serde_json::from_value::<TriggerEventRecord>(event.payload) else {
            continue;
        };
        if record.event.id.0 != event_id {
            continue;
        }
        if record.replay_of_event_id.is_none() {
            return Ok(record);
        }
        replay_match.get_or_insert(record);
    }

    if let Some(record) = replay_match {
        return Ok(record);
    }

    load_ingested_event(event_log, event_id).await
}
604
/// Fallback lookup for events that were ingested but never recorded on the
/// events topic: scans inbox envelopes (current and legacy topics) for an
/// "event_ingested" entry whose event id matches, and synthesizes a
/// `TriggerEventRecord` from it.
async fn load_ingested_event(
    event_log: &Arc<AnyEventLog>,
    event_id: &str,
) -> Result<TriggerEventRecord, String> {
    let envelopes_topic = Topic::new(harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC)
        .map_err(|error| format!("invalid trigger inbox topic: {error}"))?;
    let legacy_topic = Topic::new(harn_vm::TRIGGER_INBOX_LEGACY_TOPIC)
        .map_err(|error| format!("invalid trigger inbox legacy topic: {error}"))?;
    let mut envelopes = event_log
        .read_range(&envelopes_topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read trigger inbox envelopes: {error}"))?;
    let legacy = event_log
        .read_range(&legacy_topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read legacy trigger inbox envelopes: {error}"))?;
    envelopes.extend(legacy);
    for (_, event) in envelopes {
        if event.kind != "event_ingested" {
            continue;
        }
        let Ok(envelope) =
            serde_json::from_value::<harn_vm::triggers::dispatcher::InboxEnvelope>(event.payload)
        else {
            continue;
        };
        // Envelopes without a resolved binding cannot be replayed — skip.
        let (Some(binding_id), Some(binding_version)) =
            (envelope.trigger_id, envelope.binding_version)
        else {
            continue;
        };
        if envelope.event.id.0 != event_id {
            continue;
        }
        return Ok(TriggerEventRecord {
            binding_id,
            binding_version,
            replay_of_event_id: None,
            event: envelope.event,
        });
    }

    Err(format!("unknown trigger event id '{event_id}'"))
}
649
/// Resolves the trigger binding a replay should dispatch against.
/// With `as_of`, the binding is resolved as of that instant; otherwise the
/// recorded version is used with a live-or-as-of fallback keyed on the
/// event's received timestamp.
fn resolve_binding(
    recorded: &TriggerEventRecord,
    as_of: Option<OffsetDateTime>,
) -> Result<Arc<harn_vm::triggers::registry::TriggerBinding>, String> {
    if let Some(as_of) = as_of {
        return harn_vm::resolve_trigger_binding_as_of(&recorded.binding_id, as_of).map_err(
            |error| {
                format!(
                    "failed to resolve binding '{}' as of {}: {}",
                    recorded.binding_id,
                    format_timestamp(as_of),
                    error
                )
            },
        );
    }

    harn_vm::resolve_live_or_as_of(
        &recorded.binding_id,
        harn_vm::RecordedTriggerBinding {
            version: recorded.binding_version,
            received_at: recorded.event.received_at,
        },
    )
    .map_err(|error| {
        format!(
            "failed to resolve recorded binding '{}@v{}' for replay: {}",
            recorded.binding_id, recorded.binding_version, error
        )
    })
}
681
/// Appends a trigger-event record marking this dispatch as a replay of
/// `event` (`replay_of_event_id` is set to the original event's id).
async fn append_replay_record(
    event_log: &Arc<AnyEventLog>,
    binding: &harn_vm::triggers::registry::TriggerBinding,
    event: &harn_vm::TriggerEvent,
) -> Result<(), String> {
    let topic = Topic::new(TRIGGER_EVENTS_TOPIC)
        .map_err(|error| format!("invalid trigger events topic: {error}"))?;
    event_log
        .append(
            &topic,
            LogEvent::new(
                "trigger_event",
                // Serialization of this record should not fail; Null is a
                // defensive fallback rather than an expected case.
                serde_json::to_value(TriggerEventRecord {
                    binding_id: binding.id.as_str().to_string(),
                    binding_version: binding.version,
                    replay_of_event_id: Some(event.id.0.clone()),
                    event: event.clone(),
                })
                .unwrap_or(JsonValue::Null),
            ),
        )
        .await
        .map(|_| ())
        .map_err(|error| format!("failed to append replay record: {error}"))
}
707
/// Loads the stored outcome of the ORIGINAL dispatch for `recorded`:
/// terminal outcomes (outbox/DLQ) first, then predicate-skip outcomes from
/// the action graph. Errors when nothing was recorded.
async fn load_original_outcome(
    event_log: &Arc<AnyEventLog>,
    recorded: &TriggerEventRecord,
) -> Result<DispatchOutcomeSummary, String> {
    let binding_key = format!("{}@v{}", recorded.binding_id, recorded.binding_version);
    if let Some(outcome) =
        load_original_terminal_outcome(event_log, &recorded.event.id.0, &binding_key).await?
    {
        return Ok(outcome);
    }

    load_skipped_outcome(event_log, &recorded.event.id.0, &binding_key)
        .await?
        .ok_or_else(|| {
            format!(
                "no stored original outcome found for '{}@v{}' event '{}'",
                recorded.binding_id, recorded.binding_version, recorded.event.id.0
            )
        })
}
728
/// Reconstructs the original terminal dispatch outcome from the outbox and
/// DLQ topics. Precedence: a DLQ "dlq_moved" record wins outright; otherwise
/// the LAST matching success beats the last matching failure.
async fn load_original_terminal_outcome(
    event_log: &Arc<AnyEventLog>,
    event_id: &str,
    binding_key: &str,
) -> Result<Option<DispatchOutcomeSummary>, String> {
    let outbox_topic = Topic::new(TRIGGER_OUTBOX_TOPIC)
        .map_err(|error| format!("invalid trigger outbox topic: {error}"))?;
    let dlq_topic = Topic::new(TRIGGER_DLQ_TOPIC)
        .map_err(|error| format!("invalid trigger dlq topic: {error}"))?;

    let outbox_events = event_log
        .read_range(&outbox_topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read trigger outbox: {error}"))?;
    let dlq_events = event_log
        .read_range(&dlq_topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read trigger dlq: {error}"))?;

    // Keep only the most recent success/failure; later records overwrite
    // earlier ones as the loop advances.
    let mut success = None;
    let mut failure = None;
    for (_, event) in outbox_events {
        if !matches_original_dispatch(&event, event_id, binding_key) {
            continue;
        }
        let attempt = header_u32(&event, "attempt").unwrap_or(0);
        let handler_kind = header_text(&event, "handler_kind").unwrap_or_default();
        let target_uri = event
            .payload
            .get("target_uri")
            .cloned()
            .and_then(|value| value.as_str().map(str::to_string));
        match event.kind.as_str() {
            "dispatch_succeeded" => {
                success = Some(DispatchOutcomeSummary {
                    status: "succeeded".to_string(),
                    attempt_count: attempt,
                    handler_kind,
                    target_uri,
                    result: event.payload.get("result").cloned(),
                    error: None,
                });
            }
            "dispatch_failed" => {
                let error = event
                    .payload
                    .get("error")
                    .and_then(|value| value.as_str())
                    .map(str::to_string);
                failure = Some(DispatchOutcomeSummary {
                    // "cancelled" vs "failed" is inferred from the message.
                    status: failure_status(error.as_deref()),
                    attempt_count: attempt,
                    handler_kind,
                    target_uri,
                    result: None,
                    error,
                });
            }
            _ => {}
        }
    }

    // A DLQ move is the definitive terminal state — return it immediately.
    for (_, event) in dlq_events {
        if !matches_original_dispatch(&event, event_id, binding_key) || event.kind != "dlq_moved" {
            continue;
        }
        let attempt_count = event
            .payload
            .get("attempt_count")
            .and_then(|value| value.as_u64())
            .unwrap_or(0) as u32;
        return Ok(Some(DispatchOutcomeSummary {
            status: "dlq".to_string(),
            attempt_count,
            handler_kind: header_text(&event, "handler_kind").unwrap_or_default(),
            target_uri: None,
            result: None,
            error: event
                .payload
                .get("final_error")
                .and_then(|value| value.as_str())
                .map(str::to_string),
        }));
    }

    Ok(success.or(failure))
}
816
/// Detects an original dispatch that was skipped by a false predicate, by
/// scanning action-graph updates for this event/binding (replay records are
/// excluded) and looking for a predicate node with outcome "false".
async fn load_skipped_outcome(
    event_log: &Arc<AnyEventLog>,
    event_id: &str,
    binding_key: &str,
) -> Result<Option<DispatchOutcomeSummary>, String> {
    let topic = Topic::new(ACTION_GRAPH_TOPIC)
        .map_err(|error| format!("invalid action graph topic: {error}"))?;
    let events = event_log
        .read_range(&topic, None, usize::MAX)
        .await
        .map_err(|error| format!("failed to read action graph updates: {error}"))?;

    for (_, event) in events {
        let Some(context) = event.payload.get("context") else {
            continue;
        };
        if context.get("event_id").and_then(|value| value.as_str()) != Some(event_id) {
            continue;
        }
        if context.get("binding_key").and_then(|value| value.as_str()) != Some(binding_key) {
            continue;
        }
        // A replay marker means this update came from a replay, not the
        // original dispatch — skip it.
        if context
            .get("replay_of_event_id")
            .and_then(|value| value.as_str())
            .is_some()
        {
            continue;
        }
        let Some(nodes) = event.payload["observability"]["action_graph_nodes"].as_array() else {
            continue;
        };
        let predicate = nodes.iter().find(|node| {
            node.get("kind").and_then(|value| value.as_str()) == Some("predicate")
                && node.get("outcome").and_then(|value| value.as_str()) == Some("false")
        });
        if let Some(predicate) = predicate {
            return Ok(Some(DispatchOutcomeSummary {
                status: "skipped".to_string(),
                attempt_count: 0,
                handler_kind: String::new(),
                target_uri: None,
                result: Some(json!({
                    "skipped": true,
                    "predicate": predicate.get("label").cloned().unwrap_or(JsonValue::Null),
                })),
                error: None,
            }));
        }
    }

    Ok(None)
}
870
871fn matches_original_dispatch(event: &LogEvent, event_id: &str, binding_key: &str) -> bool {
872 header_text(event, "event_id") == Some(event_id.to_string())
873 && header_text(event, "binding_key") == Some(binding_key.to_string())
874 && header_text(event, "replay_of_event_id").is_none()
875}
876
/// Clones the string header `key` out of `event`, if present.
fn header_text(event: &LogEvent, key: &str) -> Option<String> {
    event.headers.get(key).cloned()
}
880
/// Parses the header `key` as a u32; None when absent or non-numeric.
fn header_u32(event: &LogEvent, key: &str) -> Option<u32> {
    event.headers.get(key).and_then(|value| value.parse().ok())
}
884
/// Classifies a dispatch failure: "cancelled" when the error message
/// mentions cancellation, "failed" otherwise (including no message at all).
fn failure_status(error: Option<&str>) -> String {
    match error {
        Some(message) if message.contains("cancelled") => "cancelled".to_string(),
        _ => "failed".to_string(),
    }
}
892
/// Converts a VM `DispatchOutcome` into the serializable summary form,
/// mapping each status variant to its report string. The match is
/// exhaustive on purpose: adding a variant should force an update here.
fn summarize_dispatch_outcome(outcome: &harn_vm::DispatchOutcome) -> DispatchOutcomeSummary {
    DispatchOutcomeSummary {
        status: match outcome.status {
            harn_vm::DispatchStatus::Succeeded => "succeeded".to_string(),
            harn_vm::DispatchStatus::Failed => "failed".to_string(),
            harn_vm::DispatchStatus::Dlq => "dlq".to_string(),
            harn_vm::DispatchStatus::Skipped => "skipped".to_string(),
            harn_vm::DispatchStatus::Waiting => "waiting".to_string(),
            harn_vm::DispatchStatus::Cancelled => "cancelled".to_string(),
        },
        attempt_count: outcome.attempt_count,
        handler_kind: outcome.handler_kind.clone(),
        target_uri: Some(outcome.target_uri.clone()),
        result: outcome.result.clone(),
        error: outcome.error.clone(),
    }
}
910
911fn diff_outcomes(
912 original: &DispatchOutcomeSummary,
913 replayed: &DispatchOutcomeSummary,
914) -> DriftReport {
915 let original = serde_json::to_value(original).unwrap_or(JsonValue::Null);
916 let replayed = serde_json::to_value(replayed).unwrap_or(JsonValue::Null);
917 let mut fields = BTreeMap::new();
918
919 let original = original.as_object().cloned().unwrap_or_default();
920 let replayed = replayed.as_object().cloned().unwrap_or_default();
921 let mut keys = original.keys().cloned().collect::<Vec<_>>();
922 for key in replayed.keys() {
923 if !keys.iter().any(|existing| existing == key) {
924 keys.push(key.clone());
925 }
926 }
927 keys.sort();
928 keys.dedup();
929
930 for key in keys {
931 let left = original.get(&key).cloned().unwrap_or(JsonValue::Null);
932 let right = replayed.get(&key).cloned().unwrap_or(JsonValue::Null);
933 if left != right {
934 fields.insert(
935 key,
936 DriftField {
937 original: left,
938 replayed: right,
939 },
940 );
941 }
942 }
943
944 DriftReport {
945 changed: !fields.is_empty(),
946 fields,
947 }
948}
949
950#[cfg(test)]
951#[allow(clippy::await_holding_lock)]
956mod tests {
957 use std::collections::BTreeMap;
958 use std::fs;
959 use std::path::Path;
960 use std::rc::Rc;
961 use std::sync::Arc;
962
963 use harn_vm::event_log::{
964 install_default_for_base_dir, AnyEventLog, EventLog, LogEvent, Topic,
965 };
966 use harn_vm::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
967 use harn_vm::triggers::event::{CronEventPayload, KnownProviderPayload};
968 use time::OffsetDateTime;
969
970 use super::{
971 append_replay_record, build_replay_vm, load_recorded_event, replay_report_for_record,
972 resolve_binding, summarize_dispatch_outcome, ReplaySteering, TriggerEventRecord,
973 TRIGGER_EVENTS_TOPIC,
974 };
975 use crate::package;
976
977 const TEST_TRIGGER_ID: &str = "replay-cron";
978
979 #[tokio::test(flavor = "current_thread")]
980 async fn replay_falls_back_to_recorded_timestamp_when_version_lookup_is_stale() {
981 let _guard = crate::tests::common::harn_state_lock::lock_harn_state();
982 harn_vm::reset_thread_local_state();
983 let sink = Rc::new(CollectorSink::new());
984 clear_event_sinks();
985 add_event_sink(sink.clone());
986
987 let tempdir = tempfile::tempdir().expect("tempdir");
988 let workspace_root = tempdir.path();
989 let event_log = install_default_for_base_dir(workspace_root).expect("install event log");
990
991 install_local_manifest(workspace_root, "on_tick_v1");
992 install_workspace_manifest(workspace_root).await;
993 install_local_manifest(workspace_root, "on_tick_v2");
994 install_workspace_manifest(workspace_root).await;
995 install_local_manifest(workspace_root, "on_tick_v3");
996 install_workspace_manifest(workspace_root).await;
997
998 let current = harn_vm::resolve_live_trigger_binding(TEST_TRIGGER_ID, None)
999 .expect("resolve active binding");
1000 assert_eq!(current.version, 3);
1001 assert!(matches!(
1002 harn_vm::resolve_live_trigger_binding(TEST_TRIGGER_ID, Some(1)),
1003 Err(harn_vm::TriggerRegistryError::UnknownBindingVersion { .. })
1004 ));
1005
1006 append_trigger_event(
1007 &event_log,
1008 TriggerEventRecord {
1009 binding_id: TEST_TRIGGER_ID.to_string(),
1010 binding_version: 1,
1011 replay_of_event_id: None,
1012 event: recorded_cron_event("evt-stale", OffsetDateTime::now_utc()),
1013 },
1014 )
1015 .await;
1016
1017 let recorded = load_recorded_event(&event_log, "evt-stale")
1018 .await
1019 .expect("load recorded event");
1020 let binding = resolve_binding(&recorded, None).expect("resolve fallback binding");
1021 append_replay_record(&event_log, &binding, &recorded.event)
1022 .await
1023 .expect("append replay record");
1024
1025 let dispatcher =
1026 harn_vm::Dispatcher::with_event_log(build_replay_vm(workspace_root), event_log.clone());
1027 let outcome = dispatcher
1028 .dispatch_replay(
1029 &binding,
1030 recorded.event.clone(),
1031 recorded.event.id.0.clone(),
1032 )
1033 .await
1034 .expect("dispatch replay succeeds");
1035 let replay = summarize_dispatch_outcome(&outcome);
1036 assert_eq!(replay.status, "succeeded");
1037
1038 let topic = Topic::new(TRIGGER_EVENTS_TOPIC).expect("valid trigger events topic");
1039 let records: Vec<TriggerEventRecord> = event_log
1040 .read_range(&topic, None, usize::MAX)
1041 .await
1042 .expect("read trigger events")
1043 .into_iter()
1044 .map(|(_, event)| serde_json::from_value(event.payload).expect("decode trigger event"))
1045 .collect();
1046
1047 assert!(records.iter().any(|record| {
1048 record.replay_of_event_id.as_deref() == Some("evt-stale")
1049 && record.binding_id == TEST_TRIGGER_ID
1050 && record.binding_version == 3
1051 }));
1052 assert!(sink.logs.borrow().iter().any(|log| {
1053 log.level == EventLevel::Warn
1054 && log.category == "replay.binding_version_gc_fallback"
1055 && log.metadata.get("trigger_id") == Some(&serde_json::json!(TEST_TRIGGER_ID))
1056 && log.metadata.get("recorded_version") == Some(&serde_json::json!(1))
1057 && log.metadata.get("resolved_version") == Some(&serde_json::json!(3))
1058 }));
1059
1060 harn_vm::reset_thread_local_state();
1061 }
1062
1063 #[tokio::test(flavor = "current_thread")]
1064 async fn replay_steering_appends_correction_and_adapts_policy() {
1065 let _guard = crate::tests::common::harn_state_lock::lock_harn_state();
1066 harn_vm::reset_thread_local_state();
1067
1068 let tempdir = tempfile::tempdir().expect("tempdir");
1069 let workspace_root = tempdir.path();
1070 let event_log = install_default_for_base_dir(workspace_root).expect("install event log");
1071
1072 install_local_manifest(workspace_root, "on_tick_v1");
1073 append_trigger_event(
1074 &event_log,
1075 TriggerEventRecord {
1076 binding_id: TEST_TRIGGER_ID.to_string(),
1077 binding_version: 1,
1078 replay_of_event_id: None,
1079 event: recorded_cron_event("evt-steer", OffsetDateTime::now_utc()),
1080 },
1081 )
1082 .await;
1083
1084 let recorded = load_recorded_event(&event_log, "evt-steer")
1085 .await
1086 .expect("load recorded event");
1087 let steering = ReplaySteering {
1088 step: "event".to_string(),
1089 to_decision: serde_json::json!({"kind": "cron.steered"}),
1090 reason: "human corrected replay routing".to_string(),
1091 applied_by: "alice".to_string(),
1092 scope: harn_vm::CorrectionScope::ThisPersona,
1093 };
1094
1095 let report = replay_report_for_record(
1096 event_log.clone(),
1097 workspace_root,
1098 recorded,
1099 None,
1100 false,
1101 Some(&steering),
1102 )
1103 .await
1104 .expect("replay report");
1105
1106 let correction = report.correction.expect("correction record");
1107 assert_eq!(correction.actor_id.as_deref(), Some(TEST_TRIGGER_ID));
1108 assert_eq!(correction.step.as_deref(), Some("event"));
1109 assert_eq!(correction.applied_by, "alice");
1110 assert_eq!(correction.scope, harn_vm::CorrectionScope::ThisPersona);
1111
1112 let corrections = harn_vm::query_correction_records(
1113 &event_log,
1114 &harn_vm::CorrectionQueryFilters {
1115 actor_id: Some(TEST_TRIGGER_ID.to_string()),
1116 ..harn_vm::CorrectionQueryFilters::default()
1117 },
1118 )
1119 .await
1120 .expect("query corrections");
1121 assert_eq!(corrections.len(), 1);
1122
1123 let policy = harn_vm::policy_for_agent(&event_log, TEST_TRIGGER_ID)
1124 .await
1125 .expect("policy for agent");
1126 assert_eq!(policy.side_effect_level.as_deref(), Some("read_only"));
1127
1128 harn_vm::reset_thread_local_state();
1129 }
1130
1131 fn install_local_manifest(root: &Path, handler_name: &str) {
1132 std::fs::create_dir_all(root.join(".git")).expect("create .git");
1133 fs::write(
1134 root.join("harn.toml"),
1135 format!(
1136 r#"
1137[package]
1138name = "workspace"
1139
1140[exports]
1141handlers = "lib.harn"
1142
1143[[triggers]]
1144id = "{TEST_TRIGGER_ID}"
1145kind = "cron"
1146provider = "cron"
1147match = {{ events = ["cron.tick"] }}
1148schedule = "* * * * *"
1149timezone = "UTC"
1150handler = "handlers::{handler_name}"
1151"#
1152 ),
1153 )
1154 .expect("write manifest");
1155 fs::write(
1156 root.join("lib.harn"),
1157 format!(
1158 r#"
1159import "std/triggers"
1160
1161pub fn {handler_name}(event: TriggerEvent) -> string {{
1162 return event.kind
1163}}
1164"#
1165 ),
1166 )
1167 .expect("write lib");
1168 fs::write(root.join("main.harn"), "pipeline main() {}\n").expect("write main");
1169 }
1170
1171 async fn install_workspace_manifest(root: &Path) {
1172 let mut vm = super::build_replay_vm(root);
1173 let extensions = package::load_runtime_extensions(&root.join("main.harn"));
1174 package::install_manifest_triggers(&mut vm, &extensions)
1175 .await
1176 .expect("install manifest triggers");
1177 }
1178
1179 fn recorded_cron_event(event_id: &str, received_at: OffsetDateTime) -> harn_vm::TriggerEvent {
1180 harn_vm::TriggerEvent {
1181 id: harn_vm::TriggerEventId(event_id.to_string()),
1182 provider: harn_vm::ProviderId::from("cron"),
1183 kind: "cron.tick".to_string(),
1184 received_at,
1185 occurred_at: None,
1186 dedupe_key: format!("delivery-{event_id}"),
1187 trace_id: harn_vm::TraceId(format!("trace-{event_id}")),
1188 tenant_id: None,
1189 headers: BTreeMap::new(),
1190 batch: None,
1191 raw_body: None,
1192 provider_payload: harn_vm::ProviderPayload::Known(KnownProviderPayload::Cron(
1193 CronEventPayload {
1194 cron_id: Some("test-cron".to_string()),
1195 schedule: Some("* * * * *".to_string()),
1196 tick_at: received_at,
1197 raw: serde_json::json!({ "event_id": event_id }),
1198 },
1199 )),
1200 signature_status: harn_vm::SignatureStatus::Verified,
1201 dedupe_claimed: false,
1202 }
1203 }
1204
1205 async fn append_trigger_event(event_log: &Arc<AnyEventLog>, record: TriggerEventRecord) {
1206 let topic = Topic::new(TRIGGER_EVENTS_TOPIC).expect("valid trigger events topic");
1207 event_log
1208 .append(
1209 &topic,
1210 LogEvent::new(
1211 "trigger_event",
1212 serde_json::to_value(record).expect("encode trigger event"),
1213 ),
1214 )
1215 .await
1216 .expect("append trigger event");
1217 }
1218}