agent_sdk_core/application/anti_entropy.rs
1//! Application-layer coordination over core primitives. Use these services to lower
2//! helpers, drive runs, validate output, coordinate tools, approvals, delivery,
3//! isolation, telemetry, and feature layers. Methods in this layer may call
4//! configured ports, mutate in-memory stores, append journals, or publish events as
5//! documented. This file contains the anti entropy portion of that contract.
6//!
7use serde::{Deserialize, Serialize};
8
9use crate::{
10 domain::{AgentError, JournalCursor},
11 journal::{JournalRecord, JournalRecordPayload},
12 output_delivery::OutputDeliveryRecord,
13 replay::journal_cursor_for_seq,
14};
15
16#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
17#[serde(rename_all = "snake_case")]
18/// Enumerates the finite derived view kind cases.
19/// Serialized names are part of the SDK contract; update fixtures when variants change.
20pub enum DerivedViewKind {
21 /// Use this variant when the contract needs to represent event subscription index; selecting it has no side effect by itself.
22 EventSubscriptionIndex,
23 /// Use this variant when the contract needs to represent output dedupe index; selecting it has no side effect by itself.
24 OutputDedupeIndex,
25 /// Use this variant when the contract needs to represent output sink repair cursor; selecting it has no side effect by itself.
26 OutputSinkRepairCursor,
27 /// Use this variant when the contract needs to represent telemetry projection; selecting it has no side effect by itself.
28 TelemetryProjection,
29}
30
31#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
32/// Holds derived view state application-layer state or configuration.
33/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
34pub struct DerivedViewState {
35 /// Stable view id used for typed lineage, lookup, or dedupe.
36 pub view_id: String,
37 /// Kind discriminator for view kind.
38 /// Use it to route finite match arms without parsing display text.
39 pub view_kind: DerivedViewKind,
40 #[serde(skip_serializing_if = "Option::is_none")]
41 /// Last journal cursor this derived view successfully reconciled.
42 /// Anti-entropy scans use it to avoid replaying already-repaired derived-view gaps.
43 pub last_repaired_cursor: Option<JournalCursor>,
44}
45
46#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
47/// Holds anti entropy repair application-layer state or configuration.
48/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
49pub struct AntiEntropyRepair {
50 /// Stable view id used for typed lineage, lookup, or dedupe.
51 pub view_id: String,
52 /// Kind discriminator for view kind.
53 /// Use it to route finite match arms without parsing display text.
54 pub view_kind: DerivedViewKind,
55 /// First journal cursor included in the derived-view gap.
56 /// Repair logic uses this as the lower bound for replay or reconciliation evidence.
57 pub repair_from: JournalCursor,
58 /// Last journal cursor included in the derived-view gap.
59 /// Repair logic stores this as the cursor reached after the derived view is reconciled.
60 pub repair_to: JournalCursor,
61 /// Identifiers used to select or correlate affected record values.
62 /// Use them for typed lookup, filtering, or lineage instead of stringly typed matching.
63 pub affected_record_ids: Vec<String>,
64 /// Human-readable reason the anti-entropy scan queued this repair.
65 /// Use it for diagnostics and host action prompts, not as a machine policy discriminator.
66 pub repair_reason: String,
67 /// Whether host action required is enabled.
68 /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
69 pub host_action_required: bool,
70 /// Whether recovery must account for an external side effect that may already have
71 /// happened.
72 /// Repair code uses this to choose compensation or reconciliation instead of blindly
73 /// retrying the effect.
74 pub external_side_effect_compensation: bool,
75}
76
77#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
78/// Holds anti entropy report application-layer state or configuration.
79/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
80pub struct AntiEntropyReport {
81 /// Cursor identifying a replay, export, or subscription position.
82 /// Use it to resume without widening the original scope.
83 pub latest_journal_cursor: JournalCursor,
84 #[serde(default, skip_serializing_if = "Vec::is_empty")]
85 /// Repairs queued by the scan for stale or inconsistent derived views.
86 /// Applying these records mutates only the derived-view state unless a host action is explicitly
87 /// requested.
88 pub repairs: Vec<AntiEntropyRepair>,
89}
90
91#[derive(Clone, Debug, Default)]
92/// Holds anti entropy scanner application-layer state or configuration.
93/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
94pub struct AntiEntropyScanner;
95
96impl AntiEntropyScanner {
97 /// Returns derived view derived from the supplied state.
98 /// This uses only local coordinator state and performs no hidden host work.
99 pub fn derived_view(
100 &self,
101 view_id: impl Into<String>,
102 last_repaired_cursor: Option<JournalCursor>,
103 ) -> DerivedViewState {
104 DerivedViewState {
105 view_id: view_id.into(),
106 view_kind: DerivedViewKind::OutputSinkRepairCursor,
107 last_repaired_cursor,
108 }
109 }
110
111 /// Operates on in-memory or journal-derived application::anti_entropy
112 /// state for diagnostics and repair evidence. It does not create a second
113 /// run loop or product workflow owner.
114 pub fn scan(
115 &self,
116 records: &[JournalRecord],
117 views: &[DerivedViewState],
118 ) -> Result<AntiEntropyReport, AgentError> {
119 let latest_seq = records
120 .iter()
121 .map(|record| record.journal_seq)
122 .max()
123 .unwrap_or(0);
124 let mut repairs = Vec::new();
125 for view in views {
126 if let Some(repair) = self.scan_view(records, view)? {
127 repairs.push(repair);
128 }
129 }
130 Ok(AntiEntropyReport {
131 latest_journal_cursor: journal_cursor_for_seq(latest_seq),
132 repairs,
133 })
134 }
135
136 /// Returns repair internal view derived from the supplied state.
137 /// This uses only local coordinator state and performs no hidden host work.
138 pub fn repair_internal_view(
139 &self,
140 view: &mut DerivedViewState,
141 repair: &AntiEntropyRepair,
142 ) -> Result<(), AgentError> {
143 if view.view_id != repair.view_id || view.view_kind != repair.view_kind {
144 return Err(AgentError::contract_violation(
145 "anti-entropy repair does not target this derived view",
146 ));
147 }
148 view.last_repaired_cursor = Some(repair.repair_to.clone());
149 Ok(())
150 }
151
152 fn scan_view(
153 &self,
154 records: &[JournalRecord],
155 view: &DerivedViewState,
156 ) -> Result<Option<AntiEntropyRepair>, AgentError> {
157 let last_seq = view
158 .last_repaired_cursor
159 .as_ref()
160 .map(journal_cursor_seq)
161 .unwrap_or(0);
162 let relevant = records
163 .iter()
164 .filter(|record| record.journal_seq > last_seq)
165 .filter(|record| relevant_to_view(record, &view.view_kind))
166 .collect::<Vec<_>>();
167 if relevant.is_empty() {
168 return Ok(None);
169 }
170
171 let repair_from = relevant
172 .iter()
173 .map(|record| record.journal_seq)
174 .min()
175 .map(journal_cursor_for_seq)
176 .expect("nonempty relevant records");
177 let repair_to = relevant
178 .iter()
179 .map(|record| record.journal_seq)
180 .max()
181 .map(journal_cursor_for_seq)
182 .expect("nonempty relevant records");
183 let host_action_required = relevant.iter().any(|record| {
184 matches!(
185 &record.payload,
186 JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Reconciliation(_))
187 )
188 });
189
190 Ok(Some(AntiEntropyRepair {
191 view_id: view.view_id.clone(),
192 view_kind: view.view_kind.clone(),
193 repair_from,
194 repair_to,
195 affected_record_ids: relevant
196 .iter()
197 .map(|record| record.record_id.clone())
198 .collect(),
199 repair_reason: match view.view_kind {
200 DerivedViewKind::OutputSinkRepairCursor if host_action_required => {
201 "output delivery reconciliation requires sink-scoped repair cursor".to_string()
202 }
203 DerivedViewKind::OutputSinkRepairCursor => {
204 "output delivery derived view is behind journal".to_string()
205 }
206 _ => "derived view is behind journal".to_string(),
207 },
208 host_action_required,
209 external_side_effect_compensation: false,
210 }))
211 }
212}
213
214fn relevant_to_view(record: &JournalRecord, kind: &DerivedViewKind) -> bool {
215 match kind {
216 DerivedViewKind::OutputSinkRepairCursor => matches!(
217 &record.payload,
218 JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Intent(_))
219 | JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Reconciliation(_))
220 ),
221 DerivedViewKind::OutputDedupeIndex => matches!(
222 &record.payload,
223 JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Dedupe(_))
224 | JournalRecordPayload::OutputDelivery(OutputDeliveryRecord::Result(_))
225 ),
226 DerivedViewKind::EventSubscriptionIndex | DerivedViewKind::TelemetryProjection => true,
227 }
228}
229
230fn journal_cursor_seq(cursor: &JournalCursor) -> u64 {
231 cursor
232 .as_str()
233 .rsplit_once('.')
234 .and_then(|(_, seq)| seq.parse::<u64>().ok())
235 .unwrap_or(0)
236}