#![allow(clippy::expect_used, clippy::panic)]
use std::time::Instant;
use fathomdb_engine::{
ChunkPolicy, EngineRuntime, FtsPropertyPathSpec, NodeInsert, NodeRetire, ProvenanceMode,
RebuildMode, TelemetryLevel, WriteRequest,
};
use fathomdb_query::QueryBuilder;
use rusqlite::OptionalExtension as _;
/// Opens an [`EngineRuntime`] backed by `test.db` inside the given temporary
/// directory, using the fixed configuration shared by the tests in this file.
///
/// # Panics
///
/// Panics with "open engine" if the engine cannot be opened.
fn open_engine(dir: &tempfile::TempDir) -> EngineRuntime {
    let db_file = dir.path().join("test.db");
    let opened = EngineRuntime::open(
        db_file,
        ProvenanceMode::Warn,
        None,
        2,
        TelemetryLevel::Counters,
        None,
    );
    opened.expect("open engine")
}
/// Builds a [`WriteRequest`] labelled `label` that carries only the supplied
/// node inserts; every other collection on the request is left empty.
fn make_write_request(label: &str, nodes: Vec<NodeInsert>) -> WriteRequest {
    WriteRequest {
        label: String::from(label),
        nodes,
        node_retires: Vec::new(),
        edges: Vec::new(),
        edge_retires: Vec::new(),
        chunks: Vec::new(),
        runs: Vec::new(),
        steps: Vec::new(),
        actions: Vec::new(),
        optional_backfills: Vec::new(),
        vec_inserts: Vec::new(),
        operational_writes: Vec::new(),
    }
}
/// Registering a schema with `RebuildMode::Eager` returns a record that echoes
/// the registered kind and property paths.
#[test]
fn eager_register_returns_schema_record() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);
    let admin = engine.admin().service();

    let title_path = [FtsPropertyPathSpec::scalar("$.title")];
    let outcome = admin.register_fts_property_schema_with_entries(
        "Meeting",
        &title_path,
        None,
        &[],
        RebuildMode::Eager,
    );
    let record = outcome.expect("register eager");

    assert_eq!(record.kind, "Meeting");
    assert_eq!(record.property_paths, vec!["$.title"]);
}
/// Registering a schema with `RebuildMode::Async` must return within 500ms
/// and still hand back a record for the registered kind.
#[test]
fn async_register_is_fast() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);
    let admin = engine.admin().service();

    // The clock brackets the registration call and nothing else.
    let started = Instant::now();
    let record = admin
        .register_fts_property_schema_with_entries(
            "Meeting",
            &[FtsPropertyPathSpec::scalar("$.title")],
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");
    let elapsed_ms = started.elapsed().as_millis();

    assert_eq!(record.kind, "Meeting");
    assert!(
        elapsed_ms < 500,
        "async register took {}ms, expected <500ms",
        elapsed_ms
    );
}
/// After an async registration for `Task`, a rebuild state row for that kind
/// can be read back (checked after a fixed 200ms pause).
#[test]
fn async_register_creates_rebuild_state_row() {
    use std::{thread, time::Duration};

    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);
    let admin = engine.admin().service();

    let name_path = [FtsPropertyPathSpec::scalar("$.name")];
    admin
        .register_fts_property_schema_with_entries(
            "Task",
            &name_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");

    // Fixed pause before inspecting the state row.
    thread::sleep(Duration::from_millis(200));

    let row = admin
        .get_property_fts_rebuild_state("Task")
        .expect("get state");
    assert!(
        row.is_some(),
        "expected rebuild state row for 'Task' after async register"
    );
}
/// Seeds five `Note` nodes, registers an async schema, waits (up to 5s) for the
/// rebuild to reach `SWAPPING` or `COMPLETE`, then checks that five rows are
/// present: in staging while swapping, otherwise in the per-kind FTS table.
#[test]
fn async_rebuild_populates_staging_table() {
    use std::time::Duration;

    let dir = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&dir);

    for i in 0..5u32 {
        let seed = NodeInsert {
            row_id: format!("r{i}"),
            logical_id: format!("note:{i}"),
            kind: String::from("Note"),
            properties: format!(r#"{{"body":"hello {i}"}}"#),
            source_ref: Some(String::from("test")),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
            content_ref: None,
        };
        let request = make_write_request(&format!("seed-{i}"), vec![seed]);
        engine.writer().submit(request).expect("write node");
    }

    let svc = engine.admin().service();
    let body_path = [FtsPropertyPathSpec::scalar("$.body")];
    svc.register_fts_property_schema_with_entries(
        "Note",
        &body_path,
        None,
        &[],
        RebuildMode::Async,
    )
    .expect("register async");

    // Poll every 50ms until the rebuild is swapping or done; give up after 5s.
    let deadline = Instant::now() + Duration::from_secs(5);
    loop {
        std::thread::sleep(Duration::from_millis(50));
        let snapshot = svc
            .get_property_fts_rebuild_state("Note")
            .expect("get state");
        let reached = matches!(
            snapshot.as_ref().map(|row| row.state.as_str()),
            Some("SWAPPING" | "COMPLETE")
        );
        if reached {
            break;
        }
        assert!(
            Instant::now() <= deadline,
            "rebuild did not reach SWAPPING within 5s, state={:?}",
            svc.get_property_fts_rebuild_state("Note")
        );
    }

    // Re-read the state: which table holds the rows depends on where the rebuild is now.
    let state = svc
        .get_property_fts_rebuild_state("Note")
        .expect("get state")
        .expect("state row must exist");

    if state.state == "SWAPPING" {
        let count = svc.count_staging_rows("Note").expect("count staging rows");
        assert_eq!(
            count, 5,
            "expected 5 staging rows for 'Note' during SWAPPING, got {count}"
        );
        return;
    }

    // Not swapping any more: count rows directly in the per-kind FTS table.
    let db_file = dir.path().join("test.db");
    let conn = rusqlite::Connection::open(db_file).expect("open raw connection");
    let note_table = fathomdb_schema::fts_kind_table_name("Note");
    let sql = format!("SELECT count(*) FROM {note_table}");
    let count: i64 = conn
        .query_row(&sql, [], |row| row.get(0))
        .expect("count fts rows");
    assert_eq!(
        count, 5,
        "expected 5 {note_table} rows for 'Note' after swap, got {count}"
    );
}
/// Kicks off an async registration and then drops the engine; the test passes
/// as long as nothing panics along the way.
#[test]
fn engine_shutdown_is_clean() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);

    {
        // Keep the admin handle in its own scope so it is gone before the engine is dropped.
        let admin = engine.admin().service();
        let x_path = [FtsPropertyPathSpec::scalar("$.x")];
        admin
            .register_fts_property_schema_with_entries(
                "Foo",
                &x_path,
                None,
                &[],
                RebuildMode::Async,
            )
            .expect("register async");
    }

    drop(engine);
}
/// Builds a [`WriteRequest`] labelled `label` whose only payload is a single
/// [`NodeRetire`] for `logical_id` (with `source_ref` set to `"test"`).
fn make_retire_request(label: &str, logical_id: &str) -> WriteRequest {
    let retire = NodeRetire {
        logical_id: String::from(logical_id),
        source_ref: Some(String::from("test")),
    };
    WriteRequest {
        label: String::from(label),
        nodes: Vec::new(),
        node_retires: vec![retire],
        edges: Vec::new(),
        edge_retires: Vec::new(),
        chunks: Vec::new(),
        runs: Vec::new(),
        steps: Vec::new(),
        actions: Vec::new(),
        optional_backfills: Vec::new(),
        vec_inserts: Vec::new(),
        operational_writes: Vec::new(),
    }
}
/// Builds a non-upsert [`NodeInsert`] for logical id `id` of the given `kind`.
///
/// `props` is stored verbatim as the node's properties string, the row id is
/// derived as `row-{id}`, and `source_ref` is always `"test"`.
fn make_node(id: &str, kind: &str, props: &str) -> NodeInsert {
    let row_id = format!("row-{id}");
    NodeInsert {
        row_id,
        logical_id: String::from(id),
        kind: String::from(kind),
        properties: String::from(props),
        source_ref: Some(String::from("test")),
        upsert: false,
        chunk_policy: ChunkPolicy::Preserve,
        content_ref: None,
    }
}
fn wait_for_state(
svc: &fathomdb_engine::AdminService,
kind: &str,
targets: &[&str],
deadline_secs: u64,
) -> String {
let deadline = Instant::now() + std::time::Duration::from_secs(deadline_secs);
loop {
std::thread::sleep(std::time::Duration::from_millis(20));
let state = svc
.get_property_fts_rebuild_state(kind)
.expect("get_property_fts_rebuild_state");
if let Some(row) = state
&& targets.iter().any(|t| row.state == *t)
{
return row.state.clone();
}
assert!(
Instant::now() <= deadline,
"timed out waiting for kind={kind} to reach state in {targets:?}"
);
}
}
/// Writes a `Ticket` node after an async rebuild has started. If the rebuild
/// is still `BUILDING`/`SWAPPING` afterwards the node must be in staging;
/// otherwise it must be findable through a text search.
#[test]
fn write_during_rebuild_populates_staging() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);
    let admin = engine.admin().service();

    let title_path = [FtsPropertyPathSpec::scalar("$.title")];
    admin
        .register_fts_property_schema_with_entries(
            "Ticket",
            &title_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");
    wait_for_state(&admin, "Ticket", &["BUILDING", "SWAPPING", "COMPLETE"], 5);

    let ticket = make_node("ticket:1", "Ticket", r#"{"title":"urgent bug"}"#);
    let request = make_write_request("w1", vec![ticket]);
    engine.writer().submit(request).expect("write node");

    // The rebuild may have finished by now, so branch on the state seen after the write.
    let snapshot = admin
        .get_property_fts_rebuild_state("Ticket")
        .expect("get state")
        .expect("state row must exist");
    let rebuild_in_flight = matches!(snapshot.state.as_str(), "BUILDING" | "SWAPPING");

    if rebuild_in_flight {
        let in_staging = admin
            .staging_row_exists("Ticket", "ticket:1")
            .expect("staging_row_exists");
        assert!(
            in_staging,
            "expected ticket:1 to be in staging table during rebuild"
        );
    } else {
        let compiled = QueryBuilder::nodes("Ticket")
            .text_search("urgent", 10)
            .limit(10)
            .compile()
            .expect("compile");
        let rows = engine
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query after rebuild");
        assert!(
            !rows.nodes.is_empty(),
            "ticket:1 should be findable via FTS after rebuild COMPLETE"
        );
    }
}
/// Retires a seeded `TicketDel` node once it has shown up in staging (or the
/// rebuild has completed), then checks the node is absent from staging and
/// from the per-kind FTS table (if that table exists).
#[test]
fn delete_during_rebuild_removes_from_both_tables() {
    use std::time::Duration;

    let dir = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&dir);

    let seed = make_node("ticket:del", "TicketDel", r#"{"title":"to be deleted"}"#);
    let seed_request = make_write_request("seed", vec![seed]);
    engine.writer().submit(seed_request).expect("write seed node");

    let admin = engine.admin().service();
    let title_path = [FtsPropertyPathSpec::scalar("$.title")];
    admin
        .register_fts_property_schema_with_entries(
            "TicketDel",
            &title_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");

    // Poll every 20ms until the node is staged or the rebuild is COMPLETE; give up after 5s.
    let deadline = Instant::now() + Duration::from_secs(5);
    loop {
        std::thread::sleep(Duration::from_millis(20));
        let staged = admin
            .staging_row_exists("TicketDel", "ticket:del")
            .expect("staging_row_exists");
        // `||` short-circuits: the state is only queried when the row is not staged.
        let ready = staged
            || admin
                .get_property_fts_rebuild_state("TicketDel")
                .expect("get state")
                .is_some_and(|row| row.state == "COMPLETE");
        if ready {
            break;
        }
        assert!(
            Instant::now() <= deadline,
            "ticket:del never appeared in staging and rebuild never completed within 5s"
        );
    }

    let retire_request = make_retire_request("retire-del", "ticket:del");
    engine.writer().submit(retire_request).expect("retire node");

    let in_staging = admin
        .staging_row_exists("TicketDel", "ticket:del")
        .expect("staging_row_exists after retire");
    assert!(
        !in_staging,
        "ticket:del should not be in staging after retire"
    );

    // Inspect the per-kind FTS table through a separate raw connection.
    let db_file = dir.path().join("test.db");
    let conn = rusqlite::Connection::open(db_file).expect("open raw connection");
    let ticketdel_table = fathomdb_schema::fts_kind_table_name("TicketDel");
    let table_exists = conn
        .query_row(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
            rusqlite::params![ticketdel_table],
            |_| Ok(()),
        )
        .optional()
        .expect("sqlite_master query")
        .is_some();

    // A missing table counts as zero live rows.
    let live_count: i64 = if table_exists {
        let sql = format!(
            "SELECT count(*) FROM {ticketdel_table} WHERE node_logical_id = 'ticket:del'"
        );
        conn.query_row(&sql, [], |row| row.get(0))
            .expect("count live fts")
    } else {
        0
    };
    assert_eq!(
        live_count, 0,
        "ticket:del should be removed from {ticketdel_table} after retire"
    );
}
/// Seeds three `ScanNote` nodes, registers an async schema for the first time,
/// and checks that the state row reports `is_first_registration` and that a
/// text search issued right after registration returns all three nodes.
#[test]
fn read_during_first_registration_uses_scan_fallback() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);

    for i in 0..3u32 {
        let logical_id = format!("note:{i}");
        let props = format!(r#"{{"body":"findme note {i}"}}"#);
        let request = make_write_request(
            &format!("seed-{i}"),
            vec![make_node(&logical_id, "ScanNote", &props)],
        );
        engine.writer().submit(request).expect("write node");
    }

    let admin = engine.admin().service();
    let body_path = [FtsPropertyPathSpec::scalar("$.body")];
    admin
        .register_fts_property_schema_with_entries(
            "ScanNote",
            &body_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");

    let rebuild = admin
        .get_property_fts_rebuild_state("ScanNote")
        .expect("get state")
        .expect("state must exist");
    assert!(
        rebuild.is_first_registration,
        "expected is_first_registration=true for first async registration"
    );

    // No waiting here: the read is issued immediately after registration.
    let compiled = QueryBuilder::nodes("ScanNote")
        .text_search("findme", 10)
        .limit(10)
        .compile()
        .expect("compiled query");
    let rows = engine
        .coordinator()
        .execute_compiled_read(&compiled)
        .expect("execute read during first-registration rebuild");

    assert!(
        !rows.nodes.is_empty(),
        "scan fallback should return nodes during first-registration rebuild, got 0 results"
    );
    assert_eq!(
        rows.nodes.len(),
        3,
        "scan fallback should return all 3 matching nodes"
    );
}
/// Registers `ReregKind` eagerly, seeds three nodes, re-registers the same
/// path asynchronously, and checks that the state row is not flagged as a
/// first registration and that a text search still returns all three nodes.
#[test]
fn read_during_re_registration_uses_live_fts_table() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);
    let admin = engine.admin().service();

    // Both registrations use the same single-path spec.
    let title_path = [FtsPropertyPathSpec::scalar("$.title")];
    admin
        .register_fts_property_schema_with_entries(
            "ReregKind",
            &title_path,
            None,
            &[],
            RebuildMode::Eager,
        )
        .expect("register eager");

    for i in 0..3u32 {
        let logical_id = format!("rereg:{i}");
        let props = format!(r#"{{"title":"rereg node {i}"}}"#);
        let request = make_write_request(
            &format!("seed-{i}"),
            vec![make_node(&logical_id, "ReregKind", &props)],
        );
        engine.writer().submit(request).expect("write node");
    }

    admin
        .register_fts_property_schema_with_entries(
            "ReregKind",
            &title_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("re-register async");

    let rebuild = admin
        .get_property_fts_rebuild_state("ReregKind")
        .expect("get state")
        .expect("state must exist");
    assert!(
        !rebuild.is_first_registration,
        "expected is_first_registration=false for re-registration"
    );

    let compiled = QueryBuilder::nodes("ReregKind")
        .text_search("rereg", 10)
        .limit(10)
        .compile()
        .expect("compiled query");
    let rows = engine
        .coordinator()
        .execute_compiled_read(&compiled)
        .expect("execute read during re-registration rebuild");
    assert_eq!(
        rows.nodes.len(),
        3,
        "re-registration should use live FTS table and return all 3 nodes"
    );
}
/// Registers `IncompatKind` eagerly, seeds 200 nodes in one write, then
/// re-registers the same path with an explicit weight in async mode (the test
/// treats this as a shape-incompatible change). A text search issued right
/// away must return no rows unless the rebuild has already reached `COMPLETE`.
#[test]
fn read_during_shape_incompatible_re_registration_sees_empty_table() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);
    let admin = engine.admin().service();

    let plain_path = [FtsPropertyPathSpec::scalar("$.title")];
    admin
        .register_fts_property_schema_with_entries(
            "IncompatKind",
            &plain_path,
            None,
            &[],
            RebuildMode::Eager,
        )
        .expect("register eager");

    let nodes: Vec<NodeInsert> = (0..200u32)
        .map(|i| {
            make_node(
                &format!("incompat:{i}"),
                "IncompatKind",
                &format!(r#"{{"title":"incompat node {i}"}}"#),
            )
        })
        .collect();
    engine
        .writer()
        .submit(make_write_request("seed-incompat", nodes))
        .expect("write nodes");

    let weighted_path = [FtsPropertyPathSpec::scalar("$.title").with_weight(1.0)];
    admin
        .register_fts_property_schema_with_entries(
            "IncompatKind",
            &weighted_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("re-register async (shape-incompatible)");

    let rebuild = admin
        .get_property_fts_rebuild_state("IncompatKind")
        .expect("get state")
        .expect("state must exist");
    assert!(
        !rebuild.is_first_registration,
        "expected is_first_registration=false for re-registration"
    );

    let compiled = QueryBuilder::nodes("IncompatKind")
        .text_search("incompat", 10)
        .limit(10)
        .compile()
        .expect("compiled query");
    let rows = engine
        .coordinator()
        .execute_compiled_read(&compiled)
        .expect("execute read during shape-incompatible re-registration rebuild");

    // Sample the state after the read; the emptiness check only applies mid-rebuild.
    let state_now = admin
        .get_property_fts_rebuild_state("IncompatKind")
        .expect("get state")
        .expect("state exists")
        .state;
    let still_rebuilding = state_now != "COMPLETE";
    if still_rebuilding {
        assert_eq!(
            rows.nodes.len(),
            0,
            "shape-incompatible re-registration should present an empty FTS table during rebuild; \
             got {} rows (state={})",
            rows.nodes.len(),
            state_now
        );
    }
}
/// Seeds three `Article` nodes, registers an async schema, waits for
/// `COMPLETE`, and checks that a text search returns all three nodes.
#[test]
fn async_rebuild_completes_and_queries_new_schema() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);

    for i in 0..3u32 {
        let article = make_node(
            &format!("article:{i}"),
            "Article",
            &format!(r#"{{"headline":"searchable headline {i}"}}"#),
        );
        let request = make_write_request(&format!("seed-{i}"), vec![article]);
        engine.writer().submit(request).expect("write node");
    }

    let admin = engine.admin().service();
    let headline_path = [FtsPropertyPathSpec::scalar("$.headline")];
    admin
        .register_fts_property_schema_with_entries(
            "Article",
            &headline_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");
    wait_for_state(&admin, "Article", &["COMPLETE"], 10);

    let compiled = QueryBuilder::nodes("Article")
        .text_search("searchable", 10)
        .limit(10)
        .compile()
        .expect("compile");
    let rows = engine
        .coordinator()
        .execute_compiled_read(&compiled)
        .expect("execute read after rebuild COMPLETE");
    let hits = rows.nodes.len();
    assert_eq!(
        hits,
        3,
        "FTS query after rebuild COMPLETE should return all 3 nodes, got {}",
        hits
    );
}
/// Simulates a crash mid-rebuild: after a first engine session starts an async
/// rebuild for `CrashKind`, the state row is forced to `BUILDING` and a fake
/// staging row is inserted through a raw connection. Reopening the engine must
/// leave the state `FAILED` and the staging rows for the kind at zero.
#[test]
fn crash_recovery_mid_building_marks_failed() {
    let dir = tempfile::tempdir().expect("temp dir");
    let db_path = dir.path().join("test.db");

    // Both engine sessions share the same open parameters; only the panic message differs.
    let open_at_db = |failure: &str| {
        EngineRuntime::open(
            &db_path,
            ProvenanceMode::Warn,
            None,
            2,
            TelemetryLevel::Counters,
            None,
        )
        .expect(failure)
    };

    // Session 1: seed data and start the async rebuild; the engine is dropped at scope end.
    {
        let engine = open_at_db("open engine");
        for i in 0..5u32 {
            let node = make_node(
                &format!("crash:{i}"),
                "CrashKind",
                &format!(r#"{{"note":"note {i}"}}"#),
            );
            let request = make_write_request(&format!("seed-{i}"), vec![node]);
            engine.writer().submit(request).expect("write node");
        }
        let svc = engine.admin().service();
        let note_path = [FtsPropertyPathSpec::scalar("$.note")];
        svc.register_fts_property_schema_with_entries(
            "CrashKind",
            &note_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");
        wait_for_state(&svc, "CrashKind", &["BUILDING", "SWAPPING", "COMPLETE"], 5);
    }

    // Between sessions: rewrite the persisted rows to look like an interrupted rebuild.
    {
        let raw_conn = rusqlite::Connection::open(&db_path).expect("open raw");
        let tamper_steps = [
            (
                "UPDATE fts_property_rebuild_state SET state = 'BUILDING' WHERE kind = 'CrashKind'",
                "force BUILDING state",
            ),
            (
                "INSERT OR IGNORE INTO fts_property_rebuild_staging \
                 (kind, node_logical_id, text_content) VALUES ('CrashKind', 'fake:1', 'fake')",
                "insert fake staging row",
            ),
        ];
        for (sql, failure) in tamper_steps {
            raw_conn.execute(sql, []).expect(failure);
        }
    }

    // Session 2: reopen and inspect what is left.
    let engine2 = open_at_db("reopen engine");
    let svc2 = engine2.admin().service();

    let state = svc2
        .get_property_fts_rebuild_state("CrashKind")
        .expect("get state after reopen")
        .expect("state row must exist");
    assert_eq!(
        state.state, "FAILED",
        "crash recovery should mark interrupted rebuild as FAILED, got '{}'",
        state.state
    );

    let staging_count = svc2
        .count_staging_rows("CrashKind")
        .expect("count staging rows");
    assert_eq!(
        staging_count, 0,
        "crash recovery should clean up staging table, got {staging_count} rows"
    );
}
/// Seeds three `ProgKind` nodes, starts an async rebuild, and polls the
/// coordinator's progress API (every 20ms, up to 10s) until it reports
/// `COMPLETE`; at that point `rows_done` and `started_at` must both be positive.
#[test]
fn get_property_fts_rebuild_progress_returns_progress() {
    use std::time::Duration;

    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);

    for i in 0..3u32 {
        let node = make_node(
            &format!("prog:{i}"),
            "ProgKind",
            &format!(r#"{{"text":"progress text {i}"}}"#),
        );
        let request = make_write_request(&format!("seed-{i}"), vec![node]);
        engine.writer().submit(request).expect("write node");
    }

    let admin = engine.admin().service();
    let text_path = [FtsPropertyPathSpec::scalar("$.text")];
    admin
        .register_fts_property_schema_with_entries(
            "ProgKind",
            &text_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");

    let deadline = Instant::now() + Duration::from_secs(10);
    loop {
        std::thread::sleep(Duration::from_millis(20));
        // Keep the progress row only once it reports COMPLETE.
        let completed = engine
            .coordinator()
            .get_property_fts_rebuild_progress("ProgKind")
            .expect("get_property_fts_rebuild_progress")
            .filter(|p| p.state == "COMPLETE");
        if let Some(p) = completed {
            assert!(
                p.rows_done > 0,
                "rows_done should be > 0, got {}",
                p.rows_done
            );
            assert!(
                p.started_at > 0,
                "started_at should be a unix millis timestamp > 0"
            );
            break;
        }
        assert!(
            Instant::now() <= deadline,
            "rebuild did not reach COMPLETE within 10s"
        );
    }
}
/// Seeds four `DocKind` nodes, runs an async rebuild to `COMPLETE`, and checks
/// three things: the per-kind FTS table holds four rows, staging is empty, and
/// a text search returns all four nodes.
#[test]
fn rebuild_completes_and_fts_table_is_queryable() {
    let dir = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&dir);

    for i in 0..4u32 {
        let doc = make_node(
            &format!("doc:{i}"),
            "DocKind",
            &format!(r#"{{"content":"important document {i}"}}"#),
        );
        let request = make_write_request(&format!("seed-{i}"), vec![doc]);
        engine.writer().submit(request).expect("write node");
    }

    let svc = engine.admin().service();
    let content_path = [FtsPropertyPathSpec::scalar("$.content")];
    svc.register_fts_property_schema_with_entries(
        "DocKind",
        &content_path,
        None,
        &[],
        RebuildMode::Async,
    )
    .expect("register async");
    wait_for_state(&svc, "DocKind", &["COMPLETE"], 10);

    // 1. Row count in the per-kind FTS table, read through a raw connection.
    let db_file = dir.path().join("test.db");
    let conn = rusqlite::Connection::open(db_file).expect("open raw connection");
    let dockind_table = fathomdb_schema::fts_kind_table_name("DocKind");
    let count_sql = format!("SELECT count(*) FROM {dockind_table}");
    let fts_count: i64 = conn
        .query_row(&count_sql, [], |row| row.get(0))
        .expect("count fts rows");
    assert_eq!(
        fts_count, 4,
        "expected 4 {dockind_table} rows after rebuild, got {fts_count}"
    );

    // 2. Nothing left in staging.
    let staging_count = svc.count_staging_rows("DocKind").expect("count staging");
    assert_eq!(
        staging_count, 0,
        "staging should be empty after COMPLETE swap, got {staging_count}"
    );

    // 3. The nodes are reachable through the query path.
    let compiled = QueryBuilder::nodes("DocKind")
        .text_search("important", 10)
        .limit(10)
        .compile()
        .expect("compile query");
    let rows = engine
        .coordinator()
        .execute_compiled_read(&compiled)
        .expect("execute text search");
    let hits = rows.nodes.len();
    assert_eq!(
        hits,
        4,
        "text search after rebuild should return all 4 nodes, got {}",
        hits
    );
}
/// Seeds five `ConcKind` nodes, starts an async rebuild, writes three more
/// nodes once the rebuild has left `PENDING`, waits for `COMPLETE`, and checks
/// that a text search returns all eight nodes.
#[test]
fn concurrent_writes_during_rebuild_are_indexed_in_final_fts() {
    let dir = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&dir);
    for i in 0..5u32 {
        engine
            .writer()
            .submit(make_write_request(
                &format!("seed-{i}"),
                vec![make_node(
                    &format!("concurrent:{i}"),
                    "ConcKind",
                    &format!(r#"{{"msg":"concurrent message {i}"}}"#),
                )],
            ))
            .expect("write seed node");
    }
    let svc = engine.admin().service();
    svc.register_fts_property_schema_with_entries(
        "ConcKind",
        &[FtsPropertyPathSpec::scalar("$.msg")],
        None,
        &[],
        RebuildMode::Async,
    )
    .expect("register async");

    // Best-effort wait (at most 200 polls x 10ms) for the rebuild to start so
    // the writes below can overlap with it. Stop as soon as the state is no
    // longer PENDING: if the rebuild has already moved past BUILDING, that
    // state will not be observed again and continuing to poll for it would
    // only burn the remaining ~2s budget.
    for _ in 0..200 {
        let p = engine
            .coordinator()
            .get_property_fts_rebuild_progress("ConcKind")
            .expect("get_property_fts_rebuild_progress")
            .expect("rebuild state row should exist");
        if p.state != "PENDING" {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    for i in 5..8u32 {
        engine
            .writer()
            .submit(make_write_request(
                &format!("during-{i}"),
                vec![make_node(
                    &format!("concurrent:{i}"),
                    "ConcKind",
                    &format!(r#"{{"msg":"concurrent message {i}"}}"#),
                )],
            ))
            .expect("write node during rebuild");
    }
    wait_for_state(&svc, "ConcKind", &["COMPLETE"], 10);

    let compiled = QueryBuilder::nodes("ConcKind")
        .text_search("concurrent", 10)
        .limit(20)
        .compile()
        .expect("compile query");
    let rows = engine
        .coordinator()
        .execute_compiled_read(&compiled)
        .expect("execute text search");
    assert_eq!(
        rows.nodes.len(),
        8,
        "all 8 nodes (5 seeded + 3 written during rebuild) should be in FTS after COMPLETE, got {}",
        rows.nodes.len()
    );
}
/// Polls rebuild progress for `TransKind` every 10ms (up to 10s), recording
/// each distinct state as it changes, until `COMPLETE` or `FAILED` is seen.
/// The recorded sequence must end in `COMPLETE` and never move backwards in
/// the order `PENDING`, `BUILDING`, `SWAPPING`, `COMPLETE`.
#[test]
fn rebuild_progress_transitions_through_states() {
    use std::time::Duration;

    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);

    for i in 0..5u32 {
        let node = make_node(
            &format!("trans:{i}"),
            "TransKind",
            &format!(r#"{{"label":"transition label {i}"}}"#),
        );
        let request = make_write_request(&format!("seed-{i}"), vec![node]);
        engine.writer().submit(request).expect("write node");
    }

    let admin = engine.admin().service();
    let label_path = [FtsPropertyPathSpec::scalar("$.label")];
    admin
        .register_fts_property_schema_with_entries(
            "TransKind",
            &label_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");

    let mut observed: Vec<String> = Vec::new();
    let deadline = Instant::now() + Duration::from_secs(10);
    loop {
        std::thread::sleep(Duration::from_millis(10));
        let progress = engine
            .coordinator()
            .get_property_fts_rebuild_progress("TransKind")
            .expect("get progress");
        if let Some(p) = progress {
            // Record a state only when it differs from the previous sample.
            if observed.last() != Some(&p.state) {
                observed.push(p.state.clone());
            }
            if matches!(p.state.as_str(), "COMPLETE" | "FAILED") {
                break;
            }
        }
        assert!(
            Instant::now() <= deadline,
            "rebuild did not reach a terminal state within 10s; observed states: {observed:?}"
        );
    }

    assert_eq!(
        observed.last().map(String::as_str),
        Some("COMPLETE"),
        "final state must be COMPLETE, observed: {observed:?}"
    );

    let expected_order = ["PENDING", "BUILDING", "SWAPPING", "COMPLETE"];
    // Starting from rank 0 makes the check trivially true for the first observed state.
    let mut prev = 0usize;
    for state in &observed {
        let pos = expected_order
            .iter()
            .position(|&known| known == state.as_str())
            .unwrap_or_else(|| panic!("unexpected state observed: {state}"));
        assert!(
            pos >= prev,
            "state ordering violated: saw {state} (pos {pos}) after pos {prev}; full sequence: {observed:?}"
        );
        prev = pos;
    }
}
/// Registers `Rereg2Kind` with only `$.name`, confirms that text from `$.tag`
/// is not searchable, then re-registers with both paths and confirms that the
/// same search returns all three nodes once the second rebuild is `COMPLETE`.
#[test]
fn re_registration_triggers_new_rebuild() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let engine = open_engine(&tmp);

    for i in 0..3u32 {
        let node = make_node(
            &format!("rereg2:{i}"),
            "Rereg2Kind",
            &format!(r#"{{"name":"rereg name {i}","tag":"uniquetag {i}"}}"#),
        );
        let request = make_write_request(&format!("seed-{i}"), vec![node]);
        engine.writer().submit(request).expect("write node");
    }

    let admin = engine.admin().service();

    // First registration: only `$.name` is indexed.
    let name_only = [FtsPropertyPathSpec::scalar("$.name")];
    admin
        .register_fts_property_schema_with_entries(
            "Rereg2Kind",
            &name_only,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("first register async");
    wait_for_state(&admin, "Rereg2Kind", &["COMPLETE"], 10);

    // The same compiled query is run before and after the second registration.
    let tag_query = QueryBuilder::nodes("Rereg2Kind")
        .text_search("uniquetag", 10)
        .limit(10)
        .compile()
        .expect("compile query for tag");

    let tag_rows_before = engine
        .coordinator()
        .execute_compiled_read(&tag_query)
        .expect("search before second register");
    assert_eq!(
        tag_rows_before.nodes.len(),
        0,
        "$.tag should not be indexed after first registration (only $.name indexed)"
    );

    // Second registration: `$.name` plus `$.tag`.
    let name_and_tag = [
        FtsPropertyPathSpec::scalar("$.name"),
        FtsPropertyPathSpec::scalar("$.tag"),
    ];
    admin
        .register_fts_property_schema_with_entries(
            "Rereg2Kind",
            &name_and_tag,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("second register async");
    wait_for_state(
        &admin,
        "Rereg2Kind",
        &["PENDING", "BUILDING", "SWAPPING", "COMPLETE"],
        5,
    );
    wait_for_state(&admin, "Rereg2Kind", &["COMPLETE"], 10);

    let tag_rows_after = engine
        .coordinator()
        .execute_compiled_read(&tag_query)
        .expect("search after second rebuild");
    let hits_after = tag_rows_after.nodes.len();
    assert_eq!(
        hits_after,
        3,
        "$.tag ('uniquetag') should be indexed after second rebuild, got {}",
        hits_after
    );
}
/// Simulates a crash mid-rebuild for `ClrKind`: after a first engine session
/// starts an async rebuild, the state row is forced to `BUILDING` and a
/// leftover staging row is inserted through a raw connection. Reopening the
/// engine must leave zero staging rows for the kind and the state `FAILED`.
#[test]
fn crash_recovery_clears_staging_rows() {
    let dir = tempfile::tempdir().expect("temp dir");
    let db_path = dir.path().join("test.db");

    // Both engine sessions share the same open parameters; only the panic message differs.
    let open_at_db = |failure: &str| {
        EngineRuntime::open(
            &db_path,
            ProvenanceMode::Warn,
            None,
            2,
            TelemetryLevel::Counters,
            None,
        )
        .expect(failure)
    };

    // Session 1: seed data and start the async rebuild; the engine is dropped at scope end.
    {
        let engine = open_at_db("open engine");
        for i in 0..4u32 {
            let node = make_node(
                &format!("clr:{i}"),
                "ClrKind",
                &format!(r#"{{"data":"clr data {i}"}}"#),
            );
            let request = make_write_request(&format!("seed-{i}"), vec![node]);
            engine.writer().submit(request).expect("write node");
        }
        let svc = engine.admin().service();
        let data_path = [FtsPropertyPathSpec::scalar("$.data")];
        svc.register_fts_property_schema_with_entries(
            "ClrKind",
            &data_path,
            None,
            &[],
            RebuildMode::Async,
        )
        .expect("register async");
        wait_for_state(&svc, "ClrKind", &["BUILDING", "SWAPPING", "COMPLETE"], 5);
    }

    // Between sessions: rewrite the persisted rows to look like an interrupted rebuild.
    {
        let raw_conn = rusqlite::Connection::open(&db_path).expect("open raw connection");
        let tamper_steps = [
            (
                "UPDATE fts_property_rebuild_state SET state = 'BUILDING' WHERE kind = 'ClrKind'",
                "force BUILDING state",
            ),
            (
                "INSERT OR IGNORE INTO fts_property_rebuild_staging \
                 (kind, node_logical_id, text_content) VALUES ('ClrKind', 'fake:crash', 'leftover')",
                "insert fake staging row",
            ),
        ];
        for (sql, failure) in tamper_steps {
            raw_conn.execute(sql, []).expect(failure);
        }
    }

    // Session 2: reopen and inspect what is left.
    let engine2 = open_at_db("reopen engine");
    let svc2 = engine2.admin().service();

    let staging_count = svc2
        .count_staging_rows("ClrKind")
        .expect("count staging rows after recovery");
    assert_eq!(
        staging_count, 0,
        "crash recovery must clear all staging rows for 'ClrKind', got {staging_count}"
    );

    let state = svc2
        .get_property_fts_rebuild_state("ClrKind")
        .expect("get state after recovery")
        .expect("state row must exist");
    assert_eq!(
        state.state, "FAILED",
        "crash recovery must mark 'ClrKind' as FAILED, got '{}'",
        state.state
    );
}