Skip to main content

nodedb_crdt/
policy.rs

//! Declarative conflict resolution policies for CRDT constraint violations.
//!
//! The design goal is for roughly 80% of constraint violations to be
//! auto-resolved by declarative policies, with the dead-letter queue (DLQ)
//! serving only as a fallback.
//!
//! # Built-in Policies
//!
//! - `LAST_WRITER_WINS` — the write with the later HLC timestamp wins
//! - `RENAME_APPEND_SUFFIX` — UNIQUE violations auto-resolve by appending `_1`, `_2`, ...
//! - `CASCADE_DEFER` — FK violations are queued for retry with exponential backoff (3 attempts by default)
//! - `CUSTOM(webhook_url)` — POST the conflict to a webhook; the response determines the action
//! - `ESCALATE_TO_DLQ` — route directly to the dead-letter queue (explicit fallback)
//!
//! # Configuration
//!
//! Policies are set per collection and per constraint kind via DDL:
//!
//! ```sql
//! ALTER COLLECTION agents SET ON CONFLICT LAST_WRITER_WINS FOR UNIQUE;
//! ALTER COLLECTION posts SET ON CONFLICT CASCADE_DEFER FOR FOREIGN_KEY;
//! ```

23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25
26/// A conflict resolution policy that determines how constraint violations are handled.
27#[derive(Debug, Clone, Serialize, Deserialize, Default)]
28pub enum ConflictPolicy {
29    /// Resolve using HLC timestamp ordering; later write wins.
30    /// Audit trail entry is emitted for visibility.
31    #[default]
32    LastWriterWins,
33
34    /// For UNIQUE violations, append monotonic suffix (`_1`, `_2`, ...) to conflicting field.
35    /// Agent is notified of the renamed value.
36    RenameSuffix,
37
38    /// For FK violations, queue delta for retry with exponential backoff.
39    /// Max retries and TTL are configurable.
40    /// If parent not created within TTL, route to DLQ.
41    CascadeDefer {
42        /// Maximum number of retry attempts (default 3).
43        max_retries: u32,
44        /// Time-to-live in seconds before giving up (default 300).
45        ttl_secs: u64,
46    },
47
48    /// Escape hatch: POST conflict to webhook; response determines accept/reject/rewrite.
49    /// Timeout (default 5s) routes to DLQ.
50    Custom {
51        /// Webhook endpoint URL.
52        webhook_url: String,
53        /// Timeout in seconds (default 5).
54        timeout_secs: u64,
55    },
56
57    /// Explicitly route to the dead-letter queue.
58    /// Used for strict consistency mode or as a fallback when nothing else applies.
59    EscalateToDlq,
60}
61
62/// The result of attempting to resolve a conflict via policy.
63#[derive(Debug, Clone)]
64pub enum PolicyResolution {
65    /// Successfully auto-resolved by applying a policy.
66    AutoResolved(ResolvedAction),
67
68    /// Conflict deferred for retry; caller should enqueue to DeferredQueue.
69    /// `retry_after_ms`: milliseconds to wait before retry
70    /// `attempt`: current attempt number
71    Deferred { retry_after_ms: u64, attempt: u32 },
72
73    /// Requires async webhook call; caller must POST conflict and await response.
74    WebhookRequired {
75        webhook_url: String,
76        timeout_secs: u64,
77    },
78
79    /// Escalate to dead-letter queue.
80    Escalate,
81}
82
/// Concrete change applied when a conflict was auto-resolved.
#[derive(Clone, Debug)]
pub enum ResolvedAction {
    /// The incoming delta replaced the value already stored.
    OverwriteExisting,

