use loro::LoroValue;
use nodedb_crdt::constraint::ConstraintSet;
use nodedb_crdt::policy::CollectionPolicy;
use nodedb_crdt::pre_validate::{self, PreValidationResult};
use nodedb_crdt::state::CrdtState;
use nodedb_crdt::validator::{ProposedChange, Validator};
use crate::types::TenantId;
pub struct TenantCrdtEngine {
tenant_id: TenantId,
state: CrdtState,
pub(crate) validator: Validator,
}
impl TenantCrdtEngine {
pub fn new(
tenant_id: TenantId,
peer_id: u64,
constraints: ConstraintSet,
) -> crate::Result<Self> {
Ok(Self {
tenant_id,
state: CrdtState::new(peer_id).map_err(crate::Error::Crdt)?,
validator: Validator::new(constraints, 1000),
})
}
pub fn peer_id(&self) -> u64 {
self.state.peer_id()
}
pub fn export_snapshot_bytes(&self) -> crate::Result<Vec<u8>> {
self.state.export_snapshot().map_err(crate::Error::Crdt)
}
pub fn read_snapshot(&self, collection: &str, row_id: &str) -> crate::Result<Option<Vec<u8>>> {
if self.state.row_exists(collection, row_id) {
Ok(Some(
self.state.export_snapshot().map_err(crate::Error::Crdt)?,
))
} else {
Ok(None)
}
}
pub fn read_row(&self, collection: &str, row_id: &str) -> Option<LoroValue> {
self.state.read_row(collection, row_id)
}
pub fn pre_validate(&self, change: &ProposedChange) -> PreValidationResult {
pre_validate::pre_validate(&self.validator, &self.state, change)
}
pub fn apply_committed_delta(&self, delta: &[u8]) -> crate::Result<()> {
self.state.import(delta).map_err(crate::Error::Crdt)
}
pub fn validate_and_apply(
&mut self,
peer_id: u64,
auth: nodedb_crdt::CrdtAuthContext,
change: &ProposedChange,
delta_bytes: Vec<u8>,
) -> crate::Result<()> {
self.validator
.validate_or_reject(&self.state, peer_id, auth, change, delta_bytes)
.map_err(crate::Error::Crdt)?;
let fields: Vec<(&str, LoroValue)> = change
.fields
.iter()
.map(|(k, v)| (k.as_str(), v.clone()))
.collect();
self.state
.upsert(&change.collection, &change.row_id, &fields)
.map_err(crate::Error::Crdt)
}
pub fn dlq_len(&self) -> usize {
self.validator.dlq().len()
}
pub fn row_exists(&self, collection: &str, row_id: &str) -> bool {
self.state.row_exists(collection, row_id)
}
pub fn tenant_id(&self) -> TenantId {
self.tenant_id
}
pub fn set_collection_policy(
&mut self,
collection: &str,
policy_json: &str,
) -> crate::Result<()> {
let policy: CollectionPolicy =
serde_json::from_str(policy_json).map_err(|e| crate::Error::BadRequest {
detail: format!("invalid collection policy JSON: {e}"),
})?;
Self::validate_policy(&policy)?;
self.validator.policies_mut().set(collection, policy);
Ok(())
}
fn validate_policy(policy: &CollectionPolicy) -> crate::Result<()> {
Self::validate_conflict_policy(&policy.unique, "unique")?;
Self::validate_conflict_policy(&policy.foreign_key, "foreign_key")?;
Self::validate_conflict_policy(&policy.not_null, "not_null")?;
Self::validate_conflict_policy(&policy.check, "check")?;
Ok(())
}
fn validate_conflict_policy(
policy: &nodedb_crdt::policy::ConflictPolicy,
field_name: &str,
) -> crate::Result<()> {
use nodedb_crdt::policy::ConflictPolicy;
match policy {
ConflictPolicy::CascadeDefer {
max_retries,
ttl_secs,
} => {
if *max_retries == 0 {
return Err(crate::Error::BadRequest {
detail: format!("{field_name}: max_retries must be > 0"),
});
}
if *ttl_secs == 0 {
return Err(crate::Error::BadRequest {
detail: format!("{field_name}: ttl_secs must be > 0"),
});
}
}
ConflictPolicy::Custom {
webhook_url,
timeout_secs,
} => {
if webhook_url.is_empty() {
return Err(crate::Error::BadRequest {
detail: format!("{field_name}: webhook_url must not be empty"),
});
}
if !webhook_url.starts_with("http://") && !webhook_url.starts_with("https://") {
return Err(crate::Error::BadRequest {
detail: format!("{field_name}: webhook_url must be an HTTP(S) URL"),
});
}
if *timeout_secs == 0 {
return Err(crate::Error::BadRequest {
detail: format!("{field_name}: timeout_secs must be > 0"),
});
}
}
_ => {}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_constraints() -> ConstraintSet {
let mut cs = ConstraintSet::new();
cs.add_unique("users_email_unique", "users", "email");
cs.add_not_null("users_name_nn", "users", "name");
cs
}
#[test]
fn valid_write_applies() {
let mut engine = TenantCrdtEngine::new(TenantId::new(1), 0, test_constraints()).unwrap();
let change = ProposedChange {
collection: "users".into(),
row_id: "u1".into(),
fields: vec![
("name".into(), LoroValue::String("Alice".into())),
(
"email".into(),
LoroValue::String("alice@example.com".into()),
),
],
};
engine
.validate_and_apply(
1,
nodedb_crdt::CrdtAuthContext::default(),
&change,
b"delta".to_vec(),
)
.unwrap();
assert!(engine.row_exists("users", "u1"));
assert_eq!(engine.dlq_len(), 0);
}
#[test]
fn constraint_violation_routes_to_dlq() {
let mut engine = TenantCrdtEngine::new(TenantId::new(1), 0, test_constraints()).unwrap();
engine
.validator
.policies_mut()
.set("users", CollectionPolicy::strict());
let change = ProposedChange {
collection: "users".into(),
row_id: "u1".into(),
fields: vec![("email".into(), LoroValue::String("a@b.com".into()))],
};
let err = engine
.validate_and_apply(
42,
nodedb_crdt::CrdtAuthContext::default(),
&change,
b"delta".to_vec(),
)
.unwrap_err();
assert!(matches!(err, crate::Error::Crdt(_)));
assert_eq!(engine.dlq_len(), 1);
}
#[test]
fn pre_validate_fast_rejects() {
let engine = TenantCrdtEngine::new(TenantId::new(1), 0, test_constraints()).unwrap();
let change = ProposedChange {
collection: "users".into(),
row_id: "u1".into(),
fields: vec![("email".into(), LoroValue::String("a@b.com".into()))],
};
match engine.pre_validate(&change) {
PreValidationResult::FastReject { constraint, .. } => {
assert_eq!(constraint, "users_name_nn");
}
_ => panic!("expected fast reject"),
}
}
#[test]
fn unique_violation_after_first_write() {
let mut engine = TenantCrdtEngine::new(TenantId::new(1), 0, test_constraints()).unwrap();
engine
.validator
.policies_mut()
.set("users", CollectionPolicy::strict());
let first = ProposedChange {
collection: "users".into(),
row_id: "u1".into(),
fields: vec![
("name".into(), LoroValue::String("Alice".into())),
(
"email".into(),
LoroValue::String("alice@example.com".into()),
),
],
};
engine
.validate_and_apply(
1,
nodedb_crdt::CrdtAuthContext::default(),
&first,
b"d1".to_vec(),
)
.unwrap();
let second = ProposedChange {
collection: "users".into(),
row_id: "u2".into(),
fields: vec![
("name".into(), LoroValue::String("Bob".into())),
(
"email".into(),
LoroValue::String("alice@example.com".into()),
),
],
};
assert!(
engine
.validate_and_apply(
2,
nodedb_crdt::CrdtAuthContext::default(),
&second,
b"d2".to_vec()
)
.is_err()
);
assert_eq!(engine.dlq_len(), 1);
}
}