nodedb_crdt/validator/validate.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Basic validate() + validate_or_reject() entry points.
4
5use crate::CrdtAuthContext;
6use crate::error::{CrdtError, Result};
7use crate::policy::PolicyResolution;
8use crate::state::CrdtState;
9
10use super::core::Validator;
11use super::types::{ProposedChange, ValidationOutcome};
12
13impl Validator {
14 /// Validate a proposed change against all applicable constraints.
15 ///
16 /// Returns `Accepted` if all constraints pass, or `Rejected` with
17 /// detailed violation information.
18 pub fn validate(&self, state: &CrdtState, change: &ProposedChange) -> ValidationOutcome {
19 let constraints = self.constraints.for_collection(&change.collection);
20 let mut violations = Vec::new();
21
22 for constraint in constraints {
23 if let Some(violation) = self.check_constraint(state, change, constraint) {
24 violations.push(violation);
25 }
26 }
27
28 if violations.is_empty() {
29 ValidationOutcome::Accepted
30 } else {
31 ValidationOutcome::Rejected(violations)
32 }
33 }
34
35 /// Validate and apply declarative policy resolution.
36 ///
37 /// ## Replay protection
38 ///
39 /// When `auth.delta_signature` is non-zero, the following steps execute
40 /// in this order to prevent replay attacks at minimum cost:
41 ///
42 /// 1. **Cheap seq_no check** — `seq_no > last_seen[(user_id, device_id)]`.
43 /// Fails fast before any HMAC computation.
44 /// 2. **HMAC verification** — constant-time comparison prevents timing attacks.
45 /// 3. **Atomic seq update** — `last_seen` advances only on success.
46 ///
47 /// For accepted changes, returns Ok(()).
48 /// For violations, applies policy and:
49 /// - If AutoResolved: returns Ok(())
50 /// - If Deferred/Webhook/Escalate: returns appropriate error
51 pub fn validate_or_reject(
52 &mut self,
53 state: &CrdtState,
54 peer_id: u64,
55 auth: CrdtAuthContext,
56 change: &ProposedChange,
57 delta_bytes: Vec<u8>,
58 ) -> Result<()> {
59 // Check auth expiry: agents that accumulated deltas offline must
60 // re-authenticate before syncing.
61 if auth.auth_expires_at > 0 {
62 let now_ms = std::time::SystemTime::now()
63 .duration_since(std::time::UNIX_EPOCH)
64 .unwrap_or_default()
65 .as_millis() as u64;
66 if now_ms > auth.auth_expires_at {
67 return Err(CrdtError::AuthExpired {
68 user_id: auth.user_id,
69 expired_at: auth.auth_expires_at,
70 });
71 }
72 }
73
74 // Replay protection + signature verification (signed path only).
75 //
76 // The unsigned path (all-zeros signature) bypasses replay protection.
77 // Old clients that send device_id=0 / seq_no=0 with a non-zero
78 // signature will be rejected by the seq_no check (0 is never > 0).
79 if auth.delta_signature != [0u8; 32]
80 && let Some(ref verifier) = self.delta_verifier
81 {
82 // Step 1: cheap seq_no check before any HMAC computation.
83 verifier
84 .registry()
85 .check_seq(auth.user_id, auth.device_id, auth.seq_no)?;
86
87 // Step 2: constant-time HMAC verification.
88 verifier.verify(
89 auth.user_id,
90 auth.device_id,
91 auth.seq_no,
92 &delta_bytes,
93 &auth.delta_signature,
94 )?;
95
96 // Step 3: advance last_seen atomically on success.
97 verifier
98 .registry()
99 .commit_seq(auth.user_id, auth.device_id, auth.seq_no)?;
100 }
101
102 let hlc_timestamp = std::time::SystemTime::now()
103 .duration_since(std::time::UNIX_EPOCH)
104 .unwrap_or_default()
105 .as_millis() as u64;
106
107 match self.validate_with_policy(state, peer_id, auth, change, delta_bytes, hlc_timestamp)? {
108 PolicyResolution::AutoResolved(_) => Ok(()),
109 PolicyResolution::Deferred { .. } => {
110 // Violation was deferred for retry; return error to signal this
111 // The deferred entry was already enqueued by validate_with_policy
112 let violations = match self.validate(state, change) {
113 ValidationOutcome::Rejected(v) => v,
114 _ => vec![],
115 };
116 if !violations.is_empty() {
117 let v = &violations[0];
118 Err(CrdtError::ConstraintViolation {
119 constraint: v.constraint_name.clone(),
120 collection: change.collection.clone(),
121 detail: format!("{} (deferred for retry)", v.reason),
122 })
123 } else {
124 Ok(())
125 }
126 }
127 PolicyResolution::WebhookRequired { .. } => {
128 // Webhook decision required; return error
129 let violations = match self.validate(state, change) {
130 ValidationOutcome::Rejected(v) => v,
131 _ => vec![],
132 };
133 if !violations.is_empty() {
134 let v = &violations[0];
135 Err(CrdtError::ConstraintViolation {
136 constraint: v.constraint_name.clone(),
137 collection: change.collection.clone(),
138 detail: format!("{} (webhook required)", v.reason),
139 })
140 } else {
141 Ok(())
142 }
143 }
144 PolicyResolution::Escalate => {
145 // Already enqueued to DLQ by validate_with_policy
146 let violations = match self.validate(state, change) {
147 ValidationOutcome::Rejected(v) => v,
148 _ => vec![],
149 };
150 if !violations.is_empty() {
151 let v = &violations[0];
152 Err(CrdtError::ConstraintViolation {
153 constraint: v.constraint_name.clone(),
154 collection: change.collection.clone(),
155 detail: v.reason.clone(),
156 })
157 } else {
158 Ok(())
159 }
160 }
161 }
162 }
163}