nodedb_crdt/validator/policy_dispatch.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Policy resolution dispatch for validation violations.
4
5use crate::CrdtAuthContext;
6use crate::constraint::{Constraint, ConstraintKind};
7use crate::error::Result;
8use crate::policy::{ConflictPolicy, PolicyResolution, ResolvedAction};
9use crate::state::CrdtState;
10
11use super::core::Validator;
12use super::types::{ProposedChange, ValidationOutcome};
13
14impl Validator {
15 /// Validate with declarative policy resolution.
16 ///
17 /// This is the new core validation method. It attempts to resolve violations
18 /// via policy before falling back to the DLQ.
19 ///
20 /// # Arguments
21 ///
22 /// * `state` — current CRDT state
23 /// * `peer_id` — source peer ID
24 /// * `change` — proposed change
25 /// * `delta_bytes` — raw delta bytes
26 /// * `hlc_timestamp` — Hybrid Logical Clock timestamp of the incoming write
27 ///
28 /// Returns:
29 /// - `Ok(PolicyResolution::AutoResolved(_))` if the policy auto-fixed the violation
30 /// - `Ok(PolicyResolution::Deferred { .. })` if deferred for retry (entry already enqueued)
31 /// - `Ok(PolicyResolution::WebhookRequired { .. })` if webhook call needed (caller's responsibility)
32 /// - `Ok(PolicyResolution::Escalate)` if escalating to DLQ (entry already enqueued)
33 /// - `Err(_)` if an internal error occurred
34 pub fn validate_with_policy(
35 &mut self,
36 state: &CrdtState,
37 peer_id: u64,
38 auth: CrdtAuthContext,
39 change: &ProposedChange,
40 delta_bytes: Vec<u8>,
41 hlc_timestamp: u64,
42 ) -> Result<PolicyResolution> {
43 match self.validate(state, change) {
44 ValidationOutcome::Accepted => {
45 // No violation; return synthetic "auto-resolved" to maintain API consistency
46 Ok(PolicyResolution::AutoResolved(
47 ResolvedAction::OverwriteExisting,
48 ))
49 }
50 ValidationOutcome::Rejected(violations) => {
51 // Exactly one violation per constraint (current design)
52 let v = &violations[0];
53 let constraint = self
54 .constraints
55 .all()
56 .iter()
57 .find(|c| c.name == v.constraint_name)
58 .cloned()
59 .unwrap_or_else(|| Constraint {
60 name: v.constraint_name.clone(),
61 collection: change.collection.clone(),
62 field: String::new(),
63 kind: ConstraintKind::NotNull,
64 });
65
66 let policy = self.policies.get_owned(&change.collection);
67 let policy_for_kind = policy.for_kind(&constraint.kind);
68
69 // Attempt policy resolution
70 match policy_for_kind {
71 ConflictPolicy::LastWriterWins => {
72 tracing::info!(
73 constraint = %v.constraint_name,
74 collection = %change.collection,
75 timestamp = hlc_timestamp,
76 reason = %v.reason,
77 "resolved via LAST_WRITER_WINS"
78 );
79 Ok(PolicyResolution::AutoResolved(
80 ResolvedAction::OverwriteExisting,
81 ))
82 }
83
84 ConflictPolicy::RenameSuffix => {
85 let counter_key = (change.collection.clone(), constraint.field.clone());
86 let suffix = self.suffix_counter.entry(counter_key).or_insert(0);
87 *suffix += 1;
88 let new_value = format!(
89 "{}_{}",
90 change
91 .fields
92 .iter()
93 .find(|(f, _)| f == &constraint.field)
94 .map(|(_, v)| format!("{:?}", v))
95 .unwrap_or_else(|| "unknown".to_string()),
96 suffix
97 );
98
99 tracing::info!(
100 constraint = %v.constraint_name,
101 field = %constraint.field,
102 new_value = %new_value,
103 "resolved via RENAME_APPEND_SUFFIX"
104 );
105
106 Ok(PolicyResolution::AutoResolved(
107 ResolvedAction::RenamedField {
108 field: constraint.field.clone(),
109 new_value,
110 },
111 ))
112 }
113
114 ConflictPolicy::CascadeDefer {
115 max_retries,
116 ttl_secs,
117 } => {
118 let now_ms = std::time::SystemTime::now()
119 .duration_since(std::time::UNIX_EPOCH)
120 .unwrap_or_default()
121 .as_millis() as u64;
122
123 let base_ms = 500u64;
124 let first_retry_after_ms = base_ms;
125
126 let id = self.deferred.enqueue(
127 peer_id,
128 auth.user_id,
129 auth.tenant_id,
130 delta_bytes,
131 change.collection.clone(),
132 constraint.name.clone(),
133 0,
134 *max_retries,
135 now_ms,
136 first_retry_after_ms,
137 *ttl_secs,
138 );
139
140 tracing::info!(
141 constraint = %v.constraint_name,
142 deferred_id = id,
143 reason = %v.reason,
144 "resolved via CASCADE_DEFER (queued for retry)"
145 );
146
147 Ok(PolicyResolution::Deferred {
148 retry_after_ms: first_retry_after_ms,
149 attempt: 0,
150 })
151 }
152
153 ConflictPolicy::Custom {
154 webhook_url,
155 timeout_secs,
156 } => {
157 tracing::info!(
158 constraint = %v.constraint_name,
159 webhook_url = %webhook_url,
160 "escalated to webhook"
161 );
162
163 Ok(PolicyResolution::WebhookRequired {
164 webhook_url: webhook_url.clone(),
165 timeout_secs: *timeout_secs,
166 })
167 }
168
169 ConflictPolicy::EscalateToDlq => {
170 self.dlq.enqueue(
171 peer_id,
172 auth.user_id,
173 auth.tenant_id,
174 delta_bytes,
175 &constraint,
176 v.reason.clone(),
177 v.hint.clone(),
178 )?;
179
180 tracing::info!(
181 constraint = %v.constraint_name,
182 collection = %change.collection,
183 "escalated to DLQ"
184 );
185
186 Ok(PolicyResolution::Escalate)
187 }
188 }
189 }
190 }
191 }
192}