Skip to main content

nodedb_crdt/
policy.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Declarative conflict resolution policies for CRDT constraint violations.
4//!
5//! ~80% of constraint violations should be auto-resolved
6//! via declarative policies, with the DLQ serving as fallback-only.
7//!
8//! # Built-in Policies
9//!
10//! - `LAST_WRITER_WINS` — incoming write wins; later HLC timestamp takes precedence
11//! - `RENAME_APPEND_SUFFIX` — UNIQUE violations auto-resolve by appending `_1`, `_2`, ...
12//! - `CASCADE_DEFER` — FK violations queue for exponential backoff retry (max 3 attempts)
13//! - `CUSTOM(webhook_url)` — POST conflict to webhook; response determines action
14//! - `ESCALATE_TO_DLQ` — route directly to dead-letter queue (explicit fallback)
15//!
16//! # Configuration
17//!
18//! Policies are set per-collection and per-constraint-kind via DDL:
19//!
20//! ```sql
21//! ALTER COLLECTION agents SET ON CONFLICT LAST_WRITER_WINS FOR UNIQUE;
22//! ALTER COLLECTION posts SET ON CONFLICT CASCADE_DEFER FOR FOREIGN_KEY;
23//! ```
24
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27
28/// A conflict resolution policy that determines how constraint violations are handled.
29#[derive(Debug, Clone, Serialize, Deserialize, Default)]
30pub enum ConflictPolicy {
31    /// Resolve using HLC timestamp ordering; later write wins.
32    /// Audit trail entry is emitted for visibility.
33    #[default]
34    LastWriterWins,
35
36    /// For UNIQUE violations, append monotonic suffix (`_1`, `_2`, ...) to conflicting field.
37    /// Agent is notified of the renamed value.
38    RenameSuffix,
39
40    /// For FK violations, queue delta for retry with exponential backoff.
41    /// Max retries and TTL are configurable.
42    /// If parent not created within TTL, route to DLQ.
43    CascadeDefer {
44        /// Maximum number of retry attempts (default 3).
45        max_retries: u32,
46        /// Time-to-live in seconds before giving up (default 300).
47        ttl_secs: u64,
48    },
49
50    /// Escape hatch: POST conflict to webhook; response determines accept/reject/rewrite.
51    /// Timeout (default 5s) routes to DLQ.
52    Custom {
53        /// Webhook endpoint URL.
54        webhook_url: String,
55        /// Timeout in seconds (default 5).
56        timeout_secs: u64,
57    },
58
59    /// Explicitly route to the dead-letter queue.
60    /// Used for strict consistency mode or as a fallback when nothing else applies.
61    EscalateToDlq,
62}
63
/// The result of attempting to resolve a conflict via policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PolicyResolution {
    /// Successfully auto-resolved by applying a policy.
    AutoResolved(ResolvedAction),

    /// Conflict deferred for retry; the caller should enqueue it to the `DeferredQueue`.
    Deferred {
        /// Milliseconds to wait before the next retry.
        retry_after_ms: u64,
        /// Current attempt number.
        attempt: u32,
    },

    /// Requires an async webhook call; the caller must POST the conflict and await
    /// the response.
    WebhookRequired {
        /// Webhook endpoint URL to POST the conflict to.
        webhook_url: String,
        /// Timeout in seconds for the webhook call.
        timeout_secs: u64,
    },

    /// Escalate to the dead-letter queue.
    Escalate,
}

