Skip to main content

agent_sdk_core/application/
replay.rs

1//! Application-layer coordination over core primitives. Use these services to lower
2//! helpers, drive runs, validate output, coordinate tools, approvals, delivery,
3//! isolation, telemetry, and feature layers. Methods in this layer may call
4//! configured ports, mutate in-memory stores, append journals, or publish events as
5//! documented. This file contains the replay portion of that contract.
6//!
7use std::collections::{BTreeMap, BTreeSet};
8
9use serde::{Deserialize, Serialize};
10
11use crate::{
12    content::MissingContentPolicy,
13    domain::{
14        AgentError, AgentErrorKind, ContentRef, DedupeKey, EffectId, JournalCursor,
15        RetryClassification,
16    },
17    event::{EventCursor, EventStreamScope, cursor_compatible},
18    journal::{
19        JOURNAL_SCHEMA_VERSION, JournalRecord, JournalRecordKind, JournalRecordPayload,
20        PendingSideEffect, RunCheckpoint,
21    },
22    output_delivery::{
23        OutputDeliveryDedupeRecord, OutputDeliveryId, OutputDeliveryIntentRecord,
24        OutputDeliveryReconciliationRecord, OutputDeliveryRecord, OutputDeliveryResultRecord,
25        OutputDispatchStatus, ReplayRepairDecision, TerminalAppendStatus,
26    },
27};
28
29#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
30#[serde(rename_all = "snake_case")]
31/// Enumerates the finite replay mode cases.
32/// Serialized names are part of the SDK contract; update fixtures when variants change.
33pub enum ReplayMode {
34    /// Use this variant when the contract needs to represent audit replay; selecting it has no side effect by itself.
35    AuditReplay,
36    /// Use this variant when the contract needs to represent resume replay; selecting it has no side effect by itself.
37    ResumeReplay,
38    /// Use this variant when the contract needs to represent repair replay; selecting it has no side effect by itself.
39    RepairReplay,
40}
41
42#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
43#[serde(rename_all = "snake_case")]
44/// Enumerates the finite replay status cases.
45/// Serialized names are part of the SDK contract; update fixtures when variants change.
46pub enum ReplayStatus {
47    /// Use this variant when the contract needs to represent complete; selecting it has no side effect by itself.
48    Complete,
49    /// Use this variant when the contract needs to represent repair needed; selecting it has no side effect by itself.
50    RepairNeeded,
51}
52
53#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
54#[serde(rename_all = "snake_case")]
55/// Enumerates the finite replay repair kind cases.
56/// Serialized names are part of the SDK contract; update fixtures when variants change.
57pub enum ReplayRepairKind {
58    /// Use this variant when the contract needs to represent missing content ref; selecting it has no side effect by itself.
59    MissingContentRef,
60    /// Use this variant when the contract needs to represent unsafe pending side effect; selecting it has no side effect by itself.
61    UnsafePendingSideEffect,
62    /// Use this variant when the contract needs to represent non idempotent pending side effect; selecting it has no side effect by itself.
63    NonIdempotentPendingSideEffect,
64    /// Use this variant when the contract needs to represent output delivery reconciliation; selecting it has no side effect by itself.
65    OutputDeliveryReconciliation,
66    /// Use this variant when the contract needs to represent cursor scope mismatch; selecting it has no side effect by itself.
67    CursorScopeMismatch,
68    /// Use this variant when the contract needs to represent checkpoint invalid; selecting it has no side effect by itself.
69    CheckpointInvalid,
70    /// Use this variant when the contract needs to represent replay invariant violation; selecting it has no side effect by itself.
71    ReplayInvariantViolation,
72}
73
74#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
75/// Holds replay repair needed application-layer state or configuration.
76/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
77pub struct ReplayRepairNeeded {
78    /// Kind/category for this record, capability, event, or detected
79    /// resource.
80    pub kind: ReplayRepairKind,
81    /// Stable record id used for typed lineage, lookup, or dedupe.
82    pub record_id: String,
83    /// Journal seq used by this record or request.
84    pub journal_seq: u64,
85    /// Redacted explanation for a denial, failure, status, or package delta.
86    pub reason: String,
87    /// Retry used by this record or request.
88    pub retry: RetryClassification,
89}
90
91#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
92/// Holds replay pending side effect application-layer state or configuration.
93/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
94pub struct ReplayPendingSideEffect {
95    /// Stable effect id used for typed lineage, lookup, or dedupe.
96    pub effect_id: EffectId,
97    /// Stable intent record id used for typed lineage, lookup, or dedupe.
98    pub intent_record_id: String,
99    #[serde(skip_serializing_if = "Option::is_none")]
100    /// Idempotency setting or key for deduping retries.
101    /// Use it to prevent duplicate side effects during replay or repair.
102    pub idempotency_key: Option<crate::domain::IdempotencyKey>,
103    #[serde(skip_serializing_if = "Option::is_none")]
104    /// Dedupe policy or key for a side-effecting operation.
105    /// Replay and repair use it to avoid sending or executing the same effect twice.
106    pub dedupe_key: Option<DedupeKey>,
107    /// Reason a pending side effect is unsafe to retry automatically.
108    /// Recovery uses it to require repair or reconciliation before continuing.
109    pub unsafe_pending_reason: String,
110    /// Allowlist for this policy or contract.
111    /// Validation uses it to reject undeclared or policy-denied values.
112    pub retry_allowed: bool,
113}
114
115impl ReplayPendingSideEffect {
116    /// Constructs this value from pending. Use it when adapting
117    /// canonical SDK records without introducing a second behavior
118    /// path.
119    pub fn from_pending(pending: PendingSideEffect) -> Self {
120        let retry_allowed = pending.idempotency_key.is_some() || pending.dedupe_key.is_some();
121        Self {
122            effect_id: pending.effect_id,
123            intent_record_id: pending.intent_record_id,
124            idempotency_key: pending.idempotency_key,
125            dedupe_key: pending.dedupe_key,
126            unsafe_pending_reason: pending.unsafe_pending_reason,
127            retry_allowed,
128        }
129    }
130}
131
132#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
133/// Holds replay result application-layer state or configuration.
134/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
135pub struct ReplayResult {
136    /// Mode that selects how this operation or contract should behave.
137    /// Callers use it to choose the explicit execution path instead of relying on hidden
138    /// defaults.
139    pub mode: ReplayMode,
140    /// Finite status for this record or lifecycle stage.
141    pub status: ReplayStatus,
142    /// Allowlist for this policy or contract.
143    /// Validation uses it to reject undeclared or policy-denied values.
144    pub resume_allowed: bool,
145    /// Latest journal seq used by this record or request.
146    pub latest_journal_seq: u64,
147    #[serde(skip_serializing_if = "Option::is_none")]
148    /// Optional terminal status value.
149    /// When absent, callers should use the documented default or skip that optional behavior.
150    pub terminal_status: Option<String>,
151    #[serde(skip_serializing_if = "Option::is_none")]
152    /// Optional next loop state value.
153    /// When absent, callers should use the documented default or skip that optional behavior.
154    pub next_loop_state: Option<String>,
155    #[serde(default, skip_serializing_if = "Vec::is_empty")]
156    /// Side effects found during replay that have intent evidence but no safe terminal result.
157    /// Recovery must reconcile or repair these entries before the run can resume safely.
158    pub unsafe_pending_side_effects: Vec<ReplayPendingSideEffect>,
159    #[serde(default, skip_serializing_if = "Vec::is_empty")]
160    /// Typed missing content refs references. Resolving them is separate from
161    /// constructing this record.
162    pub missing_content_refs: Vec<ContentRef>,
163    #[serde(default, skip_serializing_if = "Vec::is_empty")]
164    /// Replay repairs required before the durable state can be resumed safely.
165    /// Each entry names the repair category and evidence that must be reconciled.
166    pub repair_needed: Vec<ReplayRepairNeeded>,
167    #[serde(default, skip_serializing_if = "Vec::is_empty")]
168    /// Output delivery setting or policy.
169    /// Delivery coordinators use it to decide sink mode, dedupe, and required evidence.
170    pub output_delivery_repairs: Vec<OutputDeliveryReconciliationRecord>,
171    #[serde(skip_serializing_if = "Option::is_none")]
172    /// Optional latest checkpoint value.
173    /// When absent, callers should use the documented default or skip that optional behavior.
174    pub latest_checkpoint: Option<RunCheckpoint>,
175}
176
177#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
178#[serde(rename_all = "snake_case")]
179/// Enumerates the finite cursor compatibility cases.
180/// Serialized names are part of the SDK contract; update fixtures when variants change.
181pub enum CursorCompatibility {
182    /// Use this variant when the contract needs to represent compatible; selecting it has no side effect by itself.
183    Compatible,
184    /// Use this variant when the contract needs to represent scope mismatch; selecting it has no side effect by itself.
185    ScopeMismatch,
186}
187
188#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
189#[serde(rename_all = "snake_case")]
190/// Enumerates the finite durable replay support cases.
191/// Serialized names are part of the SDK contract; update fixtures when variants change.
192pub enum DurableReplaySupport {
193    /// Use this variant when the contract needs to represent run journal; selecting it has no side effect by itself.
194    RunJournal,
195    /// Use this variant when the contract needs to represent host archive required; selecting it has no side effect by itself.
196    HostArchiveRequired,
197}
198
199#[derive(Clone, Debug)]
200/// Holds replay reducer application-layer state or configuration.
201/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
202pub struct ReplayReducer {
203    mode: ReplayMode,
204    last_journal_seq: Option<u64>,
205    seen_records: BTreeMap<String, JournalRecord>,
206    available_content_refs: Option<BTreeSet<ContentRef>>,
207    missing_content_policy: MissingContentPolicy,
208    missing_content_refs: BTreeSet<ContentRef>,
209    repair_needed: Vec<ReplayRepairNeeded>,
210    unsafe_pending_side_effects: Vec<ReplayPendingSideEffect>,
211    pending_effects: BTreeMap<EffectId, ReplayPendingSideEffect>,
212    output_intents: BTreeMap<OutputDeliveryId, OutputIntentState>,
213    output_results: BTreeMap<OutputDeliveryId, OutputDeliveryResultRecord>,
214    output_dedupes: BTreeMap<DedupeKey, OutputDeliveryDedupeRecord>,
215    output_reconciliations: BTreeMap<OutputDeliveryId, OutputDeliveryReconciliationRecord>,
216    terminal_status: Option<String>,
217    latest_checkpoint: Option<RunCheckpoint>,
218}
219
220impl ReplayReducer {
221    /// Creates a new application::replay value with explicit
222    /// caller-provided inputs. This constructor is data-only and
223    /// performs no I/O or external side effects.
224    pub fn new(mode: ReplayMode) -> Self {
225        Self {
226            mode,
227            last_journal_seq: None,
228            seen_records: BTreeMap::new(),
229            available_content_refs: None,
230            missing_content_policy: MissingContentPolicy::Fail,
231            missing_content_refs: BTreeSet::new(),
232            repair_needed: Vec::new(),
233            unsafe_pending_side_effects: Vec::new(),
234            pending_effects: BTreeMap::new(),
235            output_intents: BTreeMap::new(),
236            output_results: BTreeMap::new(),
237            output_dedupes: BTreeMap::new(),
238            output_reconciliations: BTreeMap::new(),
239            terminal_status: None,
240            latest_checkpoint: None,
241        }
242    }
243
244    /// Returns this value with its available content refs setting
245    /// replaced. The method follows builder-style data construction and
246    /// does not execute external work.
247    pub fn with_available_content_refs(
248        mut self,
249        refs: impl IntoIterator<Item = ContentRef>,
250    ) -> Self {
251        self.available_content_refs = Some(refs.into_iter().collect());
252        self
253    }
254
255    /// Returns this value with its missing content policy setting
256    /// replaced. The method follows builder-style data construction and
257    /// does not execute external work.
258    pub fn with_missing_content_policy(mut self, policy: MissingContentPolicy) -> Self {
259        self.missing_content_policy = policy;
260        self
261    }
262
263    /// Apply.
264    /// This mutates only the replay projection with one journal record and never re-executes
265    /// the recorded side effect.
266    pub fn apply(&mut self, record: JournalRecord) -> Result<(), AgentError> {
267        if self
268            .seen_records
269            .get(&record.record_id)
270            .is_some_and(|seen| seen == &record && idempotent_duplicate_allowed(&record))
271        {
272            return Ok(());
273        }
274        self.validate_ordering(&record)?;
275        self.validate_not_after_terminal(&record)?;
276        self.observe_content_refs(&record.record_id, record.journal_seq, &record.content_refs);
277
278        match &record.payload {
279            JournalRecordPayload::Checkpoint(checkpoint) => {
280                checkpoint
281                    .validate_against_latest_seq(record.journal_seq)
282                    .inspect_err(|error| {
283                        self.repair(
284                            ReplayRepairKind::CheckpointInvalid,
285                            &record.record_id,
286                            record.journal_seq,
287                            error.context().message.clone(),
288                            RetryClassification::RepairNeeded,
289                        );
290                    })?;
291                self.observe_content_refs(
292                    &record.record_id,
293                    record.journal_seq,
294                    &checkpoint.content_ref_manifest,
295                );
296                if checkpoint_is_newer(checkpoint, self.latest_checkpoint.as_ref()) {
297                    self.latest_checkpoint = Some(checkpoint.clone());
298                }
299            }
300            JournalRecordPayload::Recovery(recovery) => {
301                for pending in recovery.unsafe_pending.iter().cloned() {
302                    self.add_unsafe_pending(pending, &record.record_id, record.journal_seq);
303                }
304            }
305            JournalRecordPayload::EffectIntent(intent) => {
306                self.pending_effects.insert(
307                    intent.effect_id.clone(),
308                    ReplayPendingSideEffect {
309                        effect_id: intent.effect_id.clone(),
310                        intent_record_id: record.record_id.clone(),
311                        idempotency_key: intent.idempotency_key.clone(),
312                        dedupe_key: intent.dedupe_key.clone(),
313                        unsafe_pending_reason: "effect intent has no terminal result in replay"
314                            .to_string(),
315                        retry_allowed: intent.idempotency_key.is_some()
316                            || intent.dedupe_key.is_some(),
317                    },
318                );
319            }
320            JournalRecordPayload::EffectResult(result) => {
321                self.pending_effects.remove(&result.effect_id);
322            }
323            JournalRecordPayload::OutputDelivery(output) => {
324                self.apply_output_record(output, &record);
325            }
326            JournalRecordPayload::RunLifecycle(lifecycle) => {
327                if is_terminal_lifecycle(&lifecycle.status) {
328                    self.terminal_status = Some(lifecycle.status.clone());
329                }
330            }
331            JournalRecordPayload::TerminalResult(marker) => {
332                self.pending_effects.remove(&marker.effect_id);
333                self.terminal_status = Some(marker.terminal_status.clone());
334            }
335            _ => {}
336        }
337
338        self.last_journal_seq = Some(record.journal_seq);
339        self.seen_records.insert(record.record_id.clone(), record);
340        Ok(())
341    }
342
343    /// Finish.
344    /// This finalizes replay bookkeeping into a result and does not re-execute recorded
345    /// effects.
346    pub fn finish(mut self) -> Result<ReplayResult, AgentError> {
347        self.finish_pending_effects();
348        let output_delivery_repairs = self.finish_output_deliveries();
349        let repair_needed = self.repair_needed;
350        let missing_content_refs = self.missing_content_refs.into_iter().collect::<Vec<_>>();
351        let unsafe_pending_side_effects = self.unsafe_pending_side_effects;
352        let status = if repair_needed.is_empty()
353            && missing_content_refs.is_empty()
354            && unsafe_pending_side_effects
355                .iter()
356                .all(|pending| pending.retry_allowed)
357        {
358            ReplayStatus::Complete
359        } else {
360            ReplayStatus::RepairNeeded
361        };
362        let resume_allowed =
363            self.mode != ReplayMode::ResumeReplay || status == ReplayStatus::Complete;
364
365        Ok(ReplayResult {
366            mode: self.mode,
367            status,
368            resume_allowed,
369            latest_journal_seq: self.last_journal_seq.unwrap_or(0),
370            terminal_status: self.terminal_status,
371            next_loop_state: self
372                .latest_checkpoint
373                .as_ref()
374                .map(|checkpoint| checkpoint.loop_state.clone()),
375            unsafe_pending_side_effects,
376            missing_content_refs,
377            repair_needed,
378            output_delivery_repairs,
379            latest_checkpoint: self.latest_checkpoint,
380        })
381    }
382
383    fn validate_ordering(&mut self, record: &JournalRecord) -> Result<(), AgentError> {
384        if record.journal_schema_version != JOURNAL_SCHEMA_VERSION {
385            return Err(AgentError::new(
386                AgentErrorKind::RecoveryRepairNeeded,
387                RetryClassification::RepairNeeded,
388                "journal record schema version is not supported by replay reducer",
389            ));
390        }
391
392        if self.seen_records.contains_key(&record.record_id) {
393            return Err(AgentError::new(
394                AgentErrorKind::InvalidStateTransition,
395                RetryClassification::RepairNeeded,
396                "duplicate non-idempotent journal record during replay",
397            ));
398        }
399
400        if let Some(last_seq) = self.last_journal_seq {
401            if record.journal_seq <= last_seq {
402                return Err(AgentError::new(
403                    AgentErrorKind::InvalidStateTransition,
404                    RetryClassification::RepairNeeded,
405                    "journal records must be strictly increasing during replay",
406                ));
407            }
408        }
409        Ok(())
410    }
411
412    fn validate_not_after_terminal(&self, record: &JournalRecord) -> Result<(), AgentError> {
413        if self.terminal_status.is_none()
414            || matches!(
415                record.record_kind,
416                JournalRecordKind::Checkpoint | JournalRecordKind::Recovery
417            )
418        {
419            return Ok(());
420        }
421        Err(AgentError::new(
422            AgentErrorKind::InvalidStateTransition,
423            RetryClassification::RepairNeeded,
424            "journal record appears after sealed terminal replay state",
425        ))
426    }
427
428    fn observe_content_refs(&mut self, record_id: &str, journal_seq: u64, refs: &[ContentRef]) {
429        let Some(available) = self.available_content_refs.as_ref() else {
430            return;
431        };
432        let missing = refs
433            .iter()
434            .filter(|content_ref| {
435                !available.contains(*content_ref)
436                    && !self.missing_content_refs.contains(*content_ref)
437            })
438            .cloned()
439            .collect::<Vec<_>>();
440        for content_ref in missing {
441            self.missing_content_refs.insert(content_ref.clone());
442            if matches!(
443                self.missing_content_policy,
444                MissingContentPolicy::Fail
445                    | MissingContentPolicy::RecoverableReplayGap
446                    | MissingContentPolicy::RequestHostRepair
447            ) {
448                self.repair(
449                    ReplayRepairKind::MissingContentRef,
450                    record_id,
451                    journal_seq,
452                    format!("content ref {} is missing for replay", content_ref.as_str()),
453                    RetryClassification::UserActionNeeded,
454                );
455            }
456        }
457    }
458
459    fn add_unsafe_pending(
460        &mut self,
461        pending: PendingSideEffect,
462        record_id: &str,
463        journal_seq: u64,
464    ) {
465        let pending = ReplayPendingSideEffect::from_pending(pending);
466        let repair_kind = if pending.retry_allowed {
467            ReplayRepairKind::UnsafePendingSideEffect
468        } else {
469            ReplayRepairKind::NonIdempotentPendingSideEffect
470        };
471        let reason = pending.unsafe_pending_reason.clone();
472        self.repair(
473            repair_kind,
474            record_id,
475            journal_seq,
476            reason,
477            RetryClassification::RepairNeeded,
478        );
479        self.unsafe_pending_side_effects.push(pending);
480    }
481
482    fn apply_output_record(&mut self, output: &OutputDeliveryRecord, record: &JournalRecord) {
483        match output {
484            OutputDeliveryRecord::Intent(intent) => {
485                self.output_intents.insert(
486                    intent.delivery_id.clone(),
487                    OutputIntentState {
488                        record_id: record.record_id.clone(),
489                        journal_seq: record.journal_seq,
490                        intent: intent.clone(),
491                    },
492                );
493            }
494            OutputDeliveryRecord::Result(result) => {
495                self.output_results
496                    .insert(result.delivery_id.clone(), result.clone());
497            }
498            OutputDeliveryRecord::Dedupe(dedupe) => {
499                self.output_dedupes
500                    .insert(dedupe.dedupe_key.clone(), dedupe.clone());
501            }
502            OutputDeliveryRecord::Reconciliation(reconciliation) => {
503                self.output_reconciliations
504                    .insert(reconciliation.delivery_id.clone(), reconciliation.clone());
505                self.repair(
506                    ReplayRepairKind::OutputDeliveryReconciliation,
507                    &record.record_id,
508                    record.journal_seq,
509                    reconciliation.unsafe_pending_reason.clone(),
510                    RetryClassification::RepairNeeded,
511                );
512            }
513            OutputDeliveryRecord::Event(_) => {}
514        }
515    }
516
517    fn finish_pending_effects(&mut self) {
518        let pending = self
519            .pending_effects
520            .values()
521            .cloned()
522            .collect::<Vec<ReplayPendingSideEffect>>();
523        for pending in pending {
524            let repair_kind = if pending.retry_allowed {
525                ReplayRepairKind::UnsafePendingSideEffect
526            } else {
527                ReplayRepairKind::NonIdempotentPendingSideEffect
528            };
529            self.repair(
530                repair_kind,
531                &pending.intent_record_id,
532                self.last_journal_seq.unwrap_or_default(),
533                pending.unsafe_pending_reason.clone(),
534                RetryClassification::RepairNeeded,
535            );
536            self.unsafe_pending_side_effects.push(pending);
537        }
538    }
539
540    fn finish_output_deliveries(&mut self) -> Vec<OutputDeliveryReconciliationRecord> {
541        let mut repairs = Vec::new();
542        let intents = self
543            .output_intents
544            .values()
545            .cloned()
546            .collect::<Vec<OutputIntentState>>();
547        for state in intents {
548            if self.output_results.contains_key(&state.intent.delivery_id) {
549                continue;
550            }
551            if let Some(reconciliation) = self
552                .output_reconciliations
553                .get(&state.intent.delivery_id)
554                .cloned()
555            {
556                repairs.push(reconciliation);
557                continue;
558            }
559            if let Some(dedupe) = self.output_dedupes.get(&state.intent.dedupe_key) {
560                repairs.push(reconciliation_from_dedupe(&state, dedupe));
561                continue;
562            }
563
564            let reconciliation = unsafe_output_reconciliation(&state);
565            self.repair(
566                ReplayRepairKind::OutputDeliveryReconciliation,
567                &state.record_id,
568                state.journal_seq,
569                reconciliation.unsafe_pending_reason.clone(),
570                RetryClassification::RepairNeeded,
571            );
572            repairs.push(reconciliation);
573        }
574        repairs
575    }
576
577    fn repair(
578        &mut self,
579        kind: ReplayRepairKind,
580        record_id: &str,
581        journal_seq: u64,
582        reason: impl Into<String>,
583        retry: RetryClassification,
584    ) {
585        self.repair_needed.push(ReplayRepairNeeded {
586            kind,
587            record_id: record_id.to_string(),
588            journal_seq,
589            reason: reason.into(),
590            retry,
591        });
592    }
593}
594
595/// Check cursor compatibility.
596/// This is replay bookkeeping over cursors or stream scopes and does not mutate runtime state
597/// or re-execute effects.
598pub fn check_cursor_compatibility(
599    requested_scope: &EventStreamScope,
600    cursor: Option<&EventCursor>,
601) -> CursorCompatibility {
602    match cursor_compatible(requested_scope, cursor) {
603        Ok(()) => CursorCompatibility::Compatible,
604        Err(_) => CursorCompatibility::ScopeMismatch,
605    }
606}
607
608/// Returns durable replay support derived from the supplied state.
609/// This derives SDK state locally and does not call host adapters.
610pub fn durable_replay_support(scope: &EventStreamScope) -> DurableReplaySupport {
611    match scope {
612        EventStreamScope::Run(_) => DurableReplaySupport::RunJournal,
613        EventStreamScope::All | EventStreamScope::Agent(_) | EventStreamScope::Filter { .. } => {
614            DurableReplaySupport::HostArchiveRequired
615        }
616    }
617}
618
619#[derive(Clone, Debug)]
620struct OutputIntentState {
621    record_id: String,
622    journal_seq: u64,
623    intent: OutputDeliveryIntentRecord,
624}
625
626fn reconciliation_from_dedupe(
627    state: &OutputIntentState,
628    dedupe: &OutputDeliveryDedupeRecord,
629) -> OutputDeliveryReconciliationRecord {
630    OutputDeliveryReconciliationRecord {
631        delivery_id: state.intent.delivery_id.clone(),
632        intent_record_id: state.record_id.clone(),
633        side_effect_kind: crate::effect::EffectKind::OutputDelivery,
634        idempotency_key: state.intent.idempotency_key.clone(),
635        dedupe_key: state.intent.dedupe_key.clone(),
636        external_operation_id: dedupe.prior_external_operation_id.clone(),
637        terminal_status: dedupe.prior_terminal_status,
638        terminal_append_status: TerminalAppendStatus::NotAttempted,
639        reconciliation_adapter: Some(state.intent.sink_ref.clone()),
640        unsafe_pending_reason: "repair replay found completed dedupe proof".to_string(),
641        replay_decision: ReplayRepairDecision::CompletedByDedupeProof,
642        resend_allowed: false,
643    }
644}
645
646fn unsafe_output_reconciliation(state: &OutputIntentState) -> OutputDeliveryReconciliationRecord {
647    OutputDeliveryReconciliationRecord {
648        delivery_id: state.intent.delivery_id.clone(),
649        intent_record_id: state.record_id.clone(),
650        side_effect_kind: crate::effect::EffectKind::OutputDelivery,
651        idempotency_key: state.intent.idempotency_key.clone(),
652        dedupe_key: state.intent.dedupe_key.clone(),
653        external_operation_id: None,
654        terminal_status: OutputDispatchStatus::ReconciliationNeeded,
655        terminal_append_status: TerminalAppendStatus::NotAttempted,
656        reconciliation_adapter: Some(state.intent.sink_ref.clone()),
657        unsafe_pending_reason:
658            "repair replay cannot resend output delivery without completed dedupe proof".to_string(),
659        replay_decision: ReplayRepairDecision::UnsafePending,
660        resend_allowed: false,
661    }
662}
663
664fn checkpoint_is_newer(candidate: &RunCheckpoint, current: Option<&RunCheckpoint>) -> bool {
665    current.is_none_or(|current| {
666        (
667            candidate.covers_journal_seq,
668            candidate.checkpoint_seq,
669            candidate.created_at_millis,
670        ) > (
671            current.covers_journal_seq,
672            current.checkpoint_seq,
673            current.created_at_millis,
674        )
675    })
676}
677
678fn is_terminal_lifecycle(status: &str) -> bool {
679    matches!(
680        status,
681        "completed" | "failed" | "cancelled" | "run_completed" | "run_failed" | "run_cancelled"
682    )
683}
684
685fn idempotent_duplicate_allowed(record: &JournalRecord) -> bool {
686    record.idempotency_key.is_some()
687        || record.dedupe_key.is_some()
688        || matches!(
689            record.record_kind,
690            JournalRecordKind::Checkpoint | JournalRecordKind::Recovery
691        )
692}
693
694/// Returns journal cursor for seq derived from the supplied state.
695/// This derives SDK state locally and does not call host adapters.
696pub fn journal_cursor_for_seq(seq: u64) -> JournalCursor {
697    JournalCursor::new(format!("journal.{seq}"))
698}