Skip to main content

agent_sdk_core/application/
anti_entropy.rs

1//! Application-layer coordination over core primitives. Use these services to lower
2//! helpers, drive runs, validate output, coordinate tools, approvals, delivery,
3//! isolation, telemetry, and feature layers. Methods in this layer may call
4//! configured ports, mutate in-memory stores, append journals, or publish events as
5//! documented. This file contains the anti entropy portion of that contract.
6//!
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    domain::{AgentError, JournalCursor},
11    journal::{JournalRecord, JournalRecordPayload},
12    output_delivery::OutputDeliveryRecord,
13    replay::journal_cursor_for_seq,
14};
15
16#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
17#[serde(rename_all = "snake_case")]
18/// Enumerates the finite derived view kind cases.
19/// Serialized names are part of the SDK contract; update fixtures when variants change.
20pub enum DerivedViewKind {
21    /// Use this variant when the contract needs to represent event subscription index; selecting it has no side effect by itself.
22    EventSubscriptionIndex,
23    /// Use this variant when the contract needs to represent output dedupe index; selecting it has no side effect by itself.
24    OutputDedupeIndex,
25    /// Use this variant when the contract needs to represent output sink repair cursor; selecting it has no side effect by itself.
26    OutputSinkRepairCursor,
27    /// Use this variant when the contract needs to represent telemetry projection; selecting it has no side effect by itself.
28    TelemetryProjection,
29}
30
31#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
32/// Holds derived view state application-layer state or configuration.
33/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
34pub struct DerivedViewState {
35    /// Stable view id used for typed lineage, lookup, or dedupe.
36    pub view_id: String,
37    /// Kind discriminator for view kind.
38    /// Use it to route finite match arms without parsing display text.
39    pub view_kind: DerivedViewKind,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    /// Last journal cursor this derived view successfully reconciled.
42    /// Anti-entropy scans use it to avoid replaying already-repaired derived-view gaps.
43    pub last_repaired_cursor: Option<JournalCursor>,
44}
45
46#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
47/// Holds anti entropy repair application-layer state or configuration.
48/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
49pub struct AntiEntropyRepair {
50    /// Stable view id used for typed lineage, lookup, or dedupe.
51    pub view_id: String,
52    /// Kind discriminator for view kind.
53    /// Use it to route finite match arms without parsing display text.
54    pub view_kind: DerivedViewKind,
55    /// First journal cursor included in the derived-view gap.
56    /// Repair logic uses this as the lower bound for replay or reconciliation evidence.
57    pub repair_from: JournalCursor,
58    /// Last journal cursor included in the derived-view gap.
59    /// Repair logic stores this as the cursor reached after the derived view is reconciled.
60    pub repair_to: JournalCursor,
61    /// Identifiers used to select or correlate affected record values.
62    /// Use them for typed lookup, filtering, or lineage instead of stringly typed matching.
63    pub affected_record_ids: Vec<String>,
64    /// Human-readable reason the anti-entropy scan queued this repair.
65    /// Use it for diagnostics and host action prompts, not as a machine policy discriminator.
66    pub repair_reason: String,
67    /// Whether host action required is enabled.
68    /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
69    pub host_action_required: bool,
70    /// Whether recovery must account for an external side effect that may already have
71    /// happened.
72    /// Repair code uses this to choose compensation or reconciliation instead of blindly
73    /// retrying the effect.
74    pub external_side_effect_compensation: bool,
75}
76
77#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
78/// Holds anti entropy report application-layer state or configuration.
79/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
80pub struct AntiEntropyReport {
81    /// Cursor identifying a replay, export, or subscription position.
82    /// Use it to resume without widening the original scope.
83    pub latest_journal_cursor: JournalCursor,
84    #[serde(default, skip_serializing_if = "Vec::is_empty")]
85    /// Repairs queued by the scan for stale or inconsistent derived views.
86    /// Applying these records mutates only the derived-view state unless a host action is explicitly
87    /// requested.
88    pub repairs: Vec<AntiEntropyRepair>,
89}
90
91#[derive(Clone, Debug, Default)]
92/// Holds anti entropy scanner application-layer state or configuration.
93/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
94pub struct AntiEntropyScanner;
95
96impl AntiEntropyScanner {
97    /// Returns derived view derived from the supplied state.
98    /// This uses only local coordinator state and performs no hidden host work.
99    pub fn derived_view(
100        &self,
101        view_id: impl Into<String>,
102        last_repaired_cursor: Option<JournalCursor>,
103    ) -> DerivedViewState {
104        DerivedViewState {
105            view_id: view_id.into(),
106            view_kind: DerivedViewKind::OutputSinkRepairCursor,
107            last_repaired_cursor,
108        }
109    }
110
111    /// Operates on in-memory or journal-derived application::anti_entropy
112    /// state for diagnostics and repair evidence. It does not create a second
113    /// run loop or product workflow owner.
114    pub fn scan(
115        &self,
116        records: &[JournalRecord],
117        views: &[DerivedViewState],
118    ) -> Result<AntiEntropyReport, AgentError> {
119        let latest_seq = records
120            .iter()
121            .map(|record| record.journal_seq)
122            .max()
123            .unwrap_or(0);
124        let mut repairs = Vec::new();
125        for view in views {
126            if let Some(repair) = self.scan_view(records, view)? {
127                repairs.push(repair);
128            }
129        }
130        Ok(AntiEntropyReport {
131            latest_journal_cursor: journal_cursor_for_seq(latest_seq),
132            repairs,
133        })
134    }
135
136    /// Returns repair internal view derived from the supplied state.
137    /// This uses only local coordinator state and performs no hidden host work.
138    pub fn repair_internal_view(
139        &self,
140        view: &mut DerivedViewState,
141        repair: &AntiEntropyRepair,
142    ) -> Result<(), AgentError> {
143        if view.view_id != repair.view_id || view.view_kind != repair.view_kind {
144            return Err(AgentError::contract_violation(
145                "anti-entropy repair does not target this derived view",
146            ));
147        }
148        view.last_repaired_cursor = Some(repair.repair_to.clone());
149        Ok(())
150    }
151
152    fn scan_view(
153        &self,
154        records: &[JournalRecord],
155        view: &DerivedViewState,
156    ) -> Result<Option<AntiEntropyRepair>, AgentError> {
157        let last_seq = view
158            .last_repaired_cursor
159            .as_ref()
160            .map(journal_cursor_seq)
161            .unwrap_or(0);
162        let relevant = records
163            .iter()
164            .filter(|record| record.journal_seq > last_seq)
165            .filter(|record| relevant_to_view(record, &view.view_kind))
166            .collect::<Vec<_>>();
167        if relevant.is_empty() {
168            return Ok(None);
169        }
170
171        let repair_from = relevant
172            .iter()
173            .map(|record| record.journal_seq)
174            .min()
175            .map(journal_cursor_for_seq)
176            .expect("nonempty relevant records");
177        let repair_to = relevant
178            .iter()
179            .map(|record| record.journal_seq)
180            .max()
181            .map(journal_cursor_for_seq)
182            .expect("nonempty relevant records");
183        let host_action_required = relevant.iter().any(|record| {
184            matches!(
185                &record.payload,
186                JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Reconciliation(_))
187            )
188        });
189
190        Ok(Some(AntiEntropyRepair {
191            view_id: view.view_id.clone(),
192            view_kind: view.view_kind.clone(),
193            repair_from,
194            repair_to,
195            affected_record_ids: relevant
196                .iter()
197                .map(|record| record.record_id.clone())
198                .collect(),
199            repair_reason: match view.view_kind {
200                DerivedViewKind::OutputSinkRepairCursor if host_action_required => {
201                    "output delivery reconciliation requires sink-scoped repair cursor".to_string()
202                }
203                DerivedViewKind::OutputSinkRepairCursor => {
204                    "output delivery derived view is behind journal".to_string()
205                }
206                _ => "derived view is behind journal".to_string(),
207            },
208            host_action_required,
209            external_side_effect_compensation: false,
210        }))
211    }
212}
213
214fn relevant_to_view(record: &JournalRecord, kind: &DerivedViewKind) -> bool {
215    match kind {
216        DerivedViewKind::OutputSinkRepairCursor => matches!(
217            &record.payload,
218            JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Intent(_))
219                | JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Reconciliation(_))
220        ),
221        DerivedViewKind::OutputDedupeIndex => matches!(
222            &record.payload,
223            JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Dedupe(_))
224                | JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Result(_))
225        ),
226        DerivedViewKind::EventSubscriptionIndex | DerivedViewKind::TelemetryProjection => true,
227    }
228}
229
230fn journal_cursor_seq(cursor: &JournalCursor) -> u64 {
231    cursor
232        .as_str()
233        .rsplit_once('.')
234        .and_then(|(_, seq)| seq.parse::<u64>().ok())
235        .unwrap_or(0)
236}