use std::collections::{HashMap, HashSet};
use arrow_array::{Array, RecordBatch, StringArray};
use datafusion::prelude::{Expr, col, lit};
use datafusion::scalar::ScalarValue;
use futures::TryStreamExt;
use lance::Dataset;
use omnigraph_compiler::catalog::{Catalog, EdgeType};
use crate::db::{Omnigraph, Snapshot};
use crate::error::{MergeConflict, MergeConflictKind, OmniError, Result};
use crate::loader::{
composite_unique_key, format_tuple, validate_enum_constraints, validate_value_constraints,
};
use crate::table_store::TableStore;
#[derive(Debug, Clone)]
pub(crate) struct Violation {
pub table_key: String,
pub row_id: Option<String>,
pub kind: MergeConflictKind,
pub message: String,
}
impl Violation {
pub(crate) fn into_merge_conflict(self) -> MergeConflict {
MergeConflict {
table_key: self.table_key,
row_id: self.row_id,
kind: self.kind,
message: self.message,
}
}
pub(crate) fn into_omni_error(self) -> OmniError {
OmniError::manifest(self.message)
}
}
#[derive(Debug, Default)]
pub(crate) struct TableChange {
pub added: Vec<RecordBatch>,
pub changed: Vec<RecordBatch>,
pub deleted_ids: Vec<String>,
}
impl TableChange {
pub fn value_batches(&self) -> impl Iterator<Item = &RecordBatch> {
self.added.iter().chain(self.changed.iter())
}
}
pub(crate) type ChangeSet = HashMap<String, TableChange>;
pub(crate) fn evaluate_value_constraints(changeset: &ChangeSet, catalog: &Catalog) -> Vec<Violation> {
let mut violations = Vec::new();
for (table_key, change) in changeset {
if let Some(type_name) = table_key.strip_prefix("node:") {
let Some(node_type) = catalog.node_types.get(type_name) else {
continue;
};
for batch in change.value_batches() {
if let Err(err) = validate_value_constraints(batch, node_type) {
violations.push(value_violation(table_key, err));
}
if let Err(err) = validate_enum_constraints(batch, &node_type.properties, type_name) {
violations.push(value_violation(table_key, err));
}
}
} else if let Some(type_name) = table_key.strip_prefix("edge:") {
let Some(edge_type) = catalog.edge_types.get(type_name) else {
continue;
};
for batch in change.value_batches() {
if let Err(err) = validate_enum_constraints(batch, &edge_type.properties, type_name) {
violations.push(value_violation(table_key, err));
}
}
}
}
violations
}
fn value_violation(table_key: &str, err: OmniError) -> Violation {
Violation {
table_key: table_key.to_string(),
row_id: None,
kind: MergeConflictKind::ValueConstraintViolation,
message: err.to_string(),
}
}
#[derive(Debug, Clone)]
pub(crate) enum Constraint {
Value,
Unique {
table_key: String,
columns: Vec<String>,
is_key: bool,
},
EdgeRi {
table_key: String,
from_type: String,
to_type: String,
},
Cardinality {
table_key: String,
},
}
pub(crate) fn constraints_for(catalog: &Catalog) -> Vec<Constraint> {
let mut out = vec![Constraint::Value];
for (name, node_type) in &catalog.node_types {
let table_key = format!("node:{name}");
if let Some(key) = &node_type.key {
out.push(Constraint::Unique {
table_key: table_key.clone(),
columns: key.clone(),
is_key: true,
});
}
for columns in &node_type.unique_constraints {
if Some(columns) == node_type.key.as_ref() {
continue; }
out.push(Constraint::Unique {
table_key: table_key.clone(),
columns: columns.clone(),
is_key: false,
});
}
}
for (name, edge_type) in &catalog.edge_types {
let table_key = format!("edge:{name}");
for columns in &edge_type.unique_constraints {
out.push(Constraint::Unique {
table_key: table_key.clone(),
columns: columns.clone(),
is_key: false,
});
}
out.push(Constraint::EdgeRi {
table_key: table_key.clone(),
from_type: edge_type.from_type.clone(),
to_type: edge_type.to_type.clone(),
});
out.push(Constraint::Cardinality { table_key });
}
out
}
pub(crate) struct CommittedState<'a> {
committed: Option<&'a Snapshot>,
overwritten: HashSet<String>,
live: Option<(&'a Omnigraph, Option<&'a str>)>,
}
impl<'a> CommittedState<'a> {
pub(crate) fn merge(target: &'a Snapshot) -> Self {
Self {
committed: Some(target),
overwritten: HashSet::new(),
live: None,
}
}
pub(crate) fn write(committed: &'a Snapshot, db: &'a Omnigraph, branch: Option<&'a str>) -> Self {
Self {
committed: Some(committed),
overwritten: HashSet::new(),
live: Some((db, branch)),
}
}
pub(crate) fn load(
base: &'a Snapshot,
mode: crate::loader::LoadMode,
changeset: &ChangeSet,
) -> Self {
let overwritten = match mode {
crate::loader::LoadMode::Overwrite => changeset.keys().cloned().collect(),
crate::loader::LoadMode::Append | crate::loader::LoadMode::Merge => HashSet::new(),
};
Self {
committed: Some(base),
overwritten,
live: None,
}
}
async fn open(&self, table_key: &str) -> Result<Option<Dataset>> {
if self.overwritten.contains(table_key) {
return Ok(None);
}
let Some(committed) = self.committed else {
return Ok(None);
};
match committed.entry(table_key) {
Some(_) => Ok(Some(committed.open(table_key).await?)),
None => Ok(None),
}
}
async fn open_cardinality(&self, table_key: &str) -> Result<Option<Dataset>> {
if self.overwritten.contains(table_key) {
return Ok(None);
}
let Some(committed) = self.committed else {
return Ok(None);
};
let Some(entry) = committed.entry(table_key) else {
return Ok(None);
};
match self.live {
Some((db, branch)) => {
let full_path = db.storage().dataset_uri(&entry.table_path);
let handle = db.storage().open_dataset_head(&full_path, branch).await?;
Ok(Some(handle.dataset().clone()))
}
None => Ok(Some(committed.open(table_key).await?)),
}
}
async fn existing_ids(&self, table_key: &str, ids: &[String]) -> Result<HashSet<String>> {
let Some(ds) = self.open(table_key).await? else {
return Ok(HashSet::new());
};
if ids.is_empty() {
return Ok(HashSet::new());
}
let expr = col("id").in_list(ids.iter().map(|k| lit(k.clone())).collect(), false);
let batches = scan_filtered(&ds, &["id"], expr).await?;
let mut present = HashSet::new();
for batch in &batches {
let column = string_col(batch, "id")?;
for i in 0..column.len() {
if !column.is_null(i) {
present.insert(column.value(i).to_string());
}
}
}
Ok(present)
}
async fn unique_holders(
&self,
table_key: &str,
columns: &[String],
key_values: &[ScalarValue],
) -> Result<Vec<String>> {
let Some(ds) = self.open(table_key).await? else {
return Ok(Vec::new());
};
let mut expr: Option<Expr> = None;
for (column, value) in columns.iter().zip(key_values.iter()) {
let eq = col(column.as_str()).eq(lit(value.clone()));
expr = Some(match expr {
Some(acc) => acc.and(eq),
None => eq,
});
}
let Some(expr) = expr else {
return Ok(Vec::new());
};
let batches = scan_filtered(&ds, &["id"], expr).await?;
let mut ids = Vec::new();
for batch in &batches {
let column = string_col(batch, "id")?;
for i in 0..column.len() {
if !column.is_null(i) {
ids.push(column.value(i).to_string());
}
}
}
Ok(ids)
}
async fn committed_edges(
&self,
edge_table: &str,
key_col: &str,
keys: &[String],
) -> Result<Vec<(String, String)>> {
let Some(ds) = self.open_cardinality(edge_table).await? else {
return Ok(Vec::new());
};
if keys.is_empty() {
return Ok(Vec::new());
}
let expr = col(key_col).in_list(keys.iter().map(|k| lit(k.clone())).collect(), false);
let batches = scan_filtered(&ds, &["id", "src"], expr).await?;
let mut out = Vec::new();
for batch in &batches {
let ids = string_col(batch, "id")?;
let srcs = string_col(batch, "src")?;
for i in 0..batch.num_rows() {
out.push((ids.value(i).to_string(), srcs.value(i).to_string()));
}
}
Ok(out)
}
async fn edges_referencing(
&self,
edge_table: &str,
src_nodes: &[String],
dst_nodes: &[String],
) -> Result<Vec<(String, String, String)>> {
if src_nodes.is_empty() && dst_nodes.is_empty() {
return Ok(Vec::new());
}
let Some(ds) = self.open(edge_table).await? else {
return Ok(Vec::new());
};
let mut expr: Option<Expr> = None;
if !src_nodes.is_empty() {
expr =
Some(col("src").in_list(src_nodes.iter().map(|k| lit(k.clone())).collect(), false));
}
if !dst_nodes.is_empty() {
let dst = col("dst").in_list(dst_nodes.iter().map(|k| lit(k.clone())).collect(), false);
expr = Some(match expr {
Some(acc) => acc.or(dst),
None => dst,
});
}
let batches = scan_filtered(&ds, &["id", "src", "dst"], expr.unwrap()).await?;
let mut out = Vec::new();
for batch in &batches {
let ids = string_col(batch, "id")?;
let srcs = string_col(batch, "src")?;
let dsts = string_col(batch, "dst")?;
for i in 0..batch.num_rows() {
out.push((
ids.value(i).to_string(),
srcs.value(i).to_string(),
dsts.value(i).to_string(),
));
}
}
Ok(out)
}
}
async fn scan_filtered(ds: &Dataset, projection: &[&str], expr: Expr) -> Result<Vec<RecordBatch>> {
TableStore::scan_stream_with(ds, Some(projection), None, None, false, move |scanner| {
scanner.filter_expr(expr);
Ok(())
})
.await?
.try_collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
async fn scan_all(ds: &Dataset, projection: &[&str]) -> Result<Vec<RecordBatch>> {
TableStore::scan_stream_with(ds, Some(projection), None, None, false, |_| Ok(()))
.await?
.try_collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
pub(crate) async fn overwrite_removed_ids(
base: &Snapshot,
table_key: &str,
change: &TableChange,
) -> Result<Vec<String>> {
if base.entry(table_key).is_none() {
return Ok(Vec::new());
}
let mut new_ids: HashSet<String> = HashSet::new();
for batch in change.value_batches() {
let column = string_col(batch, "id")?;
for i in 0..column.len() {
if !column.is_null(i) {
new_ids.insert(column.value(i).to_string());
}
}
}
let ds = base.open(table_key).await?;
let mut removed = Vec::new();
for batch in &scan_all(&ds, &["id"]).await? {
let column = string_col(batch, "id")?;
for i in 0..column.len() {
if !column.is_null(i) && !new_ids.contains(column.value(i)) {
removed.push(column.value(i).to_string());
}
}
}
Ok(removed)
}
fn string_col<'b>(batch: &'b RecordBatch, name: &str) -> Result<&'b StringArray> {
batch
.column_by_name(name)
.ok_or_else(|| OmniError::manifest(format!("batch missing column '{name}'")))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest(format!("column '{name}' is not Utf8")))
}
fn delta_id_set(change: &TableChange) -> Result<HashSet<String>> {
let mut ids = HashSet::new();
for batch in change.value_batches() {
let column = string_col(batch, "id")?;
for i in 0..column.len() {
if !column.is_null(i) {
ids.insert(column.value(i).to_string());
}
}
}
Ok(ids)
}
fn delta_edge_src(change: &TableChange) -> Result<Vec<(String, String)>> {
let mut out = Vec::new();
for batch in change.value_batches() {
let ids = string_col(batch, "id")?;
let srcs = string_col(batch, "src")?;
for i in 0..batch.num_rows() {
out.push((ids.value(i).to_string(), srcs.value(i).to_string()));
}
}
Ok(out)
}
pub(crate) async fn validate_changeset(
changeset: &ChangeSet,
committed: &CommittedState<'_>,
catalog: &Catalog,
) -> Result<()> {
if changeset.is_empty() {
return Ok(());
}
let constraints = constraints_for(catalog);
let violations = evaluate(&constraints, changeset, committed, catalog).await?;
match violations.into_iter().next() {
Some(violation) => Err(violation.into_omni_error()),
None => Ok(()),
}
}
pub(crate) async fn evaluate(
constraints: &[Constraint],
changeset: &ChangeSet,
committed: &CommittedState<'_>,
catalog: &Catalog,
) -> Result<Vec<Violation>> {
let mut violations = Vec::new();
for constraint in constraints {
match constraint {
Constraint::Value => {
violations.extend(evaluate_value_constraints(changeset, catalog));
}
Constraint::Unique {
table_key,
columns,
is_key,
} => {
if let Some(change) = changeset.get(table_key) {
violations.extend(
evaluate_unique(table_key, columns, *is_key, change, committed).await?,
);
}
}
Constraint::EdgeRi {
table_key,
from_type,
to_type,
} => {
let node_deleted = |node_type: &str| {
changeset
.get(&format!("node:{node_type}"))
.map(|change| !change.deleted_ids.is_empty())
.unwrap_or(false)
};
let change = changeset.get(table_key);
if change.is_some() || node_deleted(from_type) || node_deleted(to_type) {
violations.extend(
evaluate_edge_ri(table_key, from_type, to_type, change, changeset, committed)
.await?,
);
}
}
Constraint::Cardinality { table_key } => {
if let Some(change) = changeset.get(table_key) {
let Some(type_name) = table_key.strip_prefix("edge:") else {
continue;
};
if let Some(edge_type) = catalog.edge_types.get(type_name) {
violations.extend(
evaluate_cardinality(table_key, edge_type, change, changeset, committed)
.await?,
);
}
}
}
}
}
Ok(violations)
}
async fn evaluate_unique(
table_key: &str,
columns: &[String],
is_key: bool,
change: &TableChange,
committed: &CommittedState<'_>,
) -> Result<Vec<Violation>> {
let mut violations = Vec::new();
let delta_ids = delta_id_set(change)?;
let deleted: HashSet<&String> = change.deleted_ids.iter().collect();
let mut final_by_id: HashMap<String, (Vec<String>, Vec<ScalarValue>)> = HashMap::new();
for batch in change.value_batches() {
let group_columns = columns
.iter()
.map(|name| {
batch.column_by_name(name).cloned().ok_or_else(|| {
OmniError::manifest(format!("table {table_key} missing unique column '{name}'"))
})
})
.collect::<Result<Vec<_>>>()?;
let ids = string_col(batch, "id")?;
let mut seen_in_batch: HashMap<Vec<String>, String> = HashMap::new();
for row in 0..batch.num_rows() {
let id = ids.value(row).to_string();
let Some(key) = composite_unique_key(&group_columns, row)? else {
final_by_id.remove(&id);
continue;
};
if let Some(prior) = seen_in_batch.insert(key.clone(), id.clone()) {
violations.push(unique_violation(table_key, columns, &key, &id, &prior));
}
let values = group_columns
.iter()
.map(|arr| ScalarValue::try_from_array(arr, row))
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| OmniError::manifest(e.to_string()))?;
final_by_id.insert(id, (key, values));
}
}
if !violations.is_empty() {
return Ok(violations);
}
let mut entries: Vec<(String, (Vec<String>, Vec<ScalarValue>))> =
final_by_id.into_iter().collect();
entries.sort_by(|a, b| a.0.cmp(&b.0));
let mut holder_by_key: HashMap<&Vec<String>, &String> = HashMap::new();
for (id, (key, _)) in &entries {
if let Some(other) = holder_by_key.insert(key, id) {
if other != id {
violations.push(unique_violation(table_key, columns, key, id, other));
}
}
}
if !is_key {
for (id, (key, values)) in &entries {
for holder in committed.unique_holders(table_key, columns, values).await? {
if !delta_ids.contains(&holder) && !deleted.contains(&holder) {
violations.push(unique_violation(table_key, columns, key, id, &holder));
break;
}
}
}
}
Ok(violations)
}
async fn evaluate_edge_ri(
edge_table: &str,
from_type: &str,
to_type: &str,
change: Option<&TableChange>,
changeset: &ChangeSet,
committed: &CommittedState<'_>,
) -> Result<Vec<Violation>> {
let from_table = format!("node:{from_type}");
let to_table = format!("node:{to_type}");
let mut violations = Vec::new();
let mut delta_edge_ids: HashSet<String> = HashSet::new();
if let Some(change) = change {
let mut edges = Vec::new();
for batch in change.value_batches() {
let ids = string_col(batch, "id")?;
let srcs = string_col(batch, "src")?;
let dsts = string_col(batch, "dst")?;
for i in 0..batch.num_rows() {
let id = ids.value(i).to_string();
delta_edge_ids.insert(id.clone());
edges.push((id, srcs.value(i).to_string(), dsts.value(i).to_string()));
}
}
if !edges.is_empty() {
let srcs: Vec<String> = edges.iter().map(|(_, src, _)| src.clone()).collect();
let dsts: Vec<String> = edges.iter().map(|(_, _, dst)| dst.clone()).collect();
let from_exist = merged_node_existence(&from_table, &srcs, changeset, committed).await?;
let to_exist = merged_node_existence(&to_table, &dsts, changeset, committed).await?;
for (id, src, dst) in &edges {
if !from_exist.contains(src) {
violations.push(orphan_violation(edge_table, id, "src", src, from_type));
}
if !to_exist.contains(dst) {
violations.push(orphan_violation(edge_table, id, "dst", dst, to_type));
}
}
}
}
let deleted_from: Vec<String> = changeset
.get(&from_table)
.map(|change| change.deleted_ids.clone())
.unwrap_or_default();
let deleted_to: Vec<String> = changeset
.get(&to_table)
.map(|change| change.deleted_ids.clone())
.unwrap_or_default();
if !deleted_from.is_empty() || !deleted_to.is_empty() {
let removed: HashSet<&String> = change
.map(|change| change.deleted_ids.iter().collect())
.unwrap_or_default();
let from_set: HashSet<&String> = deleted_from.iter().collect();
let to_set: HashSet<&String> = deleted_to.iter().collect();
for (id, src, dst) in committed
.edges_referencing(edge_table, &deleted_from, &deleted_to)
.await?
{
if delta_edge_ids.contains(&id) || removed.contains(&id) {
continue;
}
if from_set.contains(&src) {
violations.push(orphan_violation(edge_table, &id, "src", &src, from_type));
}
if to_set.contains(&dst) {
violations.push(orphan_violation(edge_table, &id, "dst", &dst, to_type));
}
}
}
Ok(violations)
}
async fn merged_node_existence(
node_table: &str,
ids: &[String],
changeset: &ChangeSet,
committed: &CommittedState<'_>,
) -> Result<HashSet<String>> {
let (added_changed, deleted) = match changeset.get(node_table) {
Some(change) => (
delta_id_set(change)?,
change.deleted_ids.iter().cloned().collect::<HashSet<_>>(),
),
None => (HashSet::new(), HashSet::new()),
};
let mut exist = HashSet::new();
let mut to_probe = Vec::new();
for id in ids {
if added_changed.contains(id) {
exist.insert(id.clone());
} else if !deleted.contains(id) {
to_probe.push(id.clone());
}
}
for id in committed.existing_ids(node_table, &to_probe).await? {
exist.insert(id);
}
Ok(exist)
}
async fn evaluate_cardinality(
edge_table: &str,
edge_type: &EdgeType,
change: &TableChange,
changeset: &ChangeSet,
committed: &CommittedState<'_>,
) -> Result<Vec<Violation>> {
let card = &edge_type.cardinality;
if card.min == 0 && card.max.is_none() {
return Ok(Vec::new());
}
let delta_edges = delta_edge_src(change)?;
let removed_ids: Vec<String> = change.deleted_ids.clone();
let removed_id_set: HashSet<&String> = removed_ids.iter().collect();
let mut delta_by_id: HashMap<String, String> = HashMap::new();
for (id, src) in &delta_edges {
delta_by_id.insert(id.clone(), src.clone());
}
let changed_ids: Vec<String> = delta_by_id.keys().cloned().collect();
let delta_id_set: HashSet<&String> = changed_ids.iter().collect();
let removed_edges = committed.committed_edges(edge_table, "id", &removed_ids).await?;
let moved_from = committed.committed_edges(edge_table, "id", &changed_ids).await?;
let deleted_src_nodes: HashSet<String> = changeset
.get(&format!("node:{}", edge_type.from_type))
.map(|change| change.deleted_ids.iter().cloned().collect())
.unwrap_or_default();
let mut affected: HashSet<String> = HashSet::new();
for src in delta_by_id.values() {
affected.insert(src.clone());
}
for (_, src) in removed_edges.iter().chain(moved_from.iter()) {
affected.insert(src.clone());
}
affected.retain(|src| !deleted_src_nodes.contains(src));
if affected.is_empty() {
return Ok(Vec::new());
}
let affected_vec: Vec<String> = affected.iter().cloned().collect();
let committed_for_affected = committed
.committed_edges(edge_table, "src", &affected_vec)
.await?;
let mut per_src: HashMap<String, HashSet<String>> = HashMap::new();
for (id, src) in &committed_for_affected {
if removed_id_set.contains(id) || delta_id_set.contains(id) {
continue;
}
per_src.entry(src.clone()).or_default().insert(id.clone());
}
for (id, src) in &delta_by_id {
per_src.entry(src.clone()).or_default().insert(id.clone());
}
let mut violations = Vec::new();
for src in &affected {
let count = per_src.get(src).map(|ids| ids.len() as u32).unwrap_or(0);
if let Some(max) = card.max {
if count > max {
violations.push(cardinality_violation(
edge_table,
&edge_type.name,
src,
count,
"max",
max,
));
}
}
if count < card.min {
violations.push(cardinality_violation(
edge_table,
&edge_type.name,
src,
count,
"min",
card.min,
));
}
}
Ok(violations)
}
fn unique_violation(
table_key: &str,
columns: &[String],
key: &[String],
id: &str,
other: &str,
) -> Violation {
let type_name = table_key
.strip_prefix("node:")
.or_else(|| table_key.strip_prefix("edge:"))
.unwrap_or(table_key);
Violation {
table_key: table_key.to_string(),
row_id: Some(id.to_string()),
kind: MergeConflictKind::UniqueViolation,
message: format!(
"@unique violation on {type_name}.{}: value '{}' held by '{other}' and '{id}'",
format_tuple(columns),
format_tuple(key)
),
}
}
fn orphan_violation(
edge_table: &str,
edge_id: &str,
label: &str,
endpoint: &str,
node_type: &str,
) -> Violation {
Violation {
table_key: edge_table.to_string(),
row_id: Some(edge_id.to_string()),
kind: MergeConflictKind::OrphanEdge,
message: format!("{label} '{endpoint}' not found in {node_type}"),
}
}
fn cardinality_violation(
edge_table: &str,
edge_name: &str,
src: &str,
count: u32,
bound: &str,
limit: u32,
) -> Violation {
Violation {
table_key: edge_table.to_string(),
row_id: None,
kind: MergeConflictKind::CardinalityViolation,
message: format!(
"@card violation on edge {edge_name}: source '{src}' has {count} edges ({bound} {limit})"
),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use arrow_array::StringArray;
use arrow_schema::{DataType, Field, Schema};
use omnigraph_compiler::catalog::build_catalog;
use omnigraph_compiler::schema::parser::parse_schema;
const DOC_SCHEMA: &str = "node Doc {\n slug: String @key\n status: enum(draft, published)\n}\n";
fn catalog(src: &str) -> Catalog {
build_catalog(&parse_schema(src).unwrap()).unwrap()
}
fn status_change(values: &[&str]) -> ChangeSet {
let schema = Arc::new(Schema::new(vec![Field::new("status", DataType::Utf8, true)]));
let batch =
RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(values.to_vec())) as _])
.unwrap();
let mut change = TableChange::default();
change.changed.push(batch);
let mut cs = ChangeSet::new();
cs.insert("node:Doc".to_string(), change);
cs
}
#[test]
fn evaluator_flags_out_of_enum_value_in_delta() {
let v = evaluate_value_constraints(&status_change(&["bogus"]), &catalog(DOC_SCHEMA));
assert_eq!(v.len(), 1, "expected one enum violation, got {v:?}");
assert_eq!(v[0].kind, MergeConflictKind::ValueConstraintViolation);
assert!(v[0].message.contains("bogus"), "message was: {}", v[0].message);
}
#[test]
fn evaluator_accepts_valid_delta() {
assert!(
evaluate_value_constraints(&status_change(&["draft"]), &catalog(DOC_SCHEMA)).is_empty()
);
}
#[test]
fn evaluator_ignores_empty_changeset() {
assert!(evaluate_value_constraints(&ChangeSet::new(), &catalog(DOC_SCHEMA)).is_empty());
}
}