/// The specific action taken to resolve a conflict.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolvedAction {
    /// Overwrite the existing value with the incoming delta.
    OverwriteExisting,

    /// Auto-renamed a field to resolve a UNIQUE conflict.
    RenamedField {
        /// Name of the field whose value was renamed.
        field: String,
        /// The value assigned to `field` after renaming.
        new_value: String,
    },
}
94
95/// Per-collection conflict resolution policies, keyed by constraint kind.
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct CollectionPolicy {
98    /// Policy for UNIQUE constraint violations.
99    pub unique: ConflictPolicy,
100    /// Policy for FOREIGN_KEY constraint violations.
101    pub foreign_key: ConflictPolicy,
102    /// Policy for NOT_NULL constraint violations.
103    pub not_null: ConflictPolicy,
104    /// Policy for CHECK constraint violations.
105    pub check: ConflictPolicy,
106    /// If `true`, stricter defaults apply (defaults to EscalateToDlq).
107    pub strict_consistency: bool,
108}
109
110impl CollectionPolicy {
111    /// Create a policy suitable for ephemeral agent state (AP mode).
112    /// Defaults to LAST_WRITER_WINS for most constraints.
113    pub fn ephemeral() -> Self {
114        Self {
115            unique: ConflictPolicy::RenameSuffix,
116            foreign_key: ConflictPolicy::CascadeDefer {
117                max_retries: 3,
118                ttl_secs: 300,
119            },
120            not_null: ConflictPolicy::LastWriterWins,
121            check: ConflictPolicy::EscalateToDlq,
122            strict_consistency: false,
123        }
124    }
125
126    /// Create a policy suitable for strict consistency mode (CP mode).
127    /// Defaults to escalating most violations to DLQ.
128    pub fn strict() -> Self {
129        Self {
130            unique: ConflictPolicy::EscalateToDlq,
131            foreign_key: ConflictPolicy::EscalateToDlq,
132            not_null: ConflictPolicy::EscalateToDlq,
133            check: ConflictPolicy::EscalateToDlq,
134            strict_consistency: true,
135        }
136    }
137
138    /// Retrieve the policy for a given constraint kind.
139    pub fn for_kind(&self, kind: &crate::constraint::ConstraintKind) -> &ConflictPolicy {
140        match kind {
141            crate::constraint::ConstraintKind::Unique => &self.unique,
142            crate::constraint::ConstraintKind::ForeignKey { .. }
143            | crate::constraint::ConstraintKind::BiTemporalFK { .. } => &self.foreign_key,
144            crate::constraint::ConstraintKind::NotNull => &self.not_null,
145            crate::constraint::ConstraintKind::Check { .. } => &self.check,
146        }
147    }
148}
149
150/// Registry of conflict resolution policies, keyed by collection name.
151#[derive(Debug, Clone, Default)]
152pub struct PolicyRegistry {
153    policies: HashMap<String, CollectionPolicy>,
154}
155
156impl PolicyRegistry {
157    /// Create a new, empty policy registry.
158    pub fn new() -> Self {
159        Self {
160            policies: HashMap::new(),
161        }
162    }
163
164    /// Set the full policy for a collection (overwriting any previous policy).
165    pub fn set(&mut self, collection: &str, policy: CollectionPolicy) {
166        self.policies.insert(collection.to_string(), policy);
167    }
168
169    /// Remove the policy for a collection. Returns `true` if one existed.
170    pub fn remove(&mut self, collection: &str) -> bool {
171        self.policies.remove(collection).is_some()
172    }
173
174    /// Set the policy for a specific constraint kind within a collection.
175    pub fn set_for_kind(
176        &mut self,
177        collection: &str,
178        kind: &crate::constraint::ConstraintKind,
179        policy: ConflictPolicy,
180    ) {
181        let mut coll_policy = self.get_owned(collection);
182        match kind {
183            crate::constraint::ConstraintKind::Unique => coll_policy.unique = policy,
184            crate::constraint::ConstraintKind::ForeignKey { .. }
185            | crate::constraint::ConstraintKind::BiTemporalFK { .. } => {
186                coll_policy.foreign_key = policy
187            }
188            crate::constraint::ConstraintKind::NotNull => coll_policy.not_null = policy,
189            crate::constraint::ConstraintKind::Check { .. } => coll_policy.check = policy,
190        }
191        self.set(collection, coll_policy);
192    }
193
194    /// Retrieve the policy for a collection as an owned value.
195    /// Returns the registered policy if found, otherwise returns the default (ephemeral).
196    pub fn get_owned(&self, collection: &str) -> CollectionPolicy {
197        self.policies
198            .get(collection)
199            .cloned()
200            .unwrap_or_else(CollectionPolicy::ephemeral)
201    }
202
203    /// Check if a collection has an explicit policy registered.
204    pub fn has(&self, collection: &str) -> bool {
205        self.policies.contains_key(collection)
206    }
207
208    /// Total number of registered policies.
209    pub fn len(&self) -> usize {
210        self.policies.len()
211    }
212
213    pub fn is_empty(&self) -> bool {
214        self.policies.is_empty()
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraint::ConstraintKind;

    #[test]
    fn ephemeral_policy_defaults() {
        let policy = CollectionPolicy::ephemeral();
        assert!(!policy.strict_consistency);
        assert!(matches!(policy.unique, ConflictPolicy::RenameSuffix));
        assert!(matches!(
            policy.foreign_key,
            ConflictPolicy::CascadeDefer {
                max_retries: 3,
                ttl_secs: 300
            }
        ));
        assert!(matches!(policy.not_null, ConflictPolicy::LastWriterWins));
        assert!(matches!(policy.check, ConflictPolicy::EscalateToDlq));
    }

    #[test]
    fn strict_policy_defaults() {
        let policy = CollectionPolicy::strict();
        assert!(policy.strict_consistency);
        assert!(matches!(policy.unique, ConflictPolicy::EscalateToDlq));
        assert!(matches!(policy.foreign_key, ConflictPolicy::EscalateToDlq));
        assert!(matches!(policy.not_null, ConflictPolicy::EscalateToDlq));
        assert!(matches!(policy.check, ConflictPolicy::EscalateToDlq));
    }

    #[test]
    fn for_kind_lookup() {
        let policy = CollectionPolicy::ephemeral();

        let unique_policy = policy.for_kind(&ConstraintKind::Unique);
        assert!(matches!(unique_policy, ConflictPolicy::RenameSuffix));

        let fk_policy = policy.for_kind(&ConstraintKind::ForeignKey {
            ref_collection: "users".into(),
            ref_key: "id".into(),
        });
        assert!(matches!(fk_policy, ConflictPolicy::CascadeDefer { .. }));

        let not_null_policy = policy.for_kind(&ConstraintKind::NotNull);
        assert!(matches!(not_null_policy, ConflictPolicy::LastWriterWins));
    }

    #[test]
    fn registry_set_and_get() {
        let mut registry = PolicyRegistry::new();
        registry.set("agents", CollectionPolicy::strict());

        assert!(registry.has("agents"));
        assert!(!registry.has("unknown"));

        let stored = registry.get_owned("agents");
        assert!(stored.strict_consistency);
        assert!(matches!(stored.unique, ConflictPolicy::EscalateToDlq));

        // Unregistered collections fall back to the ephemeral policy.
        let fallback = registry.get_owned("unknown");
        assert!(!fallback.strict_consistency);
        assert!(matches!(fallback.unique, ConflictPolicy::RenameSuffix));
    }

    #[test]
    fn registry_set_for_kind() {
        let mut registry = PolicyRegistry::new();

        // Start with a default policy
        registry.set("posts", CollectionPolicy::ephemeral());

        // Override UNIQUE policy only
        registry.set_for_kind(
            "posts",
            &ConstraintKind::Unique,
            ConflictPolicy::LastWriterWins,
        );

        assert!(registry.has("posts"));
        let policy = registry.get_owned("posts");
        assert!(matches!(policy.unique, ConflictPolicy::LastWriterWins));
        // Other constraint kinds keep their previous policies.
        assert!(matches!(
            policy.foreign_key,
            ConflictPolicy::CascadeDefer { .. }
        ));
        assert!(matches!(policy.not_null, ConflictPolicy::LastWriterWins));
        assert!(matches!(policy.check, ConflictPolicy::EscalateToDlq));
    }

    #[test]
    fn registry_set_for_kind_on_unregistered_collection_starts_from_ephemeral() {
        let mut registry = PolicyRegistry::new();

        registry.set_for_kind(
            "fresh",
            &ConstraintKind::NotNull,
            ConflictPolicy::EscalateToDlq,
        );

        assert!(registry.has("fresh"));
        let policy = registry.get_owned("fresh");
        assert!(matches!(policy.not_null, ConflictPolicy::EscalateToDlq));
        assert!(matches!(policy.unique, ConflictPolicy::RenameSuffix));
        assert!(!policy.strict_consistency);
    }

    #[test]
    fn registry_remove() {
        let mut registry = PolicyRegistry::new();
        registry.set("agents", CollectionPolicy::strict());

        assert!(registry.remove("agents"));
        assert!(!registry.has("agents"));
        // A second removal finds nothing.
        assert!(!registry.remove("agents"));
        assert!(registry.is_empty());
    }

    #[test]
    fn registry_len() {
        let mut registry = PolicyRegistry::new();

        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());

        registry.set("coll1", CollectionPolicy::ephemeral());
        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());

        registry.set("coll2", CollectionPolicy::strict());
        assert_eq!(registry.len(), 2);

        // Overwriting doesn't increase len
        registry.set("coll1", CollectionPolicy::strict());
        assert_eq!(registry.len(), 2);
    }

    #[test]
    fn conflict_policy_default() {
        let policy: ConflictPolicy = Default::default();
        assert!(matches!(policy, ConflictPolicy::LastWriterWins));
    }

    #[test]
    fn cascade_defer_exponential_backoff() {
        // Backoff schedule: 500ms base, doubled per attempt, capped at 30s.
        fn backoff_ms(attempt: u32) -> u64 {
            const BASE_MS: u64 = 500;
            const CAP_MS: u64 = 30_000;
            BASE_MS
                .saturating_mul(2_u64.saturating_pow(attempt))
                .min(CAP_MS)
        }

        let schedule: Vec<u64> = (0..5).map(backoff_ms).collect();
        assert_eq!(schedule, vec![500, 1_000, 2_000, 4_000, 8_000]);

        // 500 * 2^6 = 32_000 exceeds the cap.
        assert_eq!(backoff_ms(6), 30_000);
        // Huge attempt counts saturate instead of overflowing.
        assert_eq!(backoff_ms(u32::MAX), 30_000);
    }
}