1use std::collections::{BTreeMap, BTreeSet};
8
9use serde::{Deserialize, Serialize};
10
11use crate::{
12 content::MissingContentPolicy,
13 domain::{
14 AgentError, AgentErrorKind, ContentRef, DedupeKey, EffectId, JournalCursor,
15 RetryClassification,
16 },
17 event::{EventCursor, EventStreamScope, cursor_compatible},
18 journal::{
19 JOURNAL_SCHEMA_VERSION, JournalRecord, JournalRecordKind, JournalRecordPayload,
20 PendingSideEffect, RunCheckpoint,
21 },
22 output_delivery::{
23 OutputDeliveryDedupeRecord, OutputDeliveryId, OutputDeliveryIntentRecord,
24 OutputDeliveryReconciliationRecord, OutputDeliveryRecord, OutputDeliveryResultRecord,
25 OutputDispatchStatus, ReplayRepairDecision, TerminalAppendStatus,
26 },
27};
28
/// Selects how a [`ReplayReducer`] run is interpreted.
///
/// The mode is echoed back in [`ReplayResult::mode`]. `ReplayReducer::finish` only gates
/// `resume_allowed` on the replay status in `ResumeReplay` mode; in the other modes it reports
/// `resume_allowed = true`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayMode {
    /// Serialized as `"audit_replay"`.
    AuditReplay,
    /// Serialized as `"resume_replay"`; resumption is only allowed after a `Complete` replay.
    ResumeReplay,
    /// Serialized as `"repair_replay"`.
    RepairReplay,
}
41
/// Overall outcome of a replay as computed by `ReplayReducer::finish`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayStatus {
    /// No repair entries, no missing content refs, and every unsafe pending side effect
    /// allows retry.
    Complete,
    /// At least one of the `Complete` conditions does not hold.
    RepairNeeded,
}
52
/// Category of a [`ReplayRepairNeeded`] entry.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplayRepairKind {
    /// A referenced content ref is not in the reducer's set of available content refs.
    MissingContentRef,
    /// A pending side effect that carries an idempotency key and/or a dedupe key.
    UnsafePendingSideEffect,
    /// A pending side effect with neither an idempotency key nor a dedupe key.
    NonIdempotentPendingSideEffect,
    /// An output delivery needing reconciliation: either a reconciliation record was journaled,
    /// or an output intent ended replay with no result, reconciliation, or dedupe proof.
    OutputDeliveryReconciliation,
    /// Cursor scope mismatch. `ReplayReducer` does not record this kind itself; presumably
    /// produced by other callers.
    CursorScopeMismatch,
    /// A checkpoint record failed `validate_against_latest_seq`.
    CheckpointInvalid,
    /// Replay invariant violation. `ReplayReducer` does not record this kind itself;
    /// presumably produced by other callers.
    ReplayInvariantViolation,
}
73
/// One repair entry recorded while replaying a journal.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ReplayRepairNeeded {
    /// Category of the problem.
    pub kind: ReplayRepairKind,
    /// `record_id` of the journal record the entry is attributed to.
    pub record_id: String,
    /// Journal sequence number associated with the entry.
    pub journal_seq: u64,
    /// Human-readable explanation of why repair is needed.
    pub reason: String,
    /// Retry classification attached to this entry.
    pub retry: RetryClassification,
}
90
/// A side effect that replay found still pending: either listed as unsafe-pending by a
/// recovery record, or an effect intent that never saw a matching result.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ReplayPendingSideEffect {
    /// Identifier of the pending effect.
    pub effect_id: EffectId,
    /// `record_id` of the journal record that carried the effect intent.
    pub intent_record_id: String,
    /// Idempotency key, if the effect has one; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<crate::domain::IdempotencyKey>,
    /// Dedupe key, if the effect has one; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dedupe_key: Option<DedupeKey>,
    /// Explanation of why the effect is considered unsafe to treat as done.
    pub unsafe_pending_reason: String,
    /// `true` when an idempotency key or a dedupe key is present.
    pub retry_allowed: bool,
}
114
115impl ReplayPendingSideEffect {
116 pub fn from_pending(pending: PendingSideEffect) -> Self {
120 let retry_allowed = pending.idempotency_key.is_some() || pending.dedupe_key.is_some();
121 Self {
122 effect_id: pending.effect_id,
123 intent_record_id: pending.intent_record_id,
124 idempotency_key: pending.idempotency_key,
125 dedupe_key: pending.dedupe_key,
126 unsafe_pending_reason: pending.unsafe_pending_reason,
127 retry_allowed,
128 }
129 }
130}
131
/// Outcome of a replay, produced by `ReplayReducer::finish`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ReplayResult {
    /// Mode the reducer was created with.
    pub mode: ReplayMode,
    /// `Complete` only when there are no repair entries, no missing content refs, and every
    /// unsafe pending side effect allows retry.
    pub status: ReplayStatus,
    /// `false` only in `ResumeReplay` mode when `status` is `RepairNeeded`.
    pub resume_allowed: bool,
    /// `journal_seq` of the last applied record, or `0` if no record was applied.
    pub latest_journal_seq: u64,
    /// Terminal status observed during replay; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub terminal_status: Option<String>,
    /// `loop_state` of `latest_checkpoint`, if any; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub next_loop_state: Option<String>,
    /// Side effects still pending at the end of replay; empty by default and omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub unsafe_pending_side_effects: Vec<ReplayPendingSideEffect>,
    /// Referenced content refs that were not available (sorted, deduplicated); omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub missing_content_refs: Vec<ContentRef>,
    /// Repair entries recorded during replay; omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub repair_needed: Vec<ReplayRepairNeeded>,
    /// Reconciliation records for output deliveries that ended replay without a result;
    /// omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub output_delivery_repairs: Vec<OutputDeliveryReconciliationRecord>,
    /// Newest checkpoint observed during replay; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latest_checkpoint: Option<RunCheckpoint>,
}
176
/// Result of [`check_cursor_compatibility`].
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CursorCompatibility {
    /// `cursor_compatible` accepted the cursor for the requested scope.
    Compatible,
    /// `cursor_compatible` returned an error (any error maps here).
    ScopeMismatch,
}
187
/// Where durable replay can be served from for an event-stream scope; see
/// [`durable_replay_support`].
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DurableReplaySupport {
    /// Returned for run-scoped streams (`EventStreamScope::Run`).
    RunJournal,
    /// Returned for the `All`, `Agent`, and `Filter` scopes.
    HostArchiveRequired,
}
198
/// Incremental reducer that folds journal records (via `apply`) into a [`ReplayResult`]
/// (via `finish`).
#[derive(Clone, Debug)]
pub struct ReplayReducer {
    /// Replay mode echoed into the result.
    mode: ReplayMode,
    /// `journal_seq` of the last applied record; `None` before any record is applied.
    last_journal_seq: Option<u64>,
    /// Applied records keyed by `record_id`, used for duplicate detection.
    seen_records: BTreeMap<String, JournalRecord>,
    /// Content refs known to be available; `None` disables content-ref checking.
    available_content_refs: Option<BTreeSet<ContentRef>>,
    /// Policy deciding whether a missing content ref also records a repair entry.
    missing_content_policy: MissingContentPolicy,
    /// Referenced content refs not present in `available_content_refs`.
    missing_content_refs: BTreeSet<ContentRef>,
    /// Repair entries accumulated during replay.
    repair_needed: Vec<ReplayRepairNeeded>,
    /// Pending side effects from recovery records, plus unresolved intents added at finish.
    unsafe_pending_side_effects: Vec<ReplayPendingSideEffect>,
    /// Effect intents not yet matched by an effect result or terminal-result marker.
    pending_effects: BTreeMap<EffectId, ReplayPendingSideEffect>,
    /// Output-delivery intents keyed by delivery id.
    output_intents: BTreeMap<OutputDeliveryId, OutputIntentState>,
    /// Output-delivery results keyed by delivery id.
    output_results: BTreeMap<OutputDeliveryId, OutputDeliveryResultRecord>,
    /// Output-delivery dedupe records keyed by dedupe key.
    output_dedupes: BTreeMap<DedupeKey, OutputDeliveryDedupeRecord>,
    /// Output-delivery reconciliation records keyed by delivery id.
    output_reconciliations: BTreeMap<OutputDeliveryId, OutputDeliveryReconciliationRecord>,
    /// Terminal status from a terminal lifecycle record or a terminal-result marker.
    terminal_status: Option<String>,
    /// Newest checkpoint seen, as ordered by `checkpoint_is_newer`.
    latest_checkpoint: Option<RunCheckpoint>,
}
219
220impl ReplayReducer {
221 pub fn new(mode: ReplayMode) -> Self {
225 Self {
226 mode,
227 last_journal_seq: None,
228 seen_records: BTreeMap::new(),
229 available_content_refs: None,
230 missing_content_policy: MissingContentPolicy::Fail,
231 missing_content_refs: BTreeSet::new(),
232 repair_needed: Vec::new(),
233 unsafe_pending_side_effects: Vec::new(),
234 pending_effects: BTreeMap::new(),
235 output_intents: BTreeMap::new(),
236 output_results: BTreeMap::new(),
237 output_dedupes: BTreeMap::new(),
238 output_reconciliations: BTreeMap::new(),
239 terminal_status: None,
240 latest_checkpoint: None,
241 }
242 }
243
244 pub fn with_available_content_refs(
248 mut self,
249 refs: impl IntoIterator<Item = ContentRef>,
250 ) -> Self {
251 self.available_content_refs = Some(refs.into_iter().collect());
252 self
253 }
254
255 pub fn with_missing_content_policy(mut self, policy: MissingContentPolicy) -> Self {
259 self.missing_content_policy = policy;
260 self
261 }
262
263 pub fn apply(&mut self, record: JournalRecord) -> Result<(), AgentError> {
267 if self
268 .seen_records
269 .get(&record.record_id)
270 .is_some_and(|seen| seen == &record && idempotent_duplicate_allowed(&record))
271 {
272 return Ok(());
273 }
274 self.validate_ordering(&record)?;
275 self.validate_not_after_terminal(&record)?;
276 self.observe_content_refs(&record.record_id, record.journal_seq, &record.content_refs);
277
278 match &record.payload {
279 JournalRecordPayload::Checkpoint(checkpoint) => {
280 checkpoint
281 .validate_against_latest_seq(record.journal_seq)
282 .inspect_err(|error| {
283 self.repair(
284 ReplayRepairKind::CheckpointInvalid,
285 &record.record_id,
286 record.journal_seq,
287 error.context().message.clone(),
288 RetryClassification::RepairNeeded,
289 );
290 })?;
291 self.observe_content_refs(
292 &record.record_id,
293 record.journal_seq,
294 &checkpoint.content_ref_manifest,
295 );
296 if checkpoint_is_newer(checkpoint, self.latest_checkpoint.as_ref()) {
297 self.latest_checkpoint = Some(checkpoint.clone());
298 }
299 }
300 JournalRecordPayload::Recovery(recovery) => {
301 for pending in recovery.unsafe_pending.iter().cloned() {
302 self.add_unsafe_pending(pending, &record.record_id, record.journal_seq);
303 }
304 }
305 JournalRecordPayload::EffectIntent(intent) => {
306 self.pending_effects.insert(
307 intent.effect_id.clone(),
308 ReplayPendingSideEffect {
309 effect_id: intent.effect_id.clone(),
310 intent_record_id: record.record_id.clone(),
311 idempotency_key: intent.idempotency_key.clone(),
312 dedupe_key: intent.dedupe_key.clone(),
313 unsafe_pending_reason: "effect intent has no terminal result in replay"
314 .to_string(),
315 retry_allowed: intent.idempotency_key.is_some()
316 || intent.dedupe_key.is_some(),
317 },
318 );
319 }
320 JournalRecordPayload::EffectResult(result) => {
321 self.pending_effects.remove(&result.effect_id);
322 }
323 JournalRecordPayload::OutputDelivery(output) => {
324 self.apply_output_record(output, &record);
325 }
326 JournalRecordPayload::RunLifecycle(lifecycle) => {
327 if is_terminal_lifecycle(&lifecycle.status) {
328 self.terminal_status = Some(lifecycle.status.clone());
329 }
330 }
331 JournalRecordPayload::TerminalResult(marker) => {
332 self.pending_effects.remove(&marker.effect_id);
333 self.terminal_status = Some(marker.terminal_status.clone());
334 }
335 _ => {}
336 }
337
338 self.last_journal_seq = Some(record.journal_seq);
339 self.seen_records.insert(record.record_id.clone(), record);
340 Ok(())
341 }
342
343 pub fn finish(mut self) -> Result<ReplayResult, AgentError> {
347 self.finish_pending_effects();
348 let output_delivery_repairs = self.finish_output_deliveries();
349 let repair_needed = self.repair_needed;
350 let missing_content_refs = self.missing_content_refs.into_iter().collect::<Vec<_>>();
351 let unsafe_pending_side_effects = self.unsafe_pending_side_effects;
352 let status = if repair_needed.is_empty()
353 && missing_content_refs.is_empty()
354 && unsafe_pending_side_effects
355 .iter()
356 .all(|pending| pending.retry_allowed)
357 {
358 ReplayStatus::Complete
359 } else {
360 ReplayStatus::RepairNeeded
361 };
362 let resume_allowed =
363 self.mode != ReplayMode::ResumeReplay || status == ReplayStatus::Complete;
364
365 Ok(ReplayResult {
366 mode: self.mode,
367 status,
368 resume_allowed,
369 latest_journal_seq: self.last_journal_seq.unwrap_or(0),
370 terminal_status: self.terminal_status,
371 next_loop_state: self
372 .latest_checkpoint
373 .as_ref()
374 .map(|checkpoint| checkpoint.loop_state.clone()),
375 unsafe_pending_side_effects,
376 missing_content_refs,
377 repair_needed,
378 output_delivery_repairs,
379 latest_checkpoint: self.latest_checkpoint,
380 })
381 }
382
383 fn validate_ordering(&mut self, record: &JournalRecord) -> Result<(), AgentError> {
384 if record.journal_schema_version != JOURNAL_SCHEMA_VERSION {
385 return Err(AgentError::new(
386 AgentErrorKind::RecoveryRepairNeeded,
387 RetryClassification::RepairNeeded,
388 "journal record schema version is not supported by replay reducer",
389 ));
390 }
391
392 if self.seen_records.contains_key(&record.record_id) {
393 return Err(AgentError::new(
394 AgentErrorKind::InvalidStateTransition,
395 RetryClassification::RepairNeeded,
396 "duplicate non-idempotent journal record during replay",
397 ));
398 }
399
400 if let Some(last_seq) = self.last_journal_seq {
401 if record.journal_seq <= last_seq {
402 return Err(AgentError::new(
403 AgentErrorKind::InvalidStateTransition,
404 RetryClassification::RepairNeeded,
405 "journal records must be strictly increasing during replay",
406 ));
407 }
408 }
409 Ok(())
410 }
411
412 fn validate_not_after_terminal(&self, record: &JournalRecord) -> Result<(), AgentError> {
413 if self.terminal_status.is_none()
414 || matches!(
415 record.record_kind,
416 JournalRecordKind::Checkpoint | JournalRecordKind::Recovery
417 )
418 {
419 return Ok(());
420 }
421 Err(AgentError::new(
422 AgentErrorKind::InvalidStateTransition,
423 RetryClassification::RepairNeeded,
424 "journal record appears after sealed terminal replay state",
425 ))
426 }
427
428 fn observe_content_refs(&mut self, record_id: &str, journal_seq: u64, refs: &[ContentRef]) {
429 let Some(available) = self.available_content_refs.as_ref() else {
430 return;
431 };
432 let missing = refs
433 .iter()
434 .filter(|content_ref| {
435 !available.contains(*content_ref)
436 && !self.missing_content_refs.contains(*content_ref)
437 })
438 .cloned()
439 .collect::<Vec<_>>();
440 for content_ref in missing {
441 self.missing_content_refs.insert(content_ref.clone());
442 if matches!(
443 self.missing_content_policy,
444 MissingContentPolicy::Fail
445 | MissingContentPolicy::RecoverableReplayGap
446 | MissingContentPolicy::RequestHostRepair
447 ) {
448 self.repair(
449 ReplayRepairKind::MissingContentRef,
450 record_id,
451 journal_seq,
452 format!("content ref {} is missing for replay", content_ref.as_str()),
453 RetryClassification::UserActionNeeded,
454 );
455 }
456 }
457 }
458
459 fn add_unsafe_pending(
460 &mut self,
461 pending: PendingSideEffect,
462 record_id: &str,
463 journal_seq: u64,
464 ) {
465 let pending = ReplayPendingSideEffect::from_pending(pending);
466 let repair_kind = if pending.retry_allowed {
467 ReplayRepairKind::UnsafePendingSideEffect
468 } else {
469 ReplayRepairKind::NonIdempotentPendingSideEffect
470 };
471 let reason = pending.unsafe_pending_reason.clone();
472 self.repair(
473 repair_kind,
474 record_id,
475 journal_seq,
476 reason,
477 RetryClassification::RepairNeeded,
478 );
479 self.unsafe_pending_side_effects.push(pending);
480 }
481
482 fn apply_output_record(&mut self, output: &OutputDeliveryRecord, record: &JournalRecord) {
483 match output {
484 OutputDeliveryRecord::Intent(intent) => {
485 self.output_intents.insert(
486 intent.delivery_id.clone(),
487 OutputIntentState {
488 record_id: record.record_id.clone(),
489 journal_seq: record.journal_seq,
490 intent: intent.clone(),
491 },
492 );
493 }
494 OutputDeliveryRecord::Result(result) => {
495 self.output_results
496 .insert(result.delivery_id.clone(), result.clone());
497 }
498 OutputDeliveryRecord::Dedupe(dedupe) => {
499 self.output_dedupes
500 .insert(dedupe.dedupe_key.clone(), dedupe.clone());
501 }
502 OutputDeliveryRecord::Reconciliation(reconciliation) => {
503 self.output_reconciliations
504 .insert(reconciliation.delivery_id.clone(), reconciliation.clone());
505 self.repair(
506 ReplayRepairKind::OutputDeliveryReconciliation,
507 &record.record_id,
508 record.journal_seq,
509 reconciliation.unsafe_pending_reason.clone(),
510 RetryClassification::RepairNeeded,
511 );
512 }
513 OutputDeliveryRecord::Event(_) => {}
514 }
515 }
516
517 fn finish_pending_effects(&mut self) {
518 let pending = self
519 .pending_effects
520 .values()
521 .cloned()
522 .collect::<Vec<ReplayPendingSideEffect>>();
523 for pending in pending {
524 let repair_kind = if pending.retry_allowed {
525 ReplayRepairKind::UnsafePendingSideEffect
526 } else {
527 ReplayRepairKind::NonIdempotentPendingSideEffect
528 };
529 self.repair(
530 repair_kind,
531 &pending.intent_record_id,
532 self.last_journal_seq.unwrap_or_default(),
533 pending.unsafe_pending_reason.clone(),
534 RetryClassification::RepairNeeded,
535 );
536 self.unsafe_pending_side_effects.push(pending);
537 }
538 }
539
540 fn finish_output_deliveries(&mut self) -> Vec<OutputDeliveryReconciliationRecord> {
541 let mut repairs = Vec::new();
542 let intents = self
543 .output_intents
544 .values()
545 .cloned()
546 .collect::<Vec<OutputIntentState>>();
547 for state in intents {
548 if self.output_results.contains_key(&state.intent.delivery_id) {
549 continue;
550 }
551 if let Some(reconciliation) = self
552 .output_reconciliations
553 .get(&state.intent.delivery_id)
554 .cloned()
555 {
556 repairs.push(reconciliation);
557 continue;
558 }
559 if let Some(dedupe) = self.output_dedupes.get(&state.intent.dedupe_key) {
560 repairs.push(reconciliation_from_dedupe(&state, dedupe));
561 continue;
562 }
563
564 let reconciliation = unsafe_output_reconciliation(&state);
565 self.repair(
566 ReplayRepairKind::OutputDeliveryReconciliation,
567 &state.record_id,
568 state.journal_seq,
569 reconciliation.unsafe_pending_reason.clone(),
570 RetryClassification::RepairNeeded,
571 );
572 repairs.push(reconciliation);
573 }
574 repairs
575 }
576
577 fn repair(
578 &mut self,
579 kind: ReplayRepairKind,
580 record_id: &str,
581 journal_seq: u64,
582 reason: impl Into<String>,
583 retry: RetryClassification,
584 ) {
585 self.repair_needed.push(ReplayRepairNeeded {
586 kind,
587 record_id: record_id.to_string(),
588 journal_seq,
589 reason: reason.into(),
590 retry,
591 });
592 }
593}
594
595pub fn check_cursor_compatibility(
599 requested_scope: &EventStreamScope,
600 cursor: Option<&EventCursor>,
601) -> CursorCompatibility {
602 match cursor_compatible(requested_scope, cursor) {
603 Ok(()) => CursorCompatibility::Compatible,
604 Err(_) => CursorCompatibility::ScopeMismatch,
605 }
606}
607
608pub fn durable_replay_support(scope: &EventStreamScope) -> DurableReplaySupport {
611 match scope {
612 EventStreamScope::Run(_) => DurableReplaySupport::RunJournal,
613 EventStreamScope::All | EventStreamScope::Agent(_) | EventStreamScope::Filter { .. } => {
614 DurableReplaySupport::HostArchiveRequired
615 }
616 }
617}
618
/// An output-delivery intent together with the journal position of the record that carried it.
#[derive(Clone, Debug)]
struct OutputIntentState {
    /// `record_id` of the journal record containing the intent.
    record_id: String,
    /// `journal_seq` of that record.
    journal_seq: u64,
    /// The intent payload.
    intent: OutputDeliveryIntentRecord,
}
625
626fn reconciliation_from_dedupe(
627 state: &OutputIntentState,
628 dedupe: &OutputDeliveryDedupeRecord,
629) -> OutputDeliveryReconciliationRecord {
630 OutputDeliveryReconciliationRecord {
631 delivery_id: state.intent.delivery_id.clone(),
632 intent_record_id: state.record_id.clone(),
633 side_effect_kind: crate::effect::EffectKind::OutputDelivery,
634 idempotency_key: state.intent.idempotency_key.clone(),
635 dedupe_key: state.intent.dedupe_key.clone(),
636 external_operation_id: dedupe.prior_external_operation_id.clone(),
637 terminal_status: dedupe.prior_terminal_status,
638 terminal_append_status: TerminalAppendStatus::NotAttempted,
639 reconciliation_adapter: Some(state.intent.sink_ref.clone()),
640 unsafe_pending_reason: "repair replay found completed dedupe proof".to_string(),
641 replay_decision: ReplayRepairDecision::CompletedByDedupeProof,
642 resend_allowed: false,
643 }
644}
645
646fn unsafe_output_reconciliation(state: &OutputIntentState) -> OutputDeliveryReconciliationRecord {
647 OutputDeliveryReconciliationRecord {
648 delivery_id: state.intent.delivery_id.clone(),
649 intent_record_id: state.record_id.clone(),
650 side_effect_kind: crate::effect::EffectKind::OutputDelivery,
651 idempotency_key: state.intent.idempotency_key.clone(),
652 dedupe_key: state.intent.dedupe_key.clone(),
653 external_operation_id: None,
654 terminal_status: OutputDispatchStatus::ReconciliationNeeded,
655 terminal_append_status: TerminalAppendStatus::NotAttempted,
656 reconciliation_adapter: Some(state.intent.sink_ref.clone()),
657 unsafe_pending_reason:
658 "repair replay cannot resend output delivery without completed dedupe proof".to_string(),
659 replay_decision: ReplayRepairDecision::UnsafePending,
660 resend_allowed: false,
661 }
662}
663
664fn checkpoint_is_newer(candidate: &RunCheckpoint, current: Option<&RunCheckpoint>) -> bool {
665 current.is_none_or(|current| {
666 (
667 candidate.covers_journal_seq,
668 candidate.checkpoint_seq,
669 candidate.created_at_millis,
670 ) > (
671 current.covers_journal_seq,
672 current.checkpoint_seq,
673 current.created_at_millis,
674 )
675 })
676}
677
/// Reports whether a run-lifecycle `status` string marks the run as finished.
///
/// Accepts both the bare (`completed`) and `run_`-prefixed (`run_completed`) spellings of the
/// completed, failed, and cancelled outcomes. Matching is exact and case-sensitive.
fn is_terminal_lifecycle(status: &str) -> bool {
    const TERMINAL_STATUSES: [&str; 6] = [
        "completed",
        "failed",
        "cancelled",
        "run_completed",
        "run_failed",
        "run_cancelled",
    ];
    TERMINAL_STATUSES
        .iter()
        .any(|terminal| *terminal == status)
}
684
685fn idempotent_duplicate_allowed(record: &JournalRecord) -> bool {
686 record.idempotency_key.is_some()
687 || record.dedupe_key.is_some()
688 || matches!(
689 record.record_kind,
690 JournalRecordKind::Checkpoint | JournalRecordKind::Recovery
691 )
692}
693
694pub fn journal_cursor_for_seq(seq: u64) -> JournalCursor {
697 JournalCursor::new(format!("journal.{seq}"))
698}