Skip to main content

nodedb_crdt/
validator.rs

1//! Constraint validator — checks deltas against committed state.
2//!
3//! This is the core of the CRDT/SQL bridge. When a delta arrives from a peer,
4//! the validator checks all applicable constraints against the leader's
5//! committed state. If any constraint is violated, the delta is rejected
6//! with a compensation hint.
7
8use crate::CrdtAuthContext;
9use crate::constraint::{Constraint, ConstraintKind, ConstraintSet};
10use crate::dead_letter::{CompensationHint, DeadLetterQueue};
11use crate::deferred::DeferredQueue;
12use crate::error::{CrdtError, Result};
13use crate::policy::{ConflictPolicy, PolicyRegistry, PolicyResolution, ResolvedAction};
14use crate::signing::DeltaSigner;
15use crate::state::CrdtState;
16
17use loro::LoroValue;
18use std::collections::HashMap;
19
20/// Outcome of validating a proposed change against constraints.
21#[derive(Debug)]
22pub enum ValidationOutcome {
23    /// All constraints satisfied — safe to commit.
24    Accepted,
25    /// One or more constraints violated — delta rejected.
26    Rejected(Vec<Violation>),
27}
28
29/// A single constraint violation.
30#[derive(Debug, Clone)]
31pub struct Violation {
32    /// The constraint that was violated.
33    pub constraint_name: String,
34    /// Human-readable reason.
35    pub reason: String,
36    /// Suggested fix.
37    pub hint: CompensationHint,
38}
39
40/// A proposed row change to validate.
41#[derive(Debug, Clone)]
42pub struct ProposedChange {
43    /// Target collection.
44    pub collection: String,
45    /// Row ID being inserted/updated.
46    pub row_id: String,
47    /// Field values being set.
48    pub fields: Vec<(String, LoroValue)>,
49}
50
51/// The constraint validator.
52///
53/// Validates proposed changes against a set of constraints and the current
54/// committed state. Violations are resolved via declarative policies:
55/// - AUTO_RESOLVED — policy handles it (e.g., LAST_WRITER_WINS)
56/// - DEFERRED — queued for exponential backoff retry (CASCADE_DEFER)
57/// - WEBHOOK_REQUIRED — caller must POST to webhook for decision
58/// - ESCALATE — route to dead-letter queue (fallback)
59pub struct Validator {
60    constraints: ConstraintSet,
61    dlq: DeadLetterQueue,
62    policies: PolicyRegistry,
63    deferred: DeferredQueue,
64    /// Monotonic suffix counter: (collection, field) -> next suffix number
65    suffix_counter: HashMap<(String, String), u64>,
66    /// Optional delta signature verifier. When set, signed deltas are
67    /// verified before constraint validation.
68    delta_verifier: Option<DeltaSigner>,
69}
70
71impl Validator {
72    /// Create a new validator with default (ephemeral) policies.
73    pub fn new(constraints: ConstraintSet, dlq_capacity: usize) -> Self {
74        Self::new_with_policies(constraints, dlq_capacity, PolicyRegistry::new(), 1000)
75    }
76
77    /// Create a new validator with custom policies and deferred queue.
78    pub fn new_with_policies(
79        constraints: ConstraintSet,
80        dlq_capacity: usize,
81        policies: PolicyRegistry,
82        deferred_capacity: usize,
83    ) -> Self {
84        Self {
85            constraints,
86            dlq: DeadLetterQueue::new(dlq_capacity),
87            policies,
88            deferred: DeferredQueue::new(deferred_capacity),
89            suffix_counter: HashMap::new(),
90            delta_verifier: None,
91        }
92    }
93
94    /// Validate a proposed change against all applicable constraints.
95    ///
96    /// Returns `Accepted` if all constraints pass, or `Rejected` with
97    /// detailed violation information.
98    pub fn validate(&self, state: &CrdtState, change: &ProposedChange) -> ValidationOutcome {
99        let constraints = self.constraints.for_collection(&change.collection);
100        let mut violations = Vec::new();
101
102        for constraint in constraints {
103            if let Some(violation) = self.check_constraint(state, change, constraint) {
104                violations.push(violation);
105            }
106        }
107
108        if violations.is_empty() {
109            ValidationOutcome::Accepted
110        } else {
111            ValidationOutcome::Rejected(violations)
112        }
113    }
114
115    /// Validate and apply declarative policy resolution.
116    ///
117    /// This is the modern API. It uses validate_with_policy internally.
118    /// For accepted changes, returns Ok(()).
119    /// For violations, applies policy and:
120    /// - If AutoResolved: returns Ok(())
121    /// - If Deferred/Webhook/Escalate: returns appropriate error
122    pub fn validate_or_reject(
123        &mut self,
124        state: &CrdtState,
125        peer_id: u64,
126        auth: CrdtAuthContext,
127        change: &ProposedChange,
128        delta_bytes: Vec<u8>,
129    ) -> Result<()> {
130        // Check auth expiry: agents that accumulated deltas offline must
131        // re-authenticate before syncing.
132        if auth.auth_expires_at > 0 {
133            let now_ms = std::time::SystemTime::now()
134                .duration_since(std::time::UNIX_EPOCH)
135                .unwrap_or_default()
136                .as_millis() as u64;
137            if now_ms > auth.auth_expires_at {
138                return Err(CrdtError::AuthExpired {
139                    user_id: auth.user_id,
140                    expired_at: auth.auth_expires_at,
141                });
142            }
143        }
144
145        // Verify delta signature if present (non-zero).
146        if auth.delta_signature != [0u8; 32]
147            && let Some(ref verifier) = self.delta_verifier
148        {
149            verifier.verify(auth.user_id, &delta_bytes, &auth.delta_signature)?;
150        }
151
152        let hlc_timestamp = std::time::SystemTime::now()
153            .duration_since(std::time::UNIX_EPOCH)
154            .unwrap_or_default()
155            .as_millis() as u64;
156
157        match self.validate_with_policy(state, peer_id, auth, change, delta_bytes, hlc_timestamp)? {
158            PolicyResolution::AutoResolved(_) => Ok(()),
159            PolicyResolution::Deferred { .. } => {
160                // Violation was deferred for retry; return error to signal this
161                // The deferred entry was already enqueued by validate_with_policy
162                let violations = match self.validate(state, change) {
163                    ValidationOutcome::Rejected(v) => v,
164                    _ => vec![],
165                };
166                if !violations.is_empty() {
167                    let v = &violations[0];
168                    Err(CrdtError::ConstraintViolation {
169                        constraint: v.constraint_name.clone(),
170                        collection: change.collection.clone(),
171                        detail: format!("{} (deferred for retry)", v.reason),
172                    })
173                } else {
174                    Ok(())
175                }
176            }
177            PolicyResolution::WebhookRequired { .. } => {
178                // Webhook decision required; return error
179                let violations = match self.validate(state, change) {
180                    ValidationOutcome::Rejected(v) => v,
181                    _ => vec![],
182                };
183                if !violations.is_empty() {
184                    let v = &violations[0];
185                    Err(CrdtError::ConstraintViolation {
186                        constraint: v.constraint_name.clone(),
187                        collection: change.collection.clone(),
188                        detail: format!("{} (webhook required)", v.reason),
189                    })
190                } else {
191                    Ok(())
192                }
193            }
194            PolicyResolution::Escalate => {
195                // Already enqueued to DLQ by validate_with_policy
196                let violations = match self.validate(state, change) {
197                    ValidationOutcome::Rejected(v) => v,
198                    _ => vec![],
199                };
200                if !violations.is_empty() {
201                    let v = &violations[0];
202                    Err(CrdtError::ConstraintViolation {
203                        constraint: v.constraint_name.clone(),
204                        collection: change.collection.clone(),
205                        detail: v.reason.clone(),
206                    })
207                } else {
208                    Ok(())
209                }
210            }
211        }
212    }
213
214    /// Validate with declarative policy resolution.
215    ///
216    /// This is the new core validation method. It attempts to resolve violations
217    /// via policy before falling back to the DLQ.
218    ///
219    /// # Arguments
220    ///
221    /// * `state` — current CRDT state
222    /// * `peer_id` — source peer ID
223    /// * `change` — proposed change
224    /// * `delta_bytes` — raw delta bytes
225    /// * `hlc_timestamp` — Hybrid Logical Clock timestamp of the incoming write
226    ///
227    /// Returns:
228    /// - `Ok(PolicyResolution::AutoResolved(_))` if the policy auto-fixed the violation
229    /// - `Ok(PolicyResolution::Deferred { .. })` if deferred for retry (entry already enqueued)
230    /// - `Ok(PolicyResolution::WebhookRequired { .. })` if webhook call needed (caller's responsibility)
231    /// - `Ok(PolicyResolution::Escalate)` if escalating to DLQ (entry already enqueued)
232    /// - `Err(_)` if an internal error occurred
233    pub fn validate_with_policy(
234        &mut self,
235        state: &CrdtState,
236        peer_id: u64,
237        auth: CrdtAuthContext,
238        change: &ProposedChange,
239        delta_bytes: Vec<u8>,
240        hlc_timestamp: u64,
241    ) -> Result<PolicyResolution> {
242        match self.validate(state, change) {
243            ValidationOutcome::Accepted => {
244                // No violation; return synthetic "auto-resolved" to maintain API consistency
245                Ok(PolicyResolution::AutoResolved(
246                    ResolvedAction::OverwriteExisting,
247                ))
248            }
249            ValidationOutcome::Rejected(violations) => {
250                // Exactly one violation per constraint (current design)
251                let v = &violations[0];
252                let constraint = self
253                    .constraints
254                    .all()
255                    .iter()
256                    .find(|c| c.name == v.constraint_name)
257                    .cloned()
258                    .unwrap_or_else(|| Constraint {
259                        name: v.constraint_name.clone(),
260                        collection: change.collection.clone(),
261                        field: String::new(),
262                        kind: ConstraintKind::NotNull,
263                    });
264
265                let policy = self.policies.get_owned(&change.collection);
266                let policy_for_kind = policy.for_kind(&constraint.kind);
267
268                // Attempt policy resolution
269                match policy_for_kind {
270                    ConflictPolicy::LastWriterWins => {
271                        // LWW: incoming write always wins; log audit entry
272                        tracing::info!(
273                            constraint = %v.constraint_name,
274                            collection = %change.collection,
275                            timestamp = hlc_timestamp,
276                            reason = %v.reason,
277                            "resolved via LAST_WRITER_WINS"
278                        );
279                        Ok(PolicyResolution::AutoResolved(
280                            ResolvedAction::OverwriteExisting,
281                        ))
282                    }
283
284                    ConflictPolicy::RenameSuffix => {
285                        // Auto-rename conflicting field
286                        let counter_key = (change.collection.clone(), constraint.field.clone());
287                        let suffix = self.suffix_counter.entry(counter_key).or_insert(0);
288                        *suffix += 1;
289                        let new_value = format!(
290                            "{}_{}",
291                            change
292                                .fields
293                                .iter()
294                                .find(|(f, _)| f == &constraint.field)
295                                .map(|(_, v)| format!("{:?}", v))
296                                .unwrap_or_else(|| "unknown".to_string()),
297                            suffix
298                        );
299
300                        tracing::info!(
301                            constraint = %v.constraint_name,
302                            field = %constraint.field,
303                            new_value = %new_value,
304                            "resolved via RENAME_APPEND_SUFFIX"
305                        );
306
307                        Ok(PolicyResolution::AutoResolved(
308                            ResolvedAction::RenamedField {
309                                field: constraint.field.clone(),
310                                new_value,
311                            },
312                        ))
313                    }
314
315                    ConflictPolicy::CascadeDefer {
316                        max_retries,
317                        ttl_secs,
318                    } => {
319                        // Enqueue for exponential backoff retry
320                        let now_ms = std::time::SystemTime::now()
321                            .duration_since(std::time::UNIX_EPOCH)
322                            .unwrap_or_default()
323                            .as_millis() as u64;
324
325                        let base_ms = 500u64;
326                        let first_retry_after_ms = base_ms;
327
328                        let id = self.deferred.enqueue(
329                            peer_id,
330                            auth.user_id,
331                            auth.tenant_id,
332                            delta_bytes,
333                            change.collection.clone(),
334                            constraint.name.clone(),
335                            0,
336                            *max_retries,
337                            now_ms,
338                            first_retry_after_ms,
339                            *ttl_secs,
340                        );
341
342                        tracing::info!(
343                            constraint = %v.constraint_name,
344                            deferred_id = id,
345                            reason = %v.reason,
346                            "resolved via CASCADE_DEFER (queued for retry)"
347                        );
348
349                        Ok(PolicyResolution::Deferred {
350                            retry_after_ms: first_retry_after_ms,
351                            attempt: 0,
352                        })
353                    }
354
355                    ConflictPolicy::Custom {
356                        webhook_url,
357                        timeout_secs,
358                    } => {
359                        // Webhook decision required
360                        tracing::info!(
361                            constraint = %v.constraint_name,
362                            webhook_url = %webhook_url,
363                            "escalated to webhook"
364                        );
365
366                        Ok(PolicyResolution::WebhookRequired {
367                            webhook_url: webhook_url.clone(),
368                            timeout_secs: *timeout_secs,
369                        })
370                    }
371
372                    ConflictPolicy::EscalateToDlq => {
373                        // Explicit fallback to DLQ
374                        self.dlq.enqueue(
375                            peer_id,
376                            auth.user_id,
377                            auth.tenant_id,
378                            delta_bytes,
379                            &constraint,
380                            v.reason.clone(),
381                            v.hint.clone(),
382                        )?;
383
384                        tracing::info!(
385                            constraint = %v.constraint_name,
386                            collection = %change.collection,
387                            "escalated to DLQ"
388                        );
389
390                        Ok(PolicyResolution::Escalate)
391                    }
392                }
393            }
394        }
395    }
396
397    /// Access the dead-letter queue.
398    pub fn dlq(&self) -> &DeadLetterQueue {
399        &self.dlq
400    }
401
402    /// Mutable access to the DLQ (for dequeue/retry).
403    pub fn dlq_mut(&mut self) -> &mut DeadLetterQueue {
404        &mut self.dlq
405    }
406
407    /// Access the policy registry.
408    pub fn policies(&self) -> &PolicyRegistry {
409        &self.policies
410    }
411
412    /// Mutable access to the policy registry.
413    pub fn policies_mut(&mut self) -> &mut PolicyRegistry {
414        &mut self.policies
415    }
416
417    /// Access the deferred queue.
418    pub fn deferred(&self) -> &DeferredQueue {
419        &self.deferred
420    }
421
422    /// Mutable access to the deferred queue.
423    pub fn deferred_mut(&mut self) -> &mut DeferredQueue {
424        &mut self.deferred
425    }
426
427    /// Set the delta signature verifier. When set, deltas with non-zero
428    /// signatures in their CrdtAuthContext will be verified before validation.
429    pub fn set_delta_verifier(&mut self, verifier: DeltaSigner) {
430        self.delta_verifier = Some(verifier);
431    }
432
433    /// Access the delta verifier.
434    pub fn delta_verifier(&self) -> Option<&DeltaSigner> {
435        self.delta_verifier.as_ref()
436    }
437
438    /// Mutable access to the delta verifier.
439    pub fn delta_verifier_mut(&mut self) -> Option<&mut DeltaSigner> {
440        self.delta_verifier.as_mut()
441    }
442}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::policy::CollectionPolicy;

    /// Empty state plus the constraint set shared by every test:
    /// unique `users.email`, not-null `users.name`, and a foreign key from
    /// `posts.author_id` to `users.id`.
    fn setup() -> (CrdtState, ConstraintSet) {
        let state = CrdtState::new(1).unwrap();
        let mut cs = ConstraintSet::new();
        cs.add_unique("users_email_unique", "users", "email");
        cs.add_not_null("users_name_nn", "users", "name");
        cs.add_foreign_key("posts_author_fk", "posts", "author_id", "users", "id");
        (state, cs)
    }

    /// Wall-clock milliseconds, used as the incoming write's timestamp.
    fn now_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// Shorthand for a string-valued field.
    fn text(value: &str) -> LoroValue {
        LoroValue::String(value.into())
    }

    /// A complete `users` row change (both `name` and `email` set).
    fn user_change(row_id: &str, name: &str, email: &str) -> ProposedChange {
        ProposedChange {
            collection: "users".into(),
            row_id: row_id.into(),
            fields: vec![("name".into(), text(name)), ("email".into(), text(email))],
        }
    }

    /// A `users` row change that omits `name`, violating NOT NULL.
    fn user_change_without_name(email: &str) -> ProposedChange {
        ProposedChange {
            collection: "users".into(),
            row_id: "u1".into(),
            fields: vec![("email".into(), text(email))],
        }
    }

    /// A `posts` row change whose `author_id` points at user `u1`.
    fn post_by_u1() -> ProposedChange {
        ProposedChange {
            collection: "posts".into(),
            row_id: "p1".into(),
            fields: vec![("author_id".into(), text("u1"))],
        }
    }

    /// Commit user `u1` ("Alice") with the given email into `state`.
    fn seed_alice(state: &CrdtState, email: &str) {
        state
            .upsert(
                "users",
                "u1",
                &[("name", text("Alice")), ("email", text(email))],
            )
            .unwrap();
    }

    /// Run `validate_with_policy` as peer 42 with a default auth context.
    fn resolve(
        validator: &mut Validator,
        state: &CrdtState,
        change: &ProposedChange,
    ) -> PolicyResolution {
        validator
            .validate_with_policy(
                state,
                42,
                CrdtAuthContext::default(),
                change,
                b"delta".to_vec(),
                now_ms(),
            )
            .unwrap()
    }

    #[test]
    fn valid_insert_accepted() {
        let (state, cs) = setup();
        let validator = Validator::new(cs, 100);

        let change = user_change("u1", "Alice", "alice@example.com");

        assert!(matches!(
            validator.validate(&state, &change),
            ValidationOutcome::Accepted
        ));
    }

    #[test]
    fn not_null_violation() {
        let (state, cs) = setup();
        let validator = Validator::new(cs, 100);

        // Missing "name" field — violates NOT NULL.
        let change = user_change_without_name("alice@example.com");

        match validator.validate(&state, &change) {
            ValidationOutcome::Rejected(v) => {
                assert_eq!(v.len(), 1);
                assert_eq!(v[0].constraint_name, "users_name_nn");
                assert!(matches!(
                    v[0].hint,
                    CompensationHint::ProvideRequiredField { .. }
                ));
            }
            _ => panic!("expected rejection"),
        }
    }

    #[test]
    fn foreign_key_violation() {
        let (state, cs) = setup();
        let validator = Validator::new(cs, 100);

        // No users exist — FK to users.id should fail.
        let change = post_by_u1();

        match validator.validate(&state, &change) {
            ValidationOutcome::Rejected(v) => {
                assert_eq!(v[0].constraint_name, "posts_author_fk");
                assert!(matches!(
                    v[0].hint,
                    CompensationHint::CreateReferencedRow { .. }
                ));
            }
            _ => panic!("expected FK violation"),
        }
    }

    #[test]
    fn foreign_key_passes_when_parent_exists() {
        let (state, cs) = setup();
        let validator = Validator::new(cs, 100);

        // Create the referenced user first.
        seed_alice(&state, "a@b.com");

        assert!(matches!(
            validator.validate(&state, &post_by_u1()),
            ValidationOutcome::Accepted
        ));
    }

    #[test]
    fn validate_or_reject_enqueues_to_dlq() {
        let (state, cs) = setup();
        let mut validator = Validator::new_with_policies(cs, 100, PolicyRegistry::new(), 100);

        // Strict policy for the users collection (escalates to DLQ).
        validator
            .policies_mut()
            .set("users", CollectionPolicy::strict());

        // Missing "name" — violates NOT NULL.
        let change = user_change_without_name("a@b.com");

        let err = validator
            .validate_or_reject(
                &state,
                42,
                CrdtAuthContext::default(),
                &change,
                b"delta-bytes".to_vec(),
            )
            .unwrap_err();

        assert!(matches!(err, CrdtError::ConstraintViolation { .. }));
        assert_eq!(validator.dlq().len(), 1);

        let dl = validator.dlq().peek().unwrap();
        assert_eq!(dl.peer_id, 42);
        assert_eq!(dl.violated_constraint, "users_name_nn");
    }

    #[test]
    fn validate_with_policy_last_writer_wins() {
        let (state, cs) = setup();

        // Ephemeral policy with NOT NULL explicitly set to LWW.
        let mut policy = CollectionPolicy::ephemeral();
        policy.not_null = ConflictPolicy::LastWriterWins;
        let mut policies = PolicyRegistry::new();
        policies.set("users", policy);

        let mut validator = Validator::new_with_policies(cs, 100, policies, 100);

        // Missing "name" — should be resolved via LWW.
        let change = user_change_without_name("a@b.com");

        let resolution = resolve(&mut validator, &state, &change);

        assert!(matches!(
            resolution,
            PolicyResolution::AutoResolved(ResolvedAction::OverwriteExisting)
        ));
    }

    #[test]
    fn validate_with_policy_rename_suffix() {
        let (state, cs) = setup();

        let mut policy = CollectionPolicy::ephemeral();
        policy.unique = ConflictPolicy::RenameSuffix;
        let mut policies = PolicyRegistry::new();
        policies.set("users", policy);

        let mut validator = Validator::new_with_policies(cs, 100, policies, 100);

        // First user exists.
        seed_alice(&state, "alice@example.com");

        // Second user tries the same email — should be auto-renamed.
        let change = user_change("u2", "Bob", "alice@example.com");

        match resolve(&mut validator, &state, &change) {
            PolicyResolution::AutoResolved(ResolvedAction::RenamedField { field, new_value }) => {
                assert_eq!(field, "email");
                assert!(new_value.contains("_1"));
            }
            _ => panic!("expected RenamedField resolution"),
        }
    }

    #[test]
    fn validate_with_policy_cascade_defer() {
        let (state, cs) = setup();

        let mut policy = CollectionPolicy::ephemeral();
        policy.foreign_key = ConflictPolicy::CascadeDefer {
            max_retries: 3,
            ttl_secs: 60,
        };
        let mut policies = PolicyRegistry::new();
        policies.set("posts", policy);

        let mut validator = Validator::new_with_policies(cs, 100, policies, 100);

        // FK to non-existent user.
        let change = post_by_u1();

        match resolve(&mut validator, &state, &change) {
            PolicyResolution::Deferred {
                retry_after_ms,
                attempt,
            } => {
                assert_eq!(attempt, 0);
                assert_eq!(retry_after_ms, 500); // base backoff
            }
            _ => panic!("expected Deferred resolution"),
        }

        // Verify the entry was enqueued to the deferred queue.
        assert_eq!(validator.deferred().len(), 1);
    }

    #[test]
    fn validate_with_policy_escalate_to_dlq() {
        let (state, cs) = setup();

        let mut policy = CollectionPolicy::ephemeral();
        policy.not_null = ConflictPolicy::EscalateToDlq;
        let mut policies = PolicyRegistry::new();
        policies.set("users", policy);

        let mut validator = Validator::new_with_policies(cs, 100, policies, 100);

        let change = user_change_without_name("a@b.com");

        let resolution = resolve(&mut validator, &state, &change);

        assert!(matches!(resolution, PolicyResolution::Escalate));
        assert_eq!(validator.dlq().len(), 1);
    }
}