1use crate::CrdtAuthContext;
9use crate::constraint::{Constraint, ConstraintKind, ConstraintSet};
10use crate::dead_letter::{CompensationHint, DeadLetterQueue};
11use crate::deferred::DeferredQueue;
12use crate::error::{CrdtError, Result};
13use crate::policy::{ConflictPolicy, PolicyRegistry, PolicyResolution, ResolvedAction};
14use crate::signing::DeltaSigner;
15use crate::state::CrdtState;
16
17use loro::LoroValue;
18use std::collections::HashMap;
19
/// Result of checking a [`ProposedChange`] against the constraints registered
/// for its collection.
#[derive(Debug)]
pub enum ValidationOutcome {
    /// No constraint reported a violation.
    Accepted,
    /// One or more constraints were violated. `Validator::validate` pushes one
    /// [`Violation`] per failing constraint and only returns this variant when
    /// the list is non-empty.
    Rejected(Vec<Violation>),
}
28
/// A single constraint failure reported during validation.
#[derive(Debug, Clone)]
pub struct Violation {
    /// Name of the violated constraint; the validator matches it against
    /// `Constraint::name` to look the constraint up again.
    pub constraint_name: String,
    /// Description of the failure; it is copied into error details, log
    /// records and dead-letter entries.
    pub reason: String,
    /// Compensation hint that is stored alongside the delta when the
    /// violation is escalated to the dead-letter queue.
    pub hint: CompensationHint,
}
39
/// A change that a peer wants to apply and that must be validated first.
#[derive(Debug, Clone)]
pub struct ProposedChange {
    /// Collection the change targets; selects both the constraints to check
    /// and the conflict policy to apply.
    pub collection: String,
    /// Identifier of the affected row (presumably unique within the
    /// collection -- confirm against `CrdtState`).
    pub row_id: String,
    /// `(field name, value)` pairs carried by the change.
    pub fields: Vec<(String, LoroValue)>,
}
50
/// Validates proposed changes against a [`ConstraintSet`] and resolves
/// violations according to the per-collection conflict policies.
pub struct Validator {
    /// Constraints to enforce, looked up per collection.
    constraints: ConstraintSet,
    /// Dead-letter queue receiving deltas whose violation is escalated.
    dlq: DeadLetterQueue,
    /// Per-collection conflict policies.
    policies: PolicyRegistry,
    /// Queue of deltas waiting to be retried under a cascade-defer policy.
    deferred: DeferredQueue,
    /// Rename counters keyed by `(collection, field)`; each rename resolution
    /// increments the matching counter and uses it as the suffix.
    suffix_counter: HashMap<(String, String), u64>,
    /// Optional signature verifier; while it is `None`, delta signatures are
    /// not checked.
    delta_verifier: Option<DeltaSigner>,
}
70
71impl Validator {
72 pub fn new(constraints: ConstraintSet, dlq_capacity: usize) -> Self {
74 Self::new_with_policies(constraints, dlq_capacity, PolicyRegistry::new(), 1000)
75 }
76
77 pub fn new_with_policies(
79 constraints: ConstraintSet,
80 dlq_capacity: usize,
81 policies: PolicyRegistry,
82 deferred_capacity: usize,
83 ) -> Self {
84 Self {
85 constraints,
86 dlq: DeadLetterQueue::new(dlq_capacity),
87 policies,
88 deferred: DeferredQueue::new(deferred_capacity),
89 suffix_counter: HashMap::new(),
90 delta_verifier: None,
91 }
92 }
93
94 pub fn validate(&self, state: &CrdtState, change: &ProposedChange) -> ValidationOutcome {
99 let constraints = self.constraints.for_collection(&change.collection);
100 let mut violations = Vec::new();
101
102 for constraint in constraints {
103 if let Some(violation) = self.check_constraint(state, change, constraint) {
104 violations.push(violation);
105 }
106 }
107
108 if violations.is_empty() {
109 ValidationOutcome::Accepted
110 } else {
111 ValidationOutcome::Rejected(violations)
112 }
113 }
114
115 pub fn validate_or_reject(
123 &mut self,
124 state: &CrdtState,
125 peer_id: u64,
126 auth: CrdtAuthContext,
127 change: &ProposedChange,
128 delta_bytes: Vec<u8>,
129 ) -> Result<()> {
130 if auth.auth_expires_at > 0 {
133 let now_ms = std::time::SystemTime::now()
134 .duration_since(std::time::UNIX_EPOCH)
135 .unwrap_or_default()
136 .as_millis() as u64;
137 if now_ms > auth.auth_expires_at {
138 return Err(CrdtError::AuthExpired {
139 user_id: auth.user_id,
140 expired_at: auth.auth_expires_at,
141 });
142 }
143 }
144
145 if auth.delta_signature != [0u8; 32]
147 && let Some(ref verifier) = self.delta_verifier
148 {
149 verifier.verify(auth.user_id, &delta_bytes, &auth.delta_signature)?;
150 }
151
152 let hlc_timestamp = std::time::SystemTime::now()
153 .duration_since(std::time::UNIX_EPOCH)
154 .unwrap_or_default()
155 .as_millis() as u64;
156
157 match self.validate_with_policy(state, peer_id, auth, change, delta_bytes, hlc_timestamp)? {
158 PolicyResolution::AutoResolved(_) => Ok(()),
159 PolicyResolution::Deferred { .. } => {
160 let violations = match self.validate(state, change) {
163 ValidationOutcome::Rejected(v) => v,
164 _ => vec![],
165 };
166 if !violations.is_empty() {
167 let v = &violations[0];
168 Err(CrdtError::ConstraintViolation {
169 constraint: v.constraint_name.clone(),
170 collection: change.collection.clone(),
171 detail: format!("{} (deferred for retry)", v.reason),
172 })
173 } else {
174 Ok(())
175 }
176 }
177 PolicyResolution::WebhookRequired { .. } => {
178 let violations = match self.validate(state, change) {
180 ValidationOutcome::Rejected(v) => v,
181 _ => vec![],
182 };
183 if !violations.is_empty() {
184 let v = &violations[0];
185 Err(CrdtError::ConstraintViolation {
186 constraint: v.constraint_name.clone(),
187 collection: change.collection.clone(),
188 detail: format!("{} (webhook required)", v.reason),
189 })
190 } else {
191 Ok(())
192 }
193 }
194 PolicyResolution::Escalate => {
195 let violations = match self.validate(state, change) {
197 ValidationOutcome::Rejected(v) => v,
198 _ => vec![],
199 };
200 if !violations.is_empty() {
201 let v = &violations[0];
202 Err(CrdtError::ConstraintViolation {
203 constraint: v.constraint_name.clone(),
204 collection: change.collection.clone(),
205 detail: v.reason.clone(),
206 })
207 } else {
208 Ok(())
209 }
210 }
211 }
212 }
213
214 pub fn validate_with_policy(
234 &mut self,
235 state: &CrdtState,
236 peer_id: u64,
237 auth: CrdtAuthContext,
238 change: &ProposedChange,
239 delta_bytes: Vec<u8>,
240 hlc_timestamp: u64,
241 ) -> Result<PolicyResolution> {
242 match self.validate(state, change) {
243 ValidationOutcome::Accepted => {
244 Ok(PolicyResolution::AutoResolved(
246 ResolvedAction::OverwriteExisting,
247 ))
248 }
249 ValidationOutcome::Rejected(violations) => {
250 let v = &violations[0];
252 let constraint = self
253 .constraints
254 .all()
255 .iter()
256 .find(|c| c.name == v.constraint_name)
257 .cloned()
258 .unwrap_or_else(|| Constraint {
259 name: v.constraint_name.clone(),
260 collection: change.collection.clone(),
261 field: String::new(),
262 kind: ConstraintKind::NotNull,
263 });
264
265 let policy = self.policies.get_owned(&change.collection);
266 let policy_for_kind = policy.for_kind(&constraint.kind);
267
268 match policy_for_kind {
270 ConflictPolicy::LastWriterWins => {
271 tracing::info!(
273 constraint = %v.constraint_name,
274 collection = %change.collection,
275 timestamp = hlc_timestamp,
276 reason = %v.reason,
277 "resolved via LAST_WRITER_WINS"
278 );
279 Ok(PolicyResolution::AutoResolved(
280 ResolvedAction::OverwriteExisting,
281 ))
282 }
283
284 ConflictPolicy::RenameSuffix => {
285 let counter_key = (change.collection.clone(), constraint.field.clone());
287 let suffix = self.suffix_counter.entry(counter_key).or_insert(0);
288 *suffix += 1;
289 let new_value = format!(
290 "{}_{}",
291 change
292 .fields
293 .iter()
294 .find(|(f, _)| f == &constraint.field)
295 .map(|(_, v)| format!("{:?}", v))
296 .unwrap_or_else(|| "unknown".to_string()),
297 suffix
298 );
299
300 tracing::info!(
301 constraint = %v.constraint_name,
302 field = %constraint.field,
303 new_value = %new_value,
304 "resolved via RENAME_APPEND_SUFFIX"
305 );
306
307 Ok(PolicyResolution::AutoResolved(
308 ResolvedAction::RenamedField {
309 field: constraint.field.clone(),
310 new_value,
311 },
312 ))
313 }
314
315 ConflictPolicy::CascadeDefer {
316 max_retries,
317 ttl_secs,
318 } => {
319 let now_ms = std::time::SystemTime::now()
321 .duration_since(std::time::UNIX_EPOCH)
322 .unwrap_or_default()
323 .as_millis() as u64;
324
325 let base_ms = 500u64;
326 let first_retry_after_ms = base_ms;
327
328 let id = self.deferred.enqueue(
329 peer_id,
330 auth.user_id,
331 auth.tenant_id,
332 delta_bytes,
333 change.collection.clone(),
334 constraint.name.clone(),
335 0,
336 *max_retries,
337 now_ms,
338 first_retry_after_ms,
339 *ttl_secs,
340 );
341
342 tracing::info!(
343 constraint = %v.constraint_name,
344 deferred_id = id,
345 reason = %v.reason,
346 "resolved via CASCADE_DEFER (queued for retry)"
347 );
348
349 Ok(PolicyResolution::Deferred {
350 retry_after_ms: first_retry_after_ms,
351 attempt: 0,
352 })
353 }
354
355 ConflictPolicy::Custom {
356 webhook_url,
357 timeout_secs,
358 } => {
359 tracing::info!(
361 constraint = %v.constraint_name,
362 webhook_url = %webhook_url,
363 "escalated to webhook"
364 );
365
366 Ok(PolicyResolution::WebhookRequired {
367 webhook_url: webhook_url.clone(),
368 timeout_secs: *timeout_secs,
369 })
370 }
371
372 ConflictPolicy::EscalateToDlq => {
373 self.dlq.enqueue(
375 peer_id,
376 auth.user_id,
377 auth.tenant_id,
378 delta_bytes,
379 &constraint,
380 v.reason.clone(),
381 v.hint.clone(),
382 )?;
383
384 tracing::info!(
385 constraint = %v.constraint_name,
386 collection = %change.collection,
387 "escalated to DLQ"
388 );
389
390 Ok(PolicyResolution::Escalate)
391 }
392 }
393 }
394 }
395 }
396
397 pub fn dlq(&self) -> &DeadLetterQueue {
399 &self.dlq
400 }
401
402 pub fn dlq_mut(&mut self) -> &mut DeadLetterQueue {
404 &mut self.dlq
405 }
406
407 pub fn policies(&self) -> &PolicyRegistry {
409 &self.policies
410 }
411
412 pub fn policies_mut(&mut self) -> &mut PolicyRegistry {
414 &mut self.policies
415 }
416
417 pub fn deferred(&self) -> &DeferredQueue {
419 &self.deferred
420 }
421
422 pub fn deferred_mut(&mut self) -> &mut DeferredQueue {
424 &mut self.deferred
425 }
426
427 pub fn set_delta_verifier(&mut self, verifier: DeltaSigner) {
430 self.delta_verifier = Some(verifier);
431 }
432
433 pub fn delta_verifier(&self) -> Option<&DeltaSigner> {
435 self.delta_verifier.as_ref()
436 }
437
438 pub fn delta_verifier_mut(&mut self) -> Option<&mut DeltaSigner> {
440 self.delta_verifier.as_mut()
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::policy::CollectionPolicy;

    /// Peer id attached to every delta submitted by these tests.
    const PEER_ID: u64 = 42;

    /// Fresh state plus the constraint set shared by all tests: a unique and a
    /// not-null constraint on `users`, and a foreign key from `posts` to `users`.
    fn setup() -> (CrdtState, ConstraintSet) {
        let state = CrdtState::new(1).unwrap();
        let mut cs = ConstraintSet::new();
        cs.add_unique("users_email_unique", "users", "email");
        cs.add_not_null("users_name_nn", "users", "name");
        cs.add_foreign_key("posts_author_fk", "posts", "author_id", "users", "id");
        (state, cs)
    }

    /// Current wall-clock time in milliseconds since the Unix epoch.
    fn now_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// Validator whose registry holds `policy` for `collection`.
    fn validator_with_policy(
        cs: ConstraintSet,
        collection: &str,
        policy: CollectionPolicy,
    ) -> Validator {
        let mut policies = PolicyRegistry::new();
        policies.set(collection, policy);
        Validator::new_with_policies(cs, 100, policies, 100)
    }

    /// Builds a `users` row from `(field, string value)` pairs.
    fn user_change(row_id: &str, fields: &[(&str, &str)]) -> ProposedChange {
        ProposedChange {
            collection: "users".into(),
            row_id: row_id.into(),
            fields: fields
                .iter()
                .map(|(name, value)| ((*name).into(), LoroValue::String((*value).into())))
                .collect(),
        }
    }

    /// A `users` row that omits `name` and therefore violates `users_name_nn`.
    fn user_without_name() -> ProposedChange {
        user_change("u1", &[("email", "a@b.com")])
    }

    /// A `posts` row `p1` that references user `u1` as its author.
    fn post_by_u1() -> ProposedChange {
        ProposedChange {
            collection: "posts".into(),
            row_id: "p1".into(),
            fields: vec![("author_id".into(), LoroValue::String("u1".into()))],
        }
    }

    /// Runs `validate_with_policy` with default auth and a dummy delta.
    fn resolve(
        validator: &mut Validator,
        state: &CrdtState,
        change: &ProposedChange,
    ) -> PolicyResolution {
        validator
            .validate_with_policy(
                state,
                PEER_ID,
                CrdtAuthContext::default(),
                change,
                b"delta".to_vec(),
                now_ms(),
            )
            .unwrap()
    }

    #[test]
    fn valid_insert_accepted() {
        let (state, cs) = setup();
        let validator = Validator::new(cs, 100);

        let change = user_change("u1", &[("name", "Alice"), ("email", "alice@example.com")]);

        assert!(matches!(
            validator.validate(&state, &change),
            ValidationOutcome::Accepted
        ));
    }

    #[test]
    fn not_null_violation() {
        let (state, cs) = setup();
        let validator = Validator::new(cs, 100);

        // `name` is deliberately missing.
        let change = user_change("u1", &[("email", "alice@example.com")]);

        match validator.validate(&state, &change) {
            ValidationOutcome::Rejected(v) => {
                assert_eq!(v.len(), 1);
                assert_eq!(v[0].constraint_name, "users_name_nn");
                assert!(matches!(
                    v[0].hint,
                    CompensationHint::ProvideRequiredField { .. }
                ));
            }
            _ => panic!("expected rejection"),
        }
    }

    #[test]
    fn foreign_key_violation() {
        let (state, cs) = setup();
        let validator = Validator::new(cs, 100);

        // No user `u1` exists yet, so the author reference dangles.
        match validator.validate(&state, &post_by_u1()) {
            ValidationOutcome::Rejected(v) => {
                assert_eq!(v[0].constraint_name, "posts_author_fk");
                assert!(matches!(
                    v[0].hint,
                    CompensationHint::CreateReferencedRow { .. }
                ));
            }
            _ => panic!("expected FK violation"),
        }
    }

    #[test]
    fn foreign_key_passes_when_parent_exists() {
        let (state, cs) = setup();
        let validator = Validator::new(cs, 100);

        state
            .upsert(
                "users",
                "u1",
                &[
                    ("name", LoroValue::String("Alice".into())),
                    ("email", LoroValue::String("a@b.com".into())),
                ],
            )
            .unwrap();

        assert!(matches!(
            validator.validate(&state, &post_by_u1()),
            ValidationOutcome::Accepted
        ));
    }

    #[test]
    fn validate_or_reject_enqueues_to_dlq() {
        let (state, cs) = setup();
        let mut validator = Validator::new_with_policies(cs, 100, PolicyRegistry::new(), 100);

        // Register the policy after construction to exercise `policies_mut`.
        validator
            .policies_mut()
            .set("users", CollectionPolicy::strict());

        let err = validator
            .validate_or_reject(
                &state,
                PEER_ID,
                CrdtAuthContext::default(),
                &user_without_name(),
                b"delta-bytes".to_vec(),
            )
            .unwrap_err();

        assert!(matches!(err, CrdtError::ConstraintViolation { .. }));
        assert_eq!(validator.dlq().len(), 1);

        let dl = validator.dlq().peek().unwrap();
        assert_eq!(dl.peer_id, PEER_ID);
        assert_eq!(dl.violated_constraint, "users_name_nn");
    }

    #[test]
    fn validate_with_policy_last_writer_wins() {
        let (state, cs) = setup();
        let mut policy = CollectionPolicy::ephemeral();
        policy.not_null = ConflictPolicy::LastWriterWins;
        let mut validator = validator_with_policy(cs, "users", policy);

        let resolution = resolve(&mut validator, &state, &user_without_name());

        assert!(matches!(
            resolution,
            PolicyResolution::AutoResolved(ResolvedAction::OverwriteExisting)
        ));
    }

    #[test]
    fn validate_with_policy_rename_suffix() {
        let (state, cs) = setup();
        let mut policy = CollectionPolicy::ephemeral();
        policy.unique = ConflictPolicy::RenameSuffix;
        let mut validator = validator_with_policy(cs, "users", policy);

        // Existing row that already owns the e-mail address.
        state
            .upsert(
                "users",
                "u1",
                &[
                    ("name", LoroValue::String("Alice".into())),
                    ("email", LoroValue::String("alice@example.com".into())),
                ],
            )
            .unwrap();

        // A second row that reuses the same address.
        let change = user_change("u2", &[("name", "Bob"), ("email", "alice@example.com")]);

        match resolve(&mut validator, &state, &change) {
            PolicyResolution::AutoResolved(ResolvedAction::RenamedField { field, new_value }) => {
                assert_eq!(field, "email");
                // The first rename for this field gets suffix 1.
                assert!(new_value.ends_with("_1"));
            }
            _ => panic!("expected RenamedField resolution"),
        }
    }

    #[test]
    fn validate_with_policy_cascade_defer() {
        let (state, cs) = setup();
        let mut policy = CollectionPolicy::ephemeral();
        policy.foreign_key = ConflictPolicy::CascadeDefer {
            max_retries: 3,
            ttl_secs: 60,
        };
        let mut validator = validator_with_policy(cs, "posts", policy);

        match resolve(&mut validator, &state, &post_by_u1()) {
            PolicyResolution::Deferred {
                retry_after_ms,
                attempt,
            } => {
                assert_eq!(attempt, 0);
                assert_eq!(retry_after_ms, 500);
            }
            _ => panic!("expected Deferred resolution"),
        }

        assert_eq!(validator.deferred().len(), 1);
    }

    #[test]
    fn validate_with_policy_escalate_to_dlq() {
        let (state, cs) = setup();
        let mut policy = CollectionPolicy::ephemeral();
        policy.not_null = ConflictPolicy::EscalateToDlq;
        let mut validator = validator_with_policy(cs, "users", policy);

        let resolution = resolve(&mut validator, &state, &user_without_name());

        assert!(matches!(resolution, PolicyResolution::Escalate));
        assert_eq!(validator.dlq().len(), 1);
    }
}
792}