    /// A UNIQUE conflict was avoided by giving the field a new value.
    RenamedField {
        /// Name of the field whose value was renamed.
        field: String,
        /// Value written in place of the conflicting one.
        new_value: String,
    },
}
92
93/// Per-collection conflict resolution policies, keyed by constraint kind.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct CollectionPolicy {
96    /// Policy for UNIQUE constraint violations.
97    pub unique: ConflictPolicy,
98    /// Policy for FOREIGN_KEY constraint violations.
99    pub foreign_key: ConflictPolicy,
100    /// Policy for NOT_NULL constraint violations.
101    pub not_null: ConflictPolicy,
102    /// Policy for CHECK constraint violations.
103    pub check: ConflictPolicy,
104    /// If `true`, stricter defaults apply (defaults to EscalateToDlq).
105    pub strict_consistency: bool,
106}
107
108impl CollectionPolicy {
109    /// Create a policy suitable for ephemeral agent state (AP mode).
110    /// Defaults to LAST_WRITER_WINS for most constraints.
111    pub fn ephemeral() -> Self {
112        Self {
113            unique: ConflictPolicy::RenameSuffix,
114            foreign_key: ConflictPolicy::CascadeDefer {
115                max_retries: 3,
116                ttl_secs: 300,
117            },
118            not_null: ConflictPolicy::LastWriterWins,
119            check: ConflictPolicy::EscalateToDlq,
120            strict_consistency: false,
121        }
122    }
123
124    /// Create a policy suitable for strict consistency mode (CP mode).
125    /// Defaults to escalating most violations to DLQ.
126    pub fn strict() -> Self {
127        Self {
128            unique: ConflictPolicy::EscalateToDlq,
129            foreign_key: ConflictPolicy::EscalateToDlq,
130            not_null: ConflictPolicy::EscalateToDlq,
131            check: ConflictPolicy::EscalateToDlq,
132            strict_consistency: true,
133        }
134    }
135
136    /// Retrieve the policy for a given constraint kind.
137    pub fn for_kind(&self, kind: &crate::constraint::ConstraintKind) -> &ConflictPolicy {
138        match kind {
139            crate::constraint::ConstraintKind::Unique => &self.unique,
140            crate::constraint::ConstraintKind::ForeignKey { .. } => &self.foreign_key,
141            crate::constraint::ConstraintKind::NotNull => &self.not_null,
142            crate::constraint::ConstraintKind::Check { .. } => &self.check,
143        }
144    }
145}
146
147/// Registry of conflict resolution policies, keyed by collection name.
148#[derive(Debug, Clone, Default)]
149pub struct PolicyRegistry {
150    policies: HashMap<String, CollectionPolicy>,
151}
152
153impl PolicyRegistry {
154    /// Create a new, empty policy registry.
155    pub fn new() -> Self {
156        Self {
157            policies: HashMap::new(),
158        }
159    }
160
161    /// Set the full policy for a collection (overwriting any previous policy).
162    pub fn set(&mut self, collection: &str, policy: CollectionPolicy) {
163        self.policies.insert(collection.to_string(), policy);
164    }
165
166    /// Set the policy for a specific constraint kind within a collection.
167    pub fn set_for_kind(
168        &mut self,
169        collection: &str,
170        kind: &crate::constraint::ConstraintKind,
171        policy: ConflictPolicy,
172    ) {
173        let mut coll_policy = self.get_owned(collection);
174        match kind {
175            crate::constraint::ConstraintKind::Unique => coll_policy.unique = policy,
176            crate::constraint::ConstraintKind::ForeignKey { .. } => {
177                coll_policy.foreign_key = policy
178            }
179            crate::constraint::ConstraintKind::NotNull => coll_policy.not_null = policy,
180            crate::constraint::ConstraintKind::Check { .. } => coll_policy.check = policy,
181        }
182        self.set(collection, coll_policy);
183    }
184
185    /// Retrieve the policy for a collection as an owned value.
186    /// Returns the registered policy if found, otherwise returns the default (ephemeral).
187    pub fn get_owned(&self, collection: &str) -> CollectionPolicy {
188        self.policies
189            .get(collection)
190            .cloned()
191            .unwrap_or_else(CollectionPolicy::ephemeral)
192    }
193
194    /// Check if a collection has an explicit policy registered.
195    pub fn has(&self, collection: &str) -> bool {
196        self.policies.contains_key(collection)
197    }
198
199    /// Total number of registered policies.
200    pub fn len(&self) -> usize {
201        self.policies.len()
202    }
203
204    pub fn is_empty(&self) -> bool {
205        self.policies.is_empty()
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraint::ConstraintKind;

    /// Foreign-key constraint shared by several tests.
    fn fk_kind() -> ConstraintKind {
        ConstraintKind::ForeignKey {
            ref_collection: "users".into(),
            ref_key: "id".into(),
        }
    }

    #[test]
    fn ephemeral_policy_defaults() {
        let policy = CollectionPolicy::ephemeral();
        assert!(!policy.strict_consistency);
        assert!(matches!(policy.unique, ConflictPolicy::RenameSuffix));
        assert!(matches!(
            policy.foreign_key,
            ConflictPolicy::CascadeDefer {
                max_retries: 3,
                ttl_secs: 300
            }
        ));
        assert!(matches!(policy.not_null, ConflictPolicy::LastWriterWins));
        assert!(matches!(policy.check, ConflictPolicy::EscalateToDlq));
    }

    #[test]
    fn strict_policy_defaults() {
        let policy = CollectionPolicy::strict();
        assert!(policy.strict_consistency);
        assert!(matches!(policy.unique, ConflictPolicy::EscalateToDlq));
        assert!(matches!(policy.foreign_key, ConflictPolicy::EscalateToDlq));
        assert!(matches!(policy.not_null, ConflictPolicy::EscalateToDlq));
        assert!(matches!(policy.check, ConflictPolicy::EscalateToDlq));
    }

    #[test]
    fn for_kind_lookup() {
        let policy = CollectionPolicy::ephemeral();

        assert!(matches!(
            policy.for_kind(&ConstraintKind::Unique),
            ConflictPolicy::RenameSuffix
        ));
        assert!(matches!(
            policy.for_kind(&fk_kind()),
            ConflictPolicy::CascadeDefer { .. }
        ));
        assert!(matches!(
            policy.for_kind(&ConstraintKind::NotNull),
            ConflictPolicy::LastWriterWins
        ));
    }

    #[test]
    fn registry_set_and_get() {
        let mut registry = PolicyRegistry::new();
        registry.set("agents", CollectionPolicy::strict());

        assert!(registry.has("agents"));
        assert!(!registry.has("unknown"));

        // A registered collection returns the stored policy...
        let agents = registry.get_owned("agents");
        assert!(agents.strict_consistency);
        assert!(matches!(agents.unique, ConflictPolicy::EscalateToDlq));

        // ...and an unregistered one falls back to the ephemeral defaults.
        let unknown = registry.get_owned("unknown");
        assert!(!unknown.strict_consistency);
        assert!(matches!(unknown.unique, ConflictPolicy::RenameSuffix));
    }

    #[test]
    fn registry_set_for_kind() {
        let mut registry = PolicyRegistry::new();
        registry.set("posts", CollectionPolicy::ephemeral());

        // Override UNIQUE only; every other kind must keep its previous policy.
        registry.set_for_kind(
            "posts",
            &ConstraintKind::Unique,
            ConflictPolicy::LastWriterWins,
        );

        assert!(registry.has("posts"));
        let posts = registry.get_owned("posts");
        assert!(matches!(posts.unique, ConflictPolicy::LastWriterWins));
        assert!(matches!(
            posts.foreign_key,
            ConflictPolicy::CascadeDefer { .. }
        ));
        assert!(matches!(posts.not_null, ConflictPolicy::LastWriterWins));
        assert!(matches!(posts.check, ConflictPolicy::EscalateToDlq));
    }

    #[test]
    fn registry_set_for_kind_registers_unknown_collection() {
        let mut registry = PolicyRegistry::new();
        assert!(registry.is_empty());

        registry.set_for_kind("comments", &fk_kind(), ConflictPolicy::EscalateToDlq);

        assert!(registry.has("comments"));
        assert_eq!(registry.len(), 1);
        let comments = registry.get_owned("comments");
        assert!(matches!(comments.foreign_key, ConflictPolicy::EscalateToDlq));
        // Kinds that were not overridden are seeded from the ephemeral defaults.
        assert!(matches!(comments.unique, ConflictPolicy::RenameSuffix));
    }

    #[test]
    fn registry_len() {
        let mut registry = PolicyRegistry::new();

        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());

        registry.set("coll1", CollectionPolicy::ephemeral());
        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());

        registry.set("coll2", CollectionPolicy::strict());
        assert_eq!(registry.len(), 2);

        // Overwriting an existing collection doesn't increase len.
        registry.set("coll1", CollectionPolicy::strict());
        assert_eq!(registry.len(), 2);
    }

    #[test]
    fn conflict_policy_default() {
        let policy: ConflictPolicy = Default::default();
        assert!(matches!(policy, ConflictPolicy::LastWriterWins));
    }

    #[test]
    fn cascade_defer_exponential_backoff() {
        // Reference backoff schedule: 500 ms * 2^attempt, capped at 30 s.
        // NOTE(review): no backoff code lives in this file, so this test pins the
        // intended schedule only. TODO: point it at the real implementation.
        const BASE_MS: u64 = 500;
        const CAP_MS: u64 = 30_000;
        let backoff =
            |attempt: u32| BASE_MS.saturating_mul(2_u64.saturating_pow(attempt)).min(CAP_MS);

        let schedule: Vec<u64> = (0..5).map(backoff).collect();
        assert_eq!(schedule, vec![500, 1_000, 2_000, 4_000, 8_000]);

        // 500 * 2^6 = 32_000 exceeds the cap; huge attempts must saturate, not overflow.
        assert_eq!(backoff(6), CAP_MS);
        assert_eq!(backoff(u32::MAX), CAP_MS);
    }
}