Skip to main content

nodedb_crdt/validator/
policy_dispatch.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Policy resolution dispatch for validation violations.
4
5use crate::CrdtAuthContext;
6use crate::constraint::{Constraint, ConstraintKind};
7use crate::error::Result;
8use crate::policy::{ConflictPolicy, PolicyResolution, ResolvedAction};
9use crate::state::CrdtState;
10
11use super::core::Validator;
12use super::types::{ProposedChange, ValidationOutcome};
13
14impl Validator {
15    /// Validate with declarative policy resolution.
16    ///
17    /// This is the new core validation method. It attempts to resolve violations
18    /// via policy before falling back to the DLQ.
19    ///
20    /// # Arguments
21    ///
22    /// * `state` — current CRDT state
23    /// * `peer_id` — source peer ID
24    /// * `change` — proposed change
25    /// * `delta_bytes` — raw delta bytes
26    /// * `hlc_timestamp` — Hybrid Logical Clock timestamp of the incoming write
27    ///
28    /// Returns:
29    /// - `Ok(PolicyResolution::AutoResolved(_))` if the policy auto-fixed the violation
30    /// - `Ok(PolicyResolution::Deferred { .. })` if deferred for retry (entry already enqueued)
31    /// - `Ok(PolicyResolution::WebhookRequired { .. })` if webhook call needed (caller's responsibility)
32    /// - `Ok(PolicyResolution::Escalate)` if escalating to DLQ (entry already enqueued)
33    /// - `Err(_)` if an internal error occurred
34    pub fn validate_with_policy(
35        &mut self,
36        state: &CrdtState,
37        peer_id: u64,
38        auth: CrdtAuthContext,
39        change: &ProposedChange,
40        delta_bytes: Vec<u8>,
41        hlc_timestamp: u64,
42    ) -> Result<PolicyResolution> {
43        match self.validate(state, change) {
44            ValidationOutcome::Accepted => {
45                // No violation; return synthetic "auto-resolved" to maintain API consistency
46                Ok(PolicyResolution::AutoResolved(
47                    ResolvedAction::OverwriteExisting,
48                ))
49            }
50            ValidationOutcome::Rejected(violations) => {
51                // Exactly one violation per constraint (current design)
52                let v = &violations[0];
53                let constraint = self
54                    .constraints
55                    .all()
56                    .iter()
57                    .find(|c| c.name == v.constraint_name)
58                    .cloned()
59                    .unwrap_or_else(|| Constraint {
60                        name: v.constraint_name.clone(),
61                        collection: change.collection.clone(),
62                        field: String::new(),
63                        kind: ConstraintKind::NotNull,
64                    });
65
66                let policy = self.policies.get_owned(&change.collection);
67                let policy_for_kind = policy.for_kind(&constraint.kind);
68
69                // Attempt policy resolution
70                match policy_for_kind {
71                    ConflictPolicy::LastWriterWins => {
72                        tracing::info!(
73                            constraint = %v.constraint_name,
74                            collection = %change.collection,
75                            timestamp = hlc_timestamp,
76                            reason = %v.reason,
77                            "resolved via LAST_WRITER_WINS"
78                        );
79                        Ok(PolicyResolution::AutoResolved(
80                            ResolvedAction::OverwriteExisting,
81                        ))
82                    }
83
84                    ConflictPolicy::RenameSuffix => {
85                        let counter_key = (change.collection.clone(), constraint.field.clone());
86                        let suffix = self.suffix_counter.entry(counter_key).or_insert(0);
87                        *suffix += 1;
88                        let new_value = format!(
89                            "{}_{}",
90                            change
91                                .fields
92                                .iter()
93                                .find(|(f, _)| f == &constraint.field)
94                                .map(|(_, v)| format!("{:?}", v))
95                                .unwrap_or_else(|| "unknown".to_string()),
96                            suffix
97                        );
98
99                        tracing::info!(
100                            constraint = %v.constraint_name,
101                            field = %constraint.field,
102                            new_value = %new_value,
103                            "resolved via RENAME_APPEND_SUFFIX"
104                        );
105
106                        Ok(PolicyResolution::AutoResolved(
107                            ResolvedAction::RenamedField {
108                                field: constraint.field.clone(),
109                                new_value,
110                            },
111                        ))
112                    }
113
114                    ConflictPolicy::CascadeDefer {
115                        max_retries,
116                        ttl_secs,
117                    } => {
118                        let now_ms = std::time::SystemTime::now()
119                            .duration_since(std::time::UNIX_EPOCH)
120                            .unwrap_or_default()
121                            .as_millis() as u64;
122
123                        let base_ms = 500u64;
124                        let first_retry_after_ms = base_ms;
125
126                        let id = self.deferred.enqueue(
127                            peer_id,
128                            auth.user_id,
129                            auth.tenant_id,
130                            delta_bytes,
131                            change.collection.clone(),
132                            constraint.name.clone(),
133                            0,
134                            *max_retries,
135                            now_ms,
136                            first_retry_after_ms,
137                            *ttl_secs,
138                        );
139
140                        tracing::info!(
141                            constraint = %v.constraint_name,
142                            deferred_id = id,
143                            reason = %v.reason,
144                            "resolved via CASCADE_DEFER (queued for retry)"
145                        );
146
147                        Ok(PolicyResolution::Deferred {
148                            retry_after_ms: first_retry_after_ms,
149                            attempt: 0,
150                        })
151                    }
152
153                    ConflictPolicy::Custom {
154                        webhook_url,
155                        timeout_secs,
156                    } => {
157                        tracing::info!(
158                            constraint = %v.constraint_name,
159                            webhook_url = %webhook_url,
160                            "escalated to webhook"
161                        );
162
163                        Ok(PolicyResolution::WebhookRequired {
164                            webhook_url: webhook_url.clone(),
165                            timeout_secs: *timeout_secs,
166                        })
167                    }
168
169                    ConflictPolicy::EscalateToDlq => {
170                        self.dlq.enqueue(
171                            peer_id,
172                            auth.user_id,
173                            auth.tenant_id,
174                            delta_bytes,
175                            &constraint,
176                            v.reason.clone(),
177                            v.hint.clone(),
178                        )?;
179
180                        tracing::info!(
181                            constraint = %v.constraint_name,
182                            collection = %change.collection,
183                            "escalated to DLQ"
184                        );
185
186                        Ok(PolicyResolution::Escalate)
187                    }
188                }
189            }
190        }
191    }
192}