Skip to main content

nodedb_crdt/validator/
validate.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Basic validate() + validate_or_reject() entry points.
4
5use crate::CrdtAuthContext;
6use crate::error::{CrdtError, Result};
7use crate::policy::PolicyResolution;
8use crate::state::CrdtState;
9
10use super::core::Validator;
11use super::types::{ProposedChange, ValidationOutcome};
12
13impl Validator {
14    /// Validate a proposed change against all applicable constraints.
15    ///
16    /// Returns `Accepted` if all constraints pass, or `Rejected` with
17    /// detailed violation information.
18    pub fn validate(&self, state: &CrdtState, change: &ProposedChange) -> ValidationOutcome {
19        let constraints = self.constraints.for_collection(&change.collection);
20        let mut violations = Vec::new();
21
22        for constraint in constraints {
23            if let Some(violation) = self.check_constraint(state, change, constraint) {
24                violations.push(violation);
25            }
26        }
27
28        if violations.is_empty() {
29            ValidationOutcome::Accepted
30        } else {
31            ValidationOutcome::Rejected(violations)
32        }
33    }
34
35    /// Validate and apply declarative policy resolution.
36    ///
37    /// ## Replay protection
38    ///
39    /// When `auth.delta_signature` is non-zero, the following steps execute
40    /// in this order to prevent replay attacks at minimum cost:
41    ///
42    /// 1. **Cheap seq_no check** — `seq_no > last_seen[(user_id, device_id)]`.
43    ///    Fails fast before any HMAC computation.
44    /// 2. **HMAC verification** — constant-time comparison prevents timing attacks.
45    /// 3. **Atomic seq update** — `last_seen` advances only on success.
46    ///
47    /// For accepted changes, returns Ok(()).
48    /// For violations, applies policy and:
49    /// - If AutoResolved: returns Ok(())
50    /// - If Deferred/Webhook/Escalate: returns appropriate error
51    pub fn validate_or_reject(
52        &mut self,
53        state: &CrdtState,
54        peer_id: u64,
55        auth: CrdtAuthContext,
56        change: &ProposedChange,
57        delta_bytes: Vec<u8>,
58    ) -> Result<()> {
59        // Check auth expiry: agents that accumulated deltas offline must
60        // re-authenticate before syncing.
61        if auth.auth_expires_at > 0 {
62            let now_ms = std::time::SystemTime::now()
63                .duration_since(std::time::UNIX_EPOCH)
64                .unwrap_or_default()
65                .as_millis() as u64;
66            if now_ms > auth.auth_expires_at {
67                return Err(CrdtError::AuthExpired {
68                    user_id: auth.user_id,
69                    expired_at: auth.auth_expires_at,
70                });
71            }
72        }
73
74        // Replay protection + signature verification (signed path only).
75        //
76        // The unsigned path (all-zeros signature) bypasses replay protection.
77        // Old clients that send device_id=0 / seq_no=0 with a non-zero
78        // signature will be rejected by the seq_no check (0 is never > 0).
79        if auth.delta_signature != [0u8; 32]
80            && let Some(ref verifier) = self.delta_verifier
81        {
82            // Step 1: cheap seq_no check before any HMAC computation.
83            verifier
84                .registry()
85                .check_seq(auth.user_id, auth.device_id, auth.seq_no)?;
86
87            // Step 2: constant-time HMAC verification.
88            verifier.verify(
89                auth.user_id,
90                auth.device_id,
91                auth.seq_no,
92                &delta_bytes,
93                &auth.delta_signature,
94            )?;
95
96            // Step 3: advance last_seen atomically on success.
97            verifier
98                .registry()
99                .commit_seq(auth.user_id, auth.device_id, auth.seq_no)?;
100        }
101
102        let hlc_timestamp = std::time::SystemTime::now()
103            .duration_since(std::time::UNIX_EPOCH)
104            .unwrap_or_default()
105            .as_millis() as u64;
106
107        match self.validate_with_policy(state, peer_id, auth, change, delta_bytes, hlc_timestamp)? {
108            PolicyResolution::AutoResolved(_) => Ok(()),
109            PolicyResolution::Deferred { .. } => {
110                // Violation was deferred for retry; return error to signal this
111                // The deferred entry was already enqueued by validate_with_policy
112                let violations = match self.validate(state, change) {
113                    ValidationOutcome::Rejected(v) => v,
114                    _ => vec![],
115                };
116                if !violations.is_empty() {
117                    let v = &violations[0];
118                    Err(CrdtError::ConstraintViolation {
119                        constraint: v.constraint_name.clone(),
120                        collection: change.collection.clone(),
121                        detail: format!("{} (deferred for retry)", v.reason),
122                    })
123                } else {
124                    Ok(())
125                }
126            }
127            PolicyResolution::WebhookRequired { .. } => {
128                // Webhook decision required; return error
129                let violations = match self.validate(state, change) {
130                    ValidationOutcome::Rejected(v) => v,
131                    _ => vec![],
132                };
133                if !violations.is_empty() {
134                    let v = &violations[0];
135                    Err(CrdtError::ConstraintViolation {
136                        constraint: v.constraint_name.clone(),
137                        collection: change.collection.clone(),
138                        detail: format!("{} (webhook required)", v.reason),
139                    })
140                } else {
141                    Ok(())
142                }
143            }
144            PolicyResolution::Escalate => {
145                // Already enqueued to DLQ by validate_with_policy
146                let violations = match self.validate(state, change) {
147                    ValidationOutcome::Rejected(v) => v,
148                    _ => vec![],
149                };
150                if !violations.is_empty() {
151                    let v = &violations[0];
152                    Err(CrdtError::ConstraintViolation {
153                        constraint: v.constraint_name.clone(),
154                        collection: change.collection.clone(),
155                        detail: v.reason.clone(),
156                    })
157                } else {
158                    Ok(())
159                }
160            }
161        }
162    }
163}