use contextdb_core::{Lsn, RowId, Value, VectorIndexRef};
use contextdb_engine::Database;
use contextdb_engine::sync_types::{ChangeSet, VectorChange};
use contextdb_server::protocol::WireChangeSet;
use std::collections::HashMap;
use uuid::Uuid;
fn params(pairs: Vec<(&str, Value)>) -> HashMap<String, Value> {
pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect()
}
fn explain_binds_vector_index(trace: &str, table: &str, column: &str) -> bool {
let Some(vector_pos) = trace.find("VectorSearch") else {
return false;
};
let vector_operator = &trace[vector_pos..];
let vector_header = vector_operator
.split("candidates:")
.next()
.unwrap_or(vector_operator);
let debug_table = format!("table: \"{table}\"");
let debug_column = format!("column: \"{column}\"");
let legacy_pair = format!("VectorSearch(table={table}, column={column}");
(vector_header.contains(&debug_table) && vector_header.contains(&debug_column))
|| trace.contains(&legacy_pair)
}
#[test]
fn nv05_planner_emits_full_vector_index_ref_in_explain() {
let db = Database::open_memory();
db.execute(
"CREATE TABLE evidence (
id UUID PRIMARY KEY,
vector_text VECTOR(4),
vector_vision VECTOR(8)
)",
&HashMap::new(),
)
.expect("create evidence");
db.execute(
"CREATE TABLE digests (id UUID PRIMARY KEY, vector_text VECTOR(4))",
&HashMap::new(),
)
.expect("create digests");
let trace = db
.explain("SELECT id FROM evidence ORDER BY vector_text <=> $q LIMIT 5")
.expect("explain text query");
assert!(
explain_binds_vector_index(&trace, "evidence", "vector_text"),
"ORDER BY vector_text on evidence must produce VectorSearch(table=evidence, column=vector_text); got: {trace}"
);
assert!(
!trace.contains("column=embedding") && !trace.contains("column: \"embedding\""),
"planner must not hardcode column=embedding; got: {trace}"
);
let trace_vision = db
.explain("SELECT id FROM evidence ORDER BY vector_vision <=> $q LIMIT 5")
.expect("explain vision query");
assert!(
explain_binds_vector_index(&trace_vision, "evidence", "vector_vision"),
"ORDER BY vector_vision on evidence must produce VectorSearch(table=evidence, column=vector_vision); got: {trace_vision}"
);
let trace_digests = db
.explain("SELECT id FROM digests ORDER BY vector_text <=> $q LIMIT 5")
.expect("explain digests query");
assert!(
explain_binds_vector_index(&trace_digests, "digests", "vector_text"),
"ORDER BY vector_text on digests must produce VectorSearch(table=digests, column=vector_text) — \
a planner that hardcodes table=evidence (or always picks the first declared table) fails here; got: {trace_digests}"
);
assert_ne!(trace, trace_vision);
assert_ne!(trace, trace_digests);
}
#[test]
fn nv07a_sync_vector_change_round_trip_preserves_table_column_identity() {
let original = ChangeSet {
rows: vec![],
edges: vec![],
vectors: vec![
VectorChange {
index: VectorIndexRef::new("evidence", "vector_text"),
row_id: RowId(7),
vector: vec![1.0, 0.0, 0.0, 0.0],
lsn: Lsn(42),
},
VectorChange {
index: VectorIndexRef::new("evidence", "vector_vision"),
row_id: RowId(7),
vector: vec![0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
lsn: Lsn(42),
},
],
ddl: vec![],
};
let wire: WireChangeSet = original.clone().into();
let bytes = rmp_serde::to_vec(&wire)
.expect("rmp_serde encode (matches protocol_newtype_roundtrip pattern)");
let decoded: WireChangeSet = rmp_serde::from_slice(&bytes).expect("rmp_serde decode");
let restored: ChangeSet = decoded.into();
assert_eq!(
restored.vectors[0].index,
VectorIndexRef::new("evidence", "vector_text")
);
assert_eq!(
restored.vectors[1].index,
VectorIndexRef::new("evidence", "vector_vision")
);
}
#[test]
fn nv07b_sync_apply_routes_to_matching_index() {
use contextdb_engine::sync_types::{
ChangeSet, ConflictPolicies, ConflictPolicy, NaturalKey, RowChange,
};
let receiver = Database::open_memory();
receiver
.execute(
"CREATE TABLE evidence (
id UUID PRIMARY KEY,
vector_text VECTOR(4),
vector_vision VECTOR(8)
)",
&HashMap::new(),
)
.expect("create");
let id = Uuid::new_v4();
let row_id = RowId(1);
let lsn = Lsn(10);
let mut row_values: HashMap<String, Value> = HashMap::new();
row_values.insert("id".into(), Value::Uuid(id));
let cs = ChangeSet {
rows: vec![RowChange {
table: "evidence".into(),
natural_key: NaturalKey {
column: "id".into(),
value: Value::Uuid(id),
},
values: row_values,
deleted: false,
lsn,
}],
edges: vec![],
vectors: vec![
VectorChange {
index: VectorIndexRef::new("evidence", "vector_text"),
row_id,
vector: vec![1.0, 0.0, 0.0, 0.0],
lsn,
},
VectorChange {
index: VectorIndexRef::new("evidence", "vector_vision"),
row_id,
vector: vec![0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
lsn,
},
],
ddl: vec![],
};
receiver
.apply_changes(cs, &ConflictPolicies::uniform(ConflictPolicy::ServerWins))
.expect("apply changeset");
let text_hits = receiver
.execute(
"SELECT id FROM evidence ORDER BY vector_text <=> $q LIMIT 1",
¶ms(vec![("q", Value::Vector(vec![1.0, 0.0, 0.0, 0.0]))]),
)
.expect("text search");
let id_idx = text_hits.columns.iter().position(|c| c == "id").unwrap();
assert_eq!(text_hits.rows.len(), 1);
assert_eq!(text_hits.rows[0][id_idx], Value::Uuid(id));
let vision_hits = receiver
.execute(
"SELECT id FROM evidence ORDER BY vector_vision <=> $q LIMIT 1",
¶ms(vec![(
"q",
Value::Vector(vec![0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
)]),
)
.expect("vision search");
assert_eq!(vision_hits.rows.len(), 1);
assert_eq!(vision_hits.rows[0][id_idx], Value::Uuid(id));
let mismatched = receiver.execute(
"SELECT id FROM evidence ORDER BY vector_vision <=> $q LIMIT 1",
¶ms(vec![("q", Value::Vector(vec![1.0, 0.0, 0.0, 0.0]))]),
);
assert!(matches!(
mismatched,
Err(contextdb_core::Error::VectorIndexDimensionMismatch { ref index, expected: 8, actual: 4 })
if *index == VectorIndexRef::new("evidence", "vector_vision")
));
}
#[test]
fn nv07c_receiver_promoted_to_leader_emits_correct_index_keyed_envelopes() {
use contextdb_engine::sync_types::{
ChangeSet, ConflictPolicies, ConflictPolicy, NaturalKey, RowChange,
};
let receiver = Database::open_memory();
receiver
.execute(
"CREATE TABLE evidence (
id UUID PRIMARY KEY,
vector_text VECTOR(4),
vector_vision VECTOR(8)
)",
&HashMap::new(),
)
.expect("create");
let id = Uuid::new_v4();
let lsn = Lsn(10);
let mut row_values: HashMap<String, Value> = HashMap::new();
row_values.insert("id".into(), Value::Uuid(id));
row_values.insert(
"vector_text".into(),
Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
);
row_values.insert(
"vector_vision".into(),
Value::Vector(vec![0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
);
let cs = ChangeSet {
rows: vec![RowChange {
table: "evidence".into(),
natural_key: NaturalKey {
column: "id".into(),
value: Value::Uuid(id),
},
values: row_values,
deleted: false,
lsn,
}],
edges: vec![],
vectors: vec![
VectorChange {
index: VectorIndexRef::new("evidence", "vector_text"),
row_id: RowId(1),
vector: vec![1.0, 0.0, 0.0, 0.0],
lsn,
},
VectorChange {
index: VectorIndexRef::new("evidence", "vector_vision"),
row_id: RowId(1),
vector: vec![0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
lsn,
},
],
ddl: vec![],
};
receiver
.apply_changes(cs, &ConflictPolicies::uniform(ConflictPolicy::ServerWins))
.expect("apply");
let leader_lsn_before = receiver.current_lsn();
let new_id = Uuid::new_v4();
let new_text_vec = vec![0.0_f32, 0.0, 1.0, 0.0];
let new_vision_vec = vec![0.0_f32, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0];
receiver
.execute(
"INSERT INTO evidence (id, vector_text, vector_vision) VALUES ($id, $t, $v)",
&HashMap::from([
("id".to_string(), Value::Uuid(new_id)),
("t".to_string(), Value::Vector(new_text_vec.clone())),
("v".to_string(), Value::Vector(new_vision_vec.clone())),
]),
)
.expect("native insert on promoted leader");
let outgoing = receiver.changes_since(leader_lsn_before);
let by_index: std::collections::HashSet<&VectorIndexRef> =
outgoing.vectors.iter().map(|c| &c.index).collect();
assert!(
by_index.contains(&VectorIndexRef::new("evidence", "vector_text")),
"promoted-leader changeset must carry the vector_text envelope explicitly"
);
assert!(
by_index.contains(&VectorIndexRef::new("evidence", "vector_vision")),
"promoted-leader changeset must carry the vector_vision envelope explicitly"
);
assert!(
outgoing
.vectors
.iter()
.all(|c| !c.index.table.is_empty() && !c.index.column.is_empty()),
"no envelope may carry a default VectorIndexRef in production state"
);
let outgoing_by_index: HashMap<VectorIndexRef, &VectorChange> = outgoing
.vectors
.iter()
.map(|change| (change.index.clone(), change))
.collect();
assert_eq!(
&outgoing_by_index
.get(&VectorIndexRef::new("evidence", "vector_text"))
.expect("outgoing vector_text change")
.vector,
&new_text_vec,
"promoted-leader vector_text envelope must carry the live vector_text payload"
);
assert_eq!(
&outgoing_by_index
.get(&VectorIndexRef::new("evidence", "vector_vision"))
.expect("outgoing vector_vision change")
.vector,
&new_vision_vec,
"promoted-leader vector_vision envelope must carry the live vector_vision payload"
);
let mut envelope_only = outgoing.clone();
for row in &mut envelope_only.rows {
row.values.remove("vector_text");
row.values.remove("vector_vision");
}
let downstream = Database::open_memory();
downstream
.execute(
"CREATE TABLE evidence (
id UUID PRIMARY KEY,
vector_text VECTOR(4),
vector_vision VECTOR(8)
)",
&HashMap::new(),
)
.expect("downstream create");
downstream
.apply_changes(
envelope_only,
&ConflictPolicies::uniform(ConflictPolicy::ServerWins),
)
.expect("downstream applies promoted-leader vector-envelope-only delta");
let text_hits = downstream
.execute(
"SELECT id FROM evidence ORDER BY vector_text <=> $q LIMIT 1",
¶ms(vec![("q", Value::Vector(new_text_vec))]),
)
.expect("downstream text search");
let id_idx = text_hits.columns.iter().position(|c| c == "id").unwrap();
assert_eq!(text_hits.rows[0][id_idx], Value::Uuid(new_id));
let vision_hits = downstream
.execute(
"SELECT id FROM evidence ORDER BY vector_vision <=> $q LIMIT 1",
¶ms(vec![("q", Value::Vector(new_vision_vec))]),
)
.expect("downstream vision search");
assert_eq!(
vision_hits.rows[0][id_idx],
Value::Uuid(new_id),
"downstream receiver must materialize promoted-leader vector_vision payload from the generated envelope"
);
}