//! contextdb_engine — `database.rs`: top-level `Database` handle wiring the
//! relational, graph, and vector stores behind a single transactional API.
1use crate::composite_store::{ChangeLogEntry, CompositeStore};
2use crate::executor::execute_plan;
3use crate::persistence::RedbPersistence;
4use crate::persistent_store::PersistentCompositeStore;
5use crate::plugin::{
6    CommitEvent, CommitSource, CorePlugin, DatabasePlugin, PluginHealth, QueryOutcome,
7    SubscriptionMetrics,
8};
9use crate::rank_formula::{FormulaEvalError, RankFormula};
10use crate::schema_enforcer::validate_dml;
11use crate::sync_types::{
12    ApplyResult, ChangeSet, Conflict, ConflictPolicies, ConflictPolicy, DdlChange, EdgeChange,
13    NaturalKey, RowChange, VectorChange,
14};
15use contextdb_core::*;
16use contextdb_graph::{GraphStore, MemGraphExecutor};
17use contextdb_parser::Statement;
18use contextdb_parser::ast::{AlterAction, CreateTable, DataType};
19use contextdb_planner::PhysicalPlan;
20use contextdb_relational::{MemRelationalExecutor, RelationalStore};
21use contextdb_tx::{TxManager, WriteSetApplicator};
22use contextdb_vector::{HnswGraphStats, HnswIndex, MemVectorExecutor, VectorStore};
23use parking_lot::{Mutex, RwLock};
24use roaring::RoaringTreemap;
25use std::collections::{HashMap, HashSet, VecDeque};
26use std::path::Path;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
29use std::sync::{Arc, OnceLock};
30use std::thread::{self, JoinHandle};
31use std::time::{Duration, Instant};
32
// Type-erased composite store handed to the transaction manager; both the
// in-memory and persistent composites implement `WriteSetApplicator`.
type DynStore = Box<dyn WriteSetApplicator>;
// Default bound for subscriber channels. NOTE(review): not referenced in this
// chunk — presumably consumed by the subscribe API elsewhere in the file.
const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 64;
// Hard cap on the parse+plan cache; inserts are refused once full (no LRU).
const MAX_STATEMENT_CACHE_ENTRIES: usize = 1024;
// redb may need a small metadata page on the next write, especially for a new
// file with the format metadata table. Keep the disk-limit error deterministic
// instead of starting a write that cannot commit cleanly.
const MIN_DISK_WRITE_HEADROOM_BYTES: u64 = 1024;
40
/// An index the planner evaluated but did not select, together with the
/// reason it was rejected (surfaced via `QueryTrace::indexes_considered`).
#[derive(Debug, Clone)]
pub struct IndexCandidate {
    pub name: String,
    // `Cow` so common, fixed rejection reasons can be static strings
    // without allocating.
    pub rejected_reason: std::borrow::Cow<'static, str>,
}
46
/// Diagnostic trace attached to every `QueryResult` describing how the
/// planner/executor handled the statement.
#[derive(Debug, Clone, Default)]
pub struct QueryTrace {
    // Label of the top-level physical operator (e.g. "Scan").
    pub physical_plan: &'static str,
    // Index the planner actually used, if any.
    pub index_used: Option<String>,
    // Human-readable predicates pushed down into the access path.
    pub predicates_pushed: smallvec::SmallVec<[std::borrow::Cow<'static, str>; 4]>,
    // Indexes considered but rejected, with reasons.
    pub indexes_considered: smallvec::SmallVec<[IndexCandidate; 4]>,
    // True when an explicit sort was dropped (e.g. satisfied by index order).
    pub sort_elided: bool,
}
55
56impl QueryTrace {
57    /// Stub default: scan-labeled trace with no plan data. The no-op writes
58    /// this everywhere. Impl must replace construction sites with real plan
59    /// inspection.
60    pub fn scan() -> Self {
61        Self {
62            physical_plan: "Scan",
63            ..Default::default()
64        }
65    }
66}
67
/// Side effects of a cascading DDL operation; currently lists the indexes
/// dropped alongside their table.
#[derive(Debug, Clone)]
pub struct CascadeReport {
    pub dropped_indexes: Vec<String>,
}
72
/// Outcome of executing one statement: result rows (for queries), an
/// affected-row count (for DML), a planner trace, and any cascade report.
#[derive(Debug, Clone)]
pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<Value>>,
    // Rows touched by DML; 0 for pure queries.
    pub rows_affected: u64,
    pub trace: QueryTrace,
    // Present only when DDL cascaded (e.g. dropped dependent indexes).
    pub cascade: Option<CascadeReport>,
}
81
/// A vector-similarity search request against one table's vector column.
#[derive(Debug, Clone)]
pub struct SemanticQuery {
    pub table: String,
    pub vector_column: String,
    // Query embedding; dimensionality is presumably checked against the
    // column's declared dimension downstream — TODO confirm.
    pub query: Vec<f32>,
    // Maximum number of results to return.
    pub limit: usize,
    // Optional rank-policy key; see `SearchResult::rank`.
    pub sort_key: Option<String>,
    // Optional similarity floor; results below it are excluded.
    pub min_similarity: Option<f32>,
    // Optional SQL-style filter applied to candidate rows.
    pub where_clause: Option<String>,
}
92
93impl SemanticQuery {
94    pub fn new(
95        table: impl Into<String>,
96        vector_column: impl Into<String>,
97        query: Vec<f32>,
98        limit: usize,
99    ) -> Self {
100        Self {
101            table: table.into(),
102            vector_column: vector_column.into(),
103            query,
104            limit,
105            sort_key: None,
106            min_similarity: None,
107            where_clause: None,
108        }
109    }
110}
111
/// One hit from a semantic search: the row, its column values, and scores.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    pub row_id: RowId,
    pub values: HashMap<String, Value>,
    // Raw cosine similarity between query and stored vector.
    pub vector_score: f32,
    /// Always populated. Equals the formula's computed value when the search
    /// uses a rank policy via `sort_key`, and equals `vector_score` (raw
    /// cosine) in all other cases. Callers never unwrap this field.
    pub rank: f32,
}
122
/// A parse+plan pair memoized by SQL text in `Database::statement_cache`,
/// skipping re-parsing and re-planning for repeated statements.
#[derive(Debug, Clone)]
struct CachedStatement {
    stmt: Statement,
    plan: PhysicalPlan,
}
128
129impl QueryResult {
130    pub fn empty() -> Self {
131        Self {
132            columns: vec![],
133            rows: vec![],
134            rows_affected: 0,
135            trace: QueryTrace::scan(),
136            cascade: None,
137        }
138    }
139
140    pub fn empty_with_affected(rows_affected: u64) -> Self {
141        Self {
142            columns: vec![],
143            rows: vec![],
144            rows_affected,
145            trace: QueryTrace::scan(),
146            cascade: None,
147        }
148    }
149}
150
// Per-thread snapshot override: when set, reads on this thread observe the
// given snapshot instead of a freshly-taken one. `const` initializer keeps
// the thread-local zero-cost until first use.
thread_local! {
    static SNAPSHOT_OVERRIDE: std::cell::RefCell<Option<SnapshotId>> =
        const { std::cell::RefCell::new(None) };
}
155
/// The engine facade: owns the three stores, their executors, the
/// transaction manager, optional persistence, and all session/runtime state.
pub struct Database {
    // Transaction manager over the type-erased composite store.
    tx_mgr: Arc<TxManager<DynStore>>,
    relational_store: Arc<RelationalStore>,
    graph_store: Arc<GraphStore>,
    vector_store: Arc<VectorStore>,
    // Append-only row-level change log used for sync/replication.
    change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    // Append-only DDL log, LSN-stamped.
    ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
    // Present only for on-disk databases; `None` for in-memory.
    persistence: Option<Arc<RedbPersistence>>,
    relational: MemRelationalExecutor<DynStore>,
    graph: MemGraphExecutor<DynStore>,
    vector: MemVectorExecutor<DynStore>,
    // Session-scoped transaction opened by SQL BEGIN, closed by COMMIT/ROLLBACK.
    session_tx: Mutex<Option<TxId>>,
    instance_id: uuid::Uuid,
    plugin: Arc<dyn DatabasePlugin>,
    accountant: Arc<MemoryAccountant>,
    conflict_policies: RwLock<ConflictPolicies>,
    // Commit-event subscribers plus delivery counters.
    subscriptions: Mutex<SubscriptionState>,
    pruning_runtime: Mutex<PruningRuntime>,
    pruning_guard: Arc<Mutex<()>>,
    // 0 means "no limit" (see `build_db`).
    disk_limit: AtomicU64,
    disk_limit_startup_ceiling: AtomicU64,
    sync_watermark: Arc<AtomicLsn>,
    closed: AtomicBool,
    // Per-query scan counter; reset at each statement entry point.
    rows_examined: AtomicU64,
    // SQL text -> cached parse+plan (bounded by MAX_STATEMENT_CACHE_ENTRIES).
    statement_cache: RwLock<HashMap<String, Arc<CachedStatement>>>,
    // (table, column) -> compiled rank formula.
    rank_formula_cache: RwLock<HashMap<(String, String), Arc<RankFormula>>>,
    rank_policy_eval_count: AtomicU64,
    rank_policy_formula_parse_count: AtomicU64,
    // (table, row, column) triples whose joined values failed validation.
    corrupt_joined_values: RwLock<HashSet<(String, RowId, String)>>,
}
186
/// Outcome of a single row insert: either a new row id, or a no-op (e.g.
/// a duplicate under a unique constraint — see `RowConstraintCheck`).
pub(crate) enum InsertRowResult {
    Inserted(RowId),
    NoOp,
}
191
/// Uncommitted-transaction overlay applied on top of an index scan:
/// rows deleted in-tx are masked out, rows inserted in-tx that match the
/// scan predicate are merged in.
#[derive(Debug, Default)]
pub(crate) struct IndexScanTxOverlay {
    pub deleted_row_ids: std::collections::HashSet<RowId>,
    pub matching_inserts: Vec<VersionedRow>,
}
197
/// Result of constraint validation for a candidate row: either it may be
/// inserted, or it duplicates a unique key and the insert becomes a no-op.
enum RowConstraintCheck {
    Valid,
    DuplicateUniqueNoOp,
}
202
/// Result of probing an index for a constraint lookup: no usable index
/// exists, the index had no matching entry, or a matching row was found.
enum ConstraintProbe {
    NoIndex,
    NoMatch,
    Match(RowId),
}
208
// Manual Debug: the struct holds non-Debug trait objects and large stores,
// so only the instance id is shown (`..` via finish_non_exhaustive).
impl std::fmt::Debug for Database {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Database")
            .field("instance_id", &self.instance_id)
            .finish_non_exhaustive()
    }
}
216
/// Work item in a state-propagation traversal: drive `(table, uuid)` toward
/// `target_state`, tracking traversal depth and whether a failure aborts.
#[derive(Debug, Clone)]
struct PropagationQueueEntry {
    table: String,
    uuid: uuid::Uuid,
    target_state: String,
    // Hops from the originating row.
    depth: u32,
    abort_on_failure: bool,
}
225
/// The row a propagation step originates from (borrowed view; `Copy`).
#[derive(Debug, Clone, Copy)]
struct PropagationSource<'a> {
    table: &'a str,
    uuid: uuid::Uuid,
    state: &'a str,
    depth: u32,
}
233
/// Shared, read-only context for one propagation run: the transaction,
/// the snapshot to read under, and the table metadata map.
#[derive(Debug, Clone, Copy)]
struct PropagationContext<'a> {
    tx: TxId,
    snapshot: SnapshotId,
    metas: &'a HashMap<String, TableMeta>,
}
240
/// Commit-event subscriber channels plus lifetime delivery counters.
#[derive(Debug)]
struct SubscriptionState {
    subscribers: Vec<SyncSender<CommitEvent>>,
    // Events successfully delivered via try_send.
    events_sent: u64,
    // Events dropped because a subscriber's channel was full.
    events_dropped: u64,
}
247
248impl SubscriptionState {
249    fn new() -> Self {
250        Self {
251            subscribers: Vec::new(),
252            events_sent: 0,
253            events_dropped: 0,
254        }
255    }
256}
257
/// Handle to the background pruning thread: a shared shutdown flag plus the
/// join handle (None when no thread is running).
#[derive(Debug)]
struct PruningRuntime {
    shutdown: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}
263
264impl PruningRuntime {
265    fn new() -> Self {
266        Self {
267            shutdown: Arc::new(AtomicBool::new(false)),
268            handle: None,
269        }
270    }
271}
272
273impl Database {
    // Shared constructor used by both the persistent (`open_loaded`) and
    // in-memory (`open_memory_internal`) paths: wires the already-built
    // stores, logs, and tx manager into a Database with default runtime
    // state. The argument count mirrors the number of independently
    // constructed components, hence the clippy allow.
    #[allow(clippy::too_many_arguments)]
    fn build_db(
        tx_mgr: Arc<TxManager<DynStore>>,
        relational: Arc<RelationalStore>,
        graph: Arc<GraphStore>,
        vector_store: Arc<VectorStore>,
        hnsw: Arc<OnceLock<parking_lot::RwLock<Option<HnswIndex>>>>,
        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
        ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
        persistence: Option<Arc<RedbPersistence>>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        disk_limit: Option<u64>,
        disk_limit_startup_ceiling: Option<u64>,
    ) -> Self {
        Self {
            tx_mgr: tx_mgr.clone(),
            relational_store: relational.clone(),
            graph_store: graph.clone(),
            vector_store: vector_store.clone(),
            change_log,
            ddl_log,
            persistence,
            // Executors share the stores and tx manager via Arc clones.
            relational: MemRelationalExecutor::new(relational, tx_mgr.clone()),
            graph: MemGraphExecutor::new(graph, tx_mgr.clone()),
            vector: MemVectorExecutor::new_with_accountant(
                vector_store,
                tx_mgr.clone(),
                hnsw,
                accountant.clone(),
            ),
            session_tx: Mutex::new(None),
            instance_id: uuid::Uuid::new_v4(),
            plugin,
            accountant,
            conflict_policies: RwLock::new(ConflictPolicies::uniform(ConflictPolicy::LatestWins)),
            subscriptions: Mutex::new(SubscriptionState::new()),
            pruning_runtime: Mutex::new(PruningRuntime::new()),
            pruning_guard: Arc::new(Mutex::new(())),
            // `None` limits are stored as 0, i.e. "no limit".
            disk_limit: AtomicU64::new(disk_limit.unwrap_or(0)),
            disk_limit_startup_ceiling: AtomicU64::new(disk_limit_startup_ceiling.unwrap_or(0)),
            sync_watermark: Arc::new(AtomicLsn::new(Lsn(0))),
            closed: AtomicBool::new(false),
            rows_examined: AtomicU64::new(0),
            statement_cache: RwLock::new(HashMap::new()),
            rank_formula_cache: RwLock::new(HashMap::new()),
            rank_policy_eval_count: AtomicU64::new(0),
            rank_policy_formula_parse_count: AtomicU64::new(0),
            corrupt_joined_values: RwLock::new(HashSet::new()),
        }
    }
325
326    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
327        Self::open_with_config(
328            path,
329            Arc::new(CorePlugin),
330            Arc::new(MemoryAccountant::no_limit()),
331        )
332    }
333
334    pub fn open_memory() -> Self {
335        Self::open_memory_with_plugin_and_accountant(
336            Arc::new(CorePlugin),
337            Arc::new(MemoryAccountant::no_limit()),
338        )
339        .expect("failed to open in-memory database")
340    }
341
    // Full recovery path for a persistent database: open/create the redb
    // file, rebuild every in-memory store from persisted state, restore
    // counters (row id, tx id, LSN), then assemble the Database.
    fn open_loaded(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        mut accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        let persistence = if path.exists() {
            Arc::new(RedbPersistence::open(path)?)
        } else {
            Arc::new(RedbPersistence::create(path)?)
        };
        // A persisted memory_limit applies only when the caller did not
        // already supply a bounded accountant.
        if accountant.usage().limit.is_none()
            && let Some(limit) = persistence.load_config_value::<usize>("memory_limit")?
        {
            accountant = Arc::new(MemoryAccountant::with_budget(limit));
        }
        // Effective disk limit is the tighter of the persisted value and the
        // startup ceiling when both are present.
        let persisted_disk_limit = persistence.load_config_value::<u64>("disk_limit")?;
        let startup_disk_ceiling = startup_disk_limit;
        let effective_disk_limit = match (persisted_disk_limit, startup_disk_limit) {
            (Some(persisted), Some(ceiling)) => Some(persisted.min(ceiling)),
            (Some(persisted), None) => Some(persisted),
            (None, Some(ceiling)) => Some(ceiling),
            (None, None) => None,
        };

        let all_meta = persistence.load_all_table_meta()?;

        // Rebuild the relational store: table shells, index storage, rows.
        let relational = Arc::new(RelationalStore::new());
        for (name, meta) in &all_meta {
            relational.create_table(name, meta.clone());
            // Register EVERY index declared in TableMeta.indexes — this
            // includes auto-indexes (kind=Auto) synthesized at CREATE TABLE
            // time AND user-declared indexes (kind=UserDeclared).
            for decl in &meta.indexes {
                relational.create_index_storage(name, &decl.name, decl.columns.clone());
            }
            for row in persistence.load_relational_table(name)? {
                relational.insert_loaded_row(name, row);
            }
        }

        // Rebuild the graph store from persisted forward edges.
        let graph = Arc::new(GraphStore::new());
        for edge in persistence.load_forward_edges()? {
            graph.insert_loaded_edge(edge);
        }

        // Rebuild vector indexes from table metadata, then reload vectors.
        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        for (table_name, meta) in &all_meta {
            for column in &meta.columns {
                if let ColumnType::Vector(dimension) = column.column_type {
                    vector.register_index(
                        VectorIndexRef::new(table_name, column.name.clone()),
                        dimension,
                        column.quantization,
                    );
                }
            }
        }
        let loaded_vectors = persistence.load_vectors()?;
        for entry in &loaded_vectors {
            vector.insert_loaded_vector(entry.clone());
        }
        // Relational rows also carry vector column values; backfill them.
        hydrate_relational_vector_values(&relational, &loaded_vectors);

        // Restore monotonic counters one past the maxima seen on disk.
        let max_row_id = relational.max_row_id();
        let max_tx = max_tx_across_all(&relational, &graph, &vector);
        let max_lsn = max_lsn_across_all(&relational, &graph, &vector);
        relational.set_next_row_id(RowId(max_row_id.0.saturating_add(1)));

        let change_log = Arc::new(RwLock::new(persistence.load_change_log()?));
        let ddl_log = Arc::new(RwLock::new(persistence.load_ddl_log()?));
        // Composite fans writes across all stores; the persistent wrapper
        // additionally mirrors committed write sets to redb.
        let composite = CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        );
        let persistent = PersistentCompositeStore::new(composite, persistence.clone());
        let store: DynStore = Box::new(persistent);
        let tx_mgr = Arc::new(TxManager::new_with_counters(
            store,
            TxId(max_tx.0.saturating_add(1)),
            Lsn(max_lsn.0.saturating_add(1)),
            max_tx,
        ));

        let db = Self::build_db(
            tx_mgr,
            relational,
            graph,
            vector,
            hnsw,
            change_log,
            ddl_log,
            Some(persistence),
            plugin,
            accountant,
            effective_disk_limit,
            startup_disk_ceiling,
        );

        // Re-register DAG edge-type constraints and rank formulas from meta.
        for meta in all_meta.values() {
            if !meta.dag_edge_types.is_empty() {
                db.graph.register_dag_edge_types(&meta.dag_edge_types);
            }
        }
        db.rebuild_rank_formula_cache_from_meta(&all_meta)?;

        // Charge loaded state to the memory accountant, and optionally build
        // HNSW graphs up front.
        db.account_loaded_state()?;
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());

        Ok(db)
    }
458
459    fn open_memory_internal(
460        plugin: Arc<dyn DatabasePlugin>,
461        accountant: Arc<MemoryAccountant>,
462    ) -> Result<Self> {
463        let relational = Arc::new(RelationalStore::new());
464        let graph = Arc::new(GraphStore::new());
465        let hnsw = Arc::new(OnceLock::new());
466        let vector = Arc::new(VectorStore::new(hnsw.clone()));
467        let change_log = Arc::new(RwLock::new(Vec::new()));
468        let ddl_log = Arc::new(RwLock::new(Vec::new()));
469        let store: DynStore = Box::new(CompositeStore::new(
470            relational.clone(),
471            graph.clone(),
472            vector.clone(),
473            change_log.clone(),
474            ddl_log.clone(),
475        ));
476        let tx_mgr = Arc::new(TxManager::new(store));
477
478        let db = Self::build_db(
479            tx_mgr, relational, graph, vector, hnsw, change_log, ddl_log, None, plugin, accountant,
480            None, None,
481        );
482        maybe_prebuild_hnsw(&db.vector_store, db.accountant());
483        Ok(db)
484    }
485
    /// Starts a new transaction and returns its id.
    pub fn begin(&self) -> TxId {
        self.tx_mgr.begin()
    }
489
    /// Commits `tx` as a user-initiated commit (plugin hooks see
    /// `CommitSource::User`).
    pub fn commit(&self, tx: TxId) -> Result<()> {
        self.commit_with_source(tx, CommitSource::User)
    }
493
494    pub fn rollback(&self, tx: TxId) -> Result<()> {
495        let ws = self.tx_mgr.cloned_write_set(tx)?;
496        self.release_insert_allocations(&ws);
497        self.tx_mgr.rollback(tx)
498    }
499
    /// Takes a read snapshot at the current commit horizon.
    pub fn snapshot(&self) -> SnapshotId {
        self.tx_mgr.snapshot()
    }
503
    /// Parses and executes `sql`. BEGIN/COMMIT/ROLLBACK manage the
    /// session-scoped transaction; any other statement runs inside the open
    /// session transaction if there is one, or auto-commits otherwise.
    pub fn execute(&self, sql: &str, params: &HashMap<String, Value>) -> Result<QueryResult> {
        // Fast path: reuse a cached parse+plan for eligible statements.
        if let Some(cached) = self.cached_statement(sql) {
            let active_tx = *self.session_tx.lock();
            return self.execute_statement_with_plan(
                &cached.stmt,
                sql,
                params,
                active_tx,
                Some(&cached.plan),
            );
        }

        let stmt = contextdb_parser::parse(sql)?;

        match &stmt {
            Statement::Begin => {
                let mut session = self.session_tx.lock();
                // BEGIN while a session transaction is open is a no-op.
                if session.is_none() {
                    *session = Some(self.begin());
                }
                return Ok(QueryResult::empty());
            }
            Statement::Commit => {
                let mut session = self.session_tx.lock();
                // COMMIT with no open transaction is a no-op.
                if let Some(tx) = session.take() {
                    self.commit_with_source(tx, CommitSource::User)?;
                }
                return Ok(QueryResult::empty());
            }
            Statement::Rollback => {
                let mut session = self.session_tx.lock();
                if let Some(tx) = *session {
                    self.rollback(tx)?;
                    *session = None;
                }
                return Ok(QueryResult::empty());
            }
            _ => {}
        }

        // Snapshot the session tx id; the lock is not held across execution.
        let active_tx = *self.session_tx.lock();
        self.execute_statement(&stmt, sql, params, active_tx)
    }
547
548    fn cached_statement(&self, sql: &str) -> Option<Arc<CachedStatement>> {
549        self.statement_cache.read().get(sql).cloned()
550    }
551
552    fn cache_statement_if_eligible(&self, sql: &str, stmt: &Statement, plan: &PhysicalPlan) {
553        if !Self::is_statement_cache_eligible(stmt, plan) {
554            return;
555        }
556
557        let mut cache = self.statement_cache.write();
558        if cache.contains_key(sql) {
559            return;
560        }
561        if cache.len() >= MAX_STATEMENT_CACHE_ENTRIES {
562            return;
563        }
564        cache.insert(
565            sql.to_string(),
566            Arc::new(CachedStatement {
567                stmt: stmt.clone(),
568                plan: plan.clone(),
569            }),
570        );
571    }
572
573    fn is_statement_cache_eligible(stmt: &Statement, plan: &PhysicalPlan) -> bool {
574        matches!((stmt, plan), (Statement::Insert(ins), PhysicalPlan::Insert(_))
575            if !ins.table.eq_ignore_ascii_case("GRAPH")
576                && !ins.table.eq_ignore_ascii_case("__edges"))
577    }
578
    /// Drops every cached parse+plan entry (e.g. after DDL invalidates plans).
    pub(crate) fn clear_statement_cache(&self) {
        self.statement_cache.write().clear();
    }
582
    // Test/introspection hook: current number of cached statements.
    #[doc(hidden)]
    pub fn __statement_cache_len(&self) -> usize {
        self.statement_cache.read().len()
    }
587
588    fn execute_autocommit(
589        &self,
590        plan: &PhysicalPlan,
591        params: &HashMap<String, Value>,
592    ) -> Result<QueryResult> {
593        // Reset per-query rows_examined once at the entry point so every
594        // sub-plan (union, CTE, subquery IndexScan) accumulates into the
595        // shared counter rather than overwriting prior counts.
596        self.__reset_rows_examined();
597        match plan {
598            PhysicalPlan::Insert(_) | PhysicalPlan::Delete(_) | PhysicalPlan::Update(_) => {
599                let tx = self.begin();
600                let result = execute_plan(self, plan, params, Some(tx));
601                match result {
602                    Ok(qr) => {
603                        self.commit_with_source(tx, CommitSource::AutoCommit)?;
604                        Ok(qr)
605                    }
606                    Err(e) => {
607                        let _ = self.rollback(tx);
608                        Err(e)
609                    }
610                }
611            }
612            _ => execute_plan(self, plan, params, None),
613        }
614    }
615
616    pub fn explain(&self, sql: &str) -> Result<String> {
617        let stmt = contextdb_parser::parse(sql)?;
618        let plan = contextdb_planner::plan(&stmt)?;
619        let vector_index = vector_index_from_plan(&plan);
620        if let Some(index) = &vector_index
621            && let Some(state) = self.vector_store.try_state(index)
622            && state.entry_count() >= 1000
623            && state.hnsw_len().is_none()
624        {
625            let query = vec![0.0_f32; state.dimension()];
626            let _ = self.query_vector_strict(index.clone(), &query, 1, None, self.snapshot());
627        }
628        let mut output = plan.explain();
629        let uses_hnsw = vector_index
630            .as_ref()
631            .is_some_and(|index| self.vector_store.has_hnsw_index_for(index));
632        if uses_hnsw {
633            output = output.replace("VectorSearch(", "HNSWSearch(");
634            output = output.replace("VectorSearch {", "HNSWSearch {");
635        } else {
636            output = output.replace("VectorSearch(", "VectorSearch(strategy=BruteForce, ");
637            output = output.replace("VectorSearch {", "VectorSearch { strategy: BruteForce,");
638        }
639        Ok(output)
640    }
641
642    pub fn execute_in_tx(
643        &self,
644        tx: TxId,
645        sql: &str,
646        params: &HashMap<String, Value>,
647    ) -> Result<QueryResult> {
648        let stmt = contextdb_parser::parse(sql)?;
649        self.execute_statement(&stmt, sql, params, Some(tx))
650    }
651
    // Central commit path: validates foreign keys, runs the plugin's
    // pre-commit hook, commits (obtaining the LSN), then performs
    // post-commit bookkeeping and event publication. Any pre-commit failure
    // rolls the transaction back before returning the error.
    fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
        let mut ws = self.tx_mgr.cloned_write_set(tx)?;

        // Empty write sets skip validation, hooks, and events entirely.
        if !ws.is_empty()
            && let Err(err) = self.validate_foreign_keys_in_tx(tx)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        if !ws.is_empty()
            && let Err(err) = self.plugin.pre_commit(&ws, source)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        let lsn = self.tx_mgr.commit_with_lsn(tx)?;

        if !ws.is_empty() {
            // Deletes only free their allocations once the commit is durable.
            self.release_delete_allocations(&ws);
            ws.stamp_lsn(lsn);
            self.plugin.post_commit(&ws, source);
            self.publish_commit_event_if_subscribers(&ws, source, lsn);
        }

        Ok(())
    }
680
681    fn build_commit_event(
682        ws: &contextdb_tx::WriteSet,
683        source: CommitSource,
684        lsn: Lsn,
685    ) -> CommitEvent {
686        let mut tables_changed: Vec<String> = ws
687            .relational_inserts
688            .iter()
689            .map(|(table, _)| table.clone())
690            .chain(
691                ws.relational_deletes
692                    .iter()
693                    .map(|(table, _, _)| table.clone()),
694            )
695            .chain(
696                ws.vector_inserts
697                    .iter()
698                    .map(|entry| entry.index.table.clone()),
699            )
700            .chain(
701                ws.vector_deletes
702                    .iter()
703                    .map(|(index, _, _)| index.table.clone()),
704            )
705            .chain(
706                ws.vector_moves
707                    .iter()
708                    .map(|(index, _, _, _)| index.table.clone()),
709            )
710            .collect::<HashSet<_>>()
711            .into_iter()
712            .collect();
713        tables_changed.sort();
714
715        CommitEvent {
716            source,
717            lsn,
718            tables_changed,
719            row_count: ws.relational_inserts.len()
720                + ws.relational_deletes.len()
721                + ws.adj_inserts.len()
722                + ws.adj_deletes.len()
723                + ws.vector_inserts.len()
724                + ws.vector_deletes.len()
725                + ws.vector_moves.len(),
726        }
727    }
728
729    fn publish_commit_event_if_subscribers(
730        &self,
731        ws: &contextdb_tx::WriteSet,
732        source: CommitSource,
733        lsn: Lsn,
734    ) {
735        let mut subscriptions = self.subscriptions.lock();
736        if subscriptions.subscribers.is_empty() {
737            return;
738        }
739        let event = Self::build_commit_event(ws, source, lsn);
740        let subscribers = std::mem::take(&mut subscriptions.subscribers);
741        let mut live_subscribers = Vec::with_capacity(subscribers.len());
742
743        for sender in subscribers {
744            match sender.try_send(event.clone()) {
745                Ok(()) => {
746                    subscriptions.events_sent += 1;
747                    live_subscribers.push(sender);
748                }
749                Err(TrySendError::Full(_)) => {
750                    subscriptions.events_dropped += 1;
751                    live_subscribers.push(sender);
752                }
753                Err(TrySendError::Disconnected(_)) => {}
754            }
755        }
756
757        subscriptions.subscribers = live_subscribers;
758    }
759
    // Signals the background pruning thread to stop and joins it. The inner
    // block scopes the runtime lock so it is RELEASED before `join()` —
    // joining while holding the lock could deadlock if the thread also
    // touches the runtime. The shutdown flag is replaced with a fresh one so
    // a subsequently started thread does not observe the stale `true`.
    fn stop_pruning_thread(&self) {
        let handle = {
            let mut runtime = self.pruning_runtime.lock();
            runtime.shutdown.store(true, Ordering::SeqCst);
            let handle = runtime.handle.take();
            runtime.shutdown = Arc::new(AtomicBool::new(false));
            handle
        };

        if let Some(handle) = handle {
            // Join failure (thread panic) is deliberately ignored here.
            let _ = handle.join();
        }
    }
773
    /// Convenience wrapper: execute a parsed statement with no cached plan
    /// (the plan is produced inside `execute_statement_with_plan`).
    fn execute_statement(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        self.execute_statement_with_plan(stmt, sql, params, tx, None)
    }
783
    // Core statement pipeline: plugin query/DDL hooks, graph-insert routing,
    // (optional) planning + plan caching, execution, then the post_query
    // hook with timing and outcome. `cached_plan` skips the planning step.
    fn execute_statement_with_plan(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
        cached_plan: Option<&PhysicalPlan>,
    ) -> Result<QueryResult> {
        self.plugin.on_query(sql)?;

        // DDL statements additionally pass through the plugin's DDL hook.
        if let Some(change) = self.ddl_change_for_statement(stmt).as_ref() {
            self.plugin.on_ddl(change)?;
        }

        // Handle INSERT INTO GRAPH / __edges as a virtual table routing to the graph store.
        if let Statement::Insert(ins) = stmt
            && (ins.table.eq_ignore_ascii_case("GRAPH")
                || ins.table.eq_ignore_ascii_case("__edges"))
        {
            return self.execute_graph_insert(ins, params, tx);
        }

        // The closure keeps a single exit point so post_query always sees
        // the measured duration and outcome, success or failure.
        let started = Instant::now();
        let result = (|| {
            if let Some(plan) = cached_plan {
                return self.run_planned_statement(stmt, plan, params, tx);
            }

            let (stmt, plan) = {
                // Pre-resolve InSubquery expressions with CTE context before planning.
                let stmt = self.pre_resolve_cte_subqueries(stmt, params, tx)?;
                let plan = contextdb_planner::plan(&stmt)?;
                self.cache_statement_if_eligible(sql, &stmt, &plan);
                (stmt, plan)
            };
            self.run_planned_statement(&stmt, &plan, params, tx)
        })();
        let duration = started.elapsed();
        let outcome = query_outcome_from_result(&result);
        self.plugin.post_query(sql, duration, &outcome);
        // Internal row-id columns never leak to callers.
        result.map(strip_internal_row_id)
    }
826
827    fn run_planned_statement(
828        &self,
829        stmt: &Statement,
830        plan: &PhysicalPlan,
831        params: &HashMap<String, Value>,
832        tx: Option<TxId>,
833    ) -> Result<QueryResult> {
834        validate_dml(plan, self, params)?;
835        let result = match tx {
836            Some(tx) => {
837                // Reset rows_examined at the top of an in-tx statement so
838                // sub-plans accumulate rather than overwrite.
839                self.__reset_rows_examined();
840                execute_plan(self, plan, params, Some(tx))
841            }
842            None => self.execute_autocommit(plan, params),
843        };
844        if result.is_ok()
845            && let Statement::CreateTable(ct) = stmt
846            && !ct.dag_edge_types.is_empty()
847        {
848            self.graph.register_dag_edge_types(&ct.dag_edge_types);
849        }
850        result
851    }
852
853    /// Handle `INSERT INTO GRAPH (source_id, target_id, edge_type) VALUES (...)`.
854    fn execute_graph_insert(
855        &self,
856        ins: &contextdb_parser::ast::Insert,
857        params: &HashMap<String, Value>,
858        tx: Option<TxId>,
859    ) -> Result<QueryResult> {
860        use crate::executor::resolve_expr;
861
862        let col_index = |name: &str| {
863            ins.columns
864                .iter()
865                .position(|c| c.eq_ignore_ascii_case(name))
866        };
867        let source_idx = col_index("source_id")
868            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires source_id column".into()))?;
869        let target_idx = col_index("target_id")
870            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires target_id column".into()))?;
871        let edge_type_idx = col_index("edge_type")
872            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires edge_type column".into()))?;
873
874        let auto_commit = tx.is_none();
875        let tx = tx.unwrap_or_else(|| self.begin());
876        let mut count = 0u64;
877        for row_exprs in &ins.values {
878            let source = resolve_expr(&row_exprs[source_idx], params)?;
879            let target = resolve_expr(&row_exprs[target_idx], params)?;
880            let edge_type = resolve_expr(&row_exprs[edge_type_idx], params)?;
881
882            let source_uuid = match &source {
883                Value::Uuid(u) => *u,
884                Value::Text(t) => uuid::Uuid::parse_str(t)
885                    .map_err(|e| Error::PlanError(format!("invalid source_id uuid: {e}")))?,
886                _ => return Err(Error::PlanError("source_id must be UUID".into())),
887            };
888            let target_uuid = match &target {
889                Value::Uuid(u) => *u,
890                Value::Text(t) => uuid::Uuid::parse_str(t)
891                    .map_err(|e| Error::PlanError(format!("invalid target_id uuid: {e}")))?,
892                _ => return Err(Error::PlanError("target_id must be UUID".into())),
893            };
894            let edge_type_str = match &edge_type {
895                Value::Text(t) => t.clone(),
896                _ => return Err(Error::PlanError("edge_type must be TEXT".into())),
897            };
898
899            self.insert_edge(
900                tx,
901                source_uuid,
902                target_uuid,
903                edge_type_str,
904                Default::default(),
905            )?;
906            count += 1;
907        }
908
909        if auto_commit {
910            self.commit_with_source(tx, CommitSource::AutoCommit)?;
911        }
912
913        Ok(QueryResult::empty_with_affected(count))
914    }
915
    /// Translate a DDL `Statement` into the sync-log `DdlChange` it implies,
    /// or `None` for non-DDL statements.
    ///
    /// For ALTER TABLE, the change must describe the table *after* the
    /// alteration, so the current `TableMeta` is cloned (via `table_meta`,
    /// falling back to a default for unknown tables) and the action is
    /// simulated on that clone before serializing its columns.
    fn ddl_change_for_statement(&self, stmt: &Statement) -> Option<DdlChange> {
        match stmt {
            Statement::CreateTable(ct) => Some(ddl_change_from_create_table(ct)),
            Statement::DropTable(dt) => Some(DdlChange::DropTable {
                name: dt.name.clone(),
            }),
            Statement::AlterTable(at) => {
                let mut meta = self.table_meta(&at.table).unwrap_or_default();
                // Simulate the alter action on a cloned meta to get post-alteration columns
                match &at.action {
                    AlterAction::AddColumn(col) => {
                        // Map the parser-level column definition into the core
                        // ColumnDef, converting each AST enum to its core twin.
                        meta.columns.push(contextdb_core::ColumnDef {
                            name: col.name.clone(),
                            column_type: crate::executor::map_column_type(&col.data_type),
                            nullable: col.nullable,
                            primary_key: col.primary_key,
                            unique: col.unique,
                            default: col
                                .default
                                .as_ref()
                                .map(crate::executor::stored_default_expr),
                            references: col.references.as_ref().map(|reference| {
                                contextdb_core::ForeignKeyReference {
                                    table: reference.table.clone(),
                                    column: reference.column.clone(),
                                }
                            }),
                            expires: col.expires,
                            immutable: col.immutable,
                            quantization: match col.quantization {
                                contextdb_parser::ast::VectorQuantization::F32 => {
                                    contextdb_core::VectorQuantization::F32
                                }
                                contextdb_parser::ast::VectorQuantization::SQ8 => {
                                    contextdb_core::VectorQuantization::SQ8
                                }
                                contextdb_parser::ast::VectorQuantization::SQ4 => {
                                    contextdb_core::VectorQuantization::SQ4
                                }
                            },
                            rank_policy: col
                                .rank_policy
                                .as_deref()
                                .map(crate::executor::map_rank_policy),
                        });
                        // A newly added EXPIRES column becomes the table's
                        // expiry column.
                        if col.expires {
                            meta.expires_column = Some(col.name.clone());
                        }
                    }
                    AlterAction::DropColumn {
                        column: name,
                        cascade: _,
                    } => {
                        meta.columns.retain(|c| c.name != *name);
                        // Dropping the expiry column clears the table-level
                        // expiry marker as well.
                        if meta.expires_column.as_deref() == Some(name.as_str()) {
                            meta.expires_column = None;
                        }
                    }
                    AlterAction::RenameColumn { from, to } => {
                        if let Some(c) = meta.columns.iter_mut().find(|c| c.name == *from) {
                            c.name = to.clone();
                        }
                        // Keep the expiry marker pointing at the renamed column.
                        if meta.expires_column.as_deref() == Some(from.as_str()) {
                            meta.expires_column = Some(to.clone());
                        }
                    }
                    AlterAction::SetRetain {
                        duration_seconds,
                        sync_safe,
                    } => {
                        meta.default_ttl_seconds = Some(*duration_seconds);
                        meta.sync_safe = *sync_safe;
                    }
                    AlterAction::DropRetain => {
                        meta.default_ttl_seconds = None;
                        meta.sync_safe = false;
                    }
                    AlterAction::SetSyncConflictPolicy(_) | AlterAction::DropSyncConflictPolicy => { /* handled in executor */
                    }
                }
                // Serialize the simulated post-alter meta as the change record.
                Some(DdlChange::AlterTable {
                    name: at.table.clone(),
                    columns: meta
                        .columns
                        .iter()
                        .map(|c| {
                            (
                                c.name.clone(),
                                sql_type_for_meta_column(c, &meta.propagation_rules),
                            )
                        })
                        .collect(),
                    constraints: create_table_constraints_from_meta(&meta),
                })
            }
            _ => None,
        }
    }
1014
1015    /// Pre-resolve InSubquery expressions within SELECT statements that have CTEs.
1016    /// This allows CTE-backed subqueries in WHERE clauses to be evaluated before planning.
1017    fn pre_resolve_cte_subqueries(
1018        &self,
1019        stmt: &Statement,
1020        params: &HashMap<String, Value>,
1021        tx: Option<TxId>,
1022    ) -> Result<Statement> {
1023        if let Statement::Select(sel) = stmt
1024            && !sel.ctes.is_empty()
1025            && sel.body.where_clause.is_some()
1026        {
1027            use crate::executor::resolve_in_subqueries_with_ctes;
1028            let resolved_where = sel
1029                .body
1030                .where_clause
1031                .as_ref()
1032                .map(|expr| resolve_in_subqueries_with_ctes(self, expr, params, tx, &sel.ctes))
1033                .transpose()?;
1034            let mut new_body = sel.body.clone();
1035            new_body.where_clause = resolved_where;
1036            Ok(Statement::Select(contextdb_parser::ast::SelectStatement {
1037                ctes: sel.ctes.clone(),
1038                body: new_body,
1039            }))
1040        } else {
1041            Ok(stmt.clone())
1042        }
1043    }
1044
1045    pub fn insert_row(
1046        &self,
1047        tx: TxId,
1048        table: &str,
1049        values: HashMap<ColName, Value>,
1050    ) -> Result<RowId> {
1051        // Statement-scoped bound: `Value::TxId(n)` must satisfy
1052        // `n <= max(committed_watermark, tx)` so writes inside an active
1053        // transaction can reference their own allocated TxId. The error,
1054        // when fired, still reports `committed_watermark` per plan B7.
1055        let values =
1056            self.coerce_row_for_insert(table, values, Some(self.committed_watermark()), Some(tx))?;
1057        self.validate_row_constraints(tx, table, &values, None)?;
1058        self.relational.insert(tx, table, values)
1059    }
1060
1061    /// UPDATE-aware insert: the UPDATE path first deletes the old row and
1062    /// then re-inserts. The constraint probe must skip the old row_id so the
1063    /// same PK does not self-collide. The old row's index entry still looks
1064    /// visible at the committed-watermark snapshot because its `deleted_tx`
1065    /// equals the current (uncommitted) `tx`.
1066    pub(crate) fn insert_row_replacing(
1067        &self,
1068        tx: TxId,
1069        table: &str,
1070        values: HashMap<ColName, Value>,
1071        old_row_id: RowId,
1072    ) -> Result<RowId> {
1073        let values =
1074            self.coerce_row_for_insert(table, values, Some(self.committed_watermark()), Some(tx))?;
1075        self.validate_row_constraints(tx, table, &values, Some(old_row_id))?;
1076        self.relational.insert(tx, table, values)
1077    }
1078
1079    /// Internal variant used by sync-apply: skips the TXID bound check because
1080    /// peer TxIds may legitimately exceed the local watermark. Still enforces
1081    /// wrong-variant + reverse-direction TXID column rules.
1082    pub(crate) fn insert_row_for_sync(
1083        &self,
1084        tx: TxId,
1085        table: &str,
1086        values: HashMap<ColName, Value>,
1087    ) -> Result<RowId> {
1088        let values = self.coerce_row_for_insert(table, values, None, None)?;
1089        self.validate_row_constraints(tx, table, &values, None)?;
1090        self.relational.insert(tx, table, values)
1091    }
1092
1093    pub(crate) fn upsert_row_for_sync(
1094        &self,
1095        tx: TxId,
1096        table: &str,
1097        conflict_col: &str,
1098        values: HashMap<ColName, Value>,
1099    ) -> Result<UpsertResult> {
1100        let values = self.coerce_row_for_insert(table, values, None, None)?;
1101        self.upsert_row(tx, table, conflict_col, values)
1102    }
1103
1104    /// Route each row cell through `coerce_value_for_column` for variant
1105    /// compatibility. The one concession to historical `insert_row` behavior
1106    /// is that `Value::Vector` payloads are accepted regardless of declared
1107    /// dimension — prior integration suites (e.g. the 3-component probe into
1108    /// a VECTOR(384) embedding column) depend on the library API NOT enforcing
1109    /// dim equality there. SQL execution (`exec_insert`/`exec_update`) still
1110    /// performs the full dim check because it always threads through the
1111    /// executor module's coercion helpers.
1112    fn coerce_row_for_insert(
1113        &self,
1114        table: &str,
1115        values: HashMap<ColName, Value>,
1116        current_tx_max: Option<TxId>,
1117        active_tx: Option<TxId>,
1118    ) -> Result<HashMap<ColName, Value>> {
1119        let meta = self.table_meta(table);
1120        let mut out: HashMap<ColName, Value> = HashMap::with_capacity(values.len());
1121        for (col, v) in values {
1122            // Vector + Value::Vector: pass straight through (dim check happens on SQL path).
1123            let is_vector_bypass = matches!(&v, Value::Vector(_))
1124                && meta
1125                    .as_ref()
1126                    .and_then(|m| m.columns.iter().find(|c| c.name == col))
1127                    .map(|c| matches!(c.column_type, contextdb_core::ColumnType::Vector(_)))
1128                    .unwrap_or(false);
1129
1130            let coerced = if is_vector_bypass {
1131                v
1132            } else {
1133                crate::executor::coerce_into_column(
1134                    self,
1135                    table,
1136                    &col,
1137                    v,
1138                    current_tx_max,
1139                    active_tx,
1140                )?
1141            };
1142            out.insert(col, coerced);
1143        }
1144        Ok(out)
1145    }
1146
1147    pub(crate) fn insert_row_with_unique_noop(
1148        &self,
1149        tx: TxId,
1150        table: &str,
1151        values: HashMap<ColName, Value>,
1152    ) -> Result<InsertRowResult> {
1153        match self.check_row_constraints(tx, table, &values, None, true)? {
1154            RowConstraintCheck::Valid => self
1155                .relational
1156                .insert(tx, table, values)
1157                .map(InsertRowResult::Inserted),
1158            RowConstraintCheck::DuplicateUniqueNoOp => Ok(InsertRowResult::NoOp),
1159        }
1160    }
1161
    /// Insert-or-update keyed on `conflict_col`: an existing visible row with
    /// the same conflict value is replaced, otherwise a new row is inserted.
    ///
    /// Uses `snapshot_for_read` (which may honor a caller-pinned snapshot)
    /// for the existence lookup, enforces column-level IMMUTABLE rules
    /// against the matched row, validates constraints, performs the store
    /// upsert, and finally fires state-machine propagation when an update
    /// changed the configured state column.
    pub fn upsert_row(
        &self,
        tx: TxId,
        table: &str,
        conflict_col: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<UpsertResult> {
        let snapshot = self.snapshot_for_read();
        // Locate the row this upsert would replace (if any); a missing
        // conflict-column value means there is nothing to match.
        let existing_row = values
            .get(conflict_col)
            .map(|conflict_value| {
                self.point_lookup_in_tx(tx, table, conflict_col, conflict_value, snapshot)
            })
            .transpose()?
            .flatten();
        let existing_row_id = existing_row.as_ref().map(|row| row.row_id);
        // Diff-respecting column-level IMMUTABLE check: reject any upsert whose
        // flagged-column value differs from the existing local value. Idempotent
        // replay (same-value) succeeds; new rows (no existing match) apply normally.
        if let (Some(existing), Some(meta)) = (existing_row.as_ref(), self.table_meta(table)) {
            for col_def in meta.columns.iter().filter(|c| c.immutable) {
                let Some(incoming) = values.get(&col_def.name) else {
                    continue;
                };
                let existing_value = existing.values.get(&col_def.name);
                if existing_value != Some(incoming) {
                    return Err(Error::ImmutableColumn {
                        table: table.to_string(),
                        column: col_def.name.clone(),
                    });
                }
            }
        }
        // Skip the matched row so its own PK/UNIQUE values don't self-collide.
        self.validate_row_constraints(tx, table, &values, existing_row_id)?;

        // Capture state-machine inputs BEFORE `values` is moved into the store.
        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
        let meta = self.table_meta(table);
        let new_state = meta
            .as_ref()
            .and_then(|m| m.state_machine.as_ref())
            .and_then(|sm| values.get(&sm.column))
            .and_then(Value::as_text)
            .map(std::borrow::ToOwned::to_owned);

        let result = self
            .relational
            .upsert(tx, table, conflict_col, values, snapshot)?;

        // Propagate only on genuine updates (inserts don't change state).
        if let (Some(uuid), Some(state), Some(_meta)) =
            (row_uuid, new_state.as_deref(), meta.as_ref())
            && matches!(result, UpsertResult::Updated)
        {
            self.propagate_state_change_if_needed(tx, table, Some(uuid), Some(state))?;
        }

        Ok(result)
    }
1219
1220    fn validate_row_constraints(
1221        &self,
1222        tx: TxId,
1223        table: &str,
1224        values: &HashMap<ColName, Value>,
1225        skip_row_id: Option<RowId>,
1226    ) -> Result<()> {
1227        match self.check_row_constraints(tx, table, values, skip_row_id, false)? {
1228            RowConstraintCheck::Valid => Ok(()),
1229            RowConstraintCheck::DuplicateUniqueNoOp => {
1230                unreachable!("strict constraint validation cannot return no-op")
1231            }
1232        }
1233    }
1234
    /// Probe every PK, single-column UNIQUE, and composite UNIQUE constraint
    /// of `table` against `values`.
    ///
    /// Probes prefer an index lookup (`probe_column_for_constraint` /
    /// `probe_composite_unique`) and fall back to a single lazily-built full
    /// scan of visible rows when no covering index exists. `skip_row_id`
    /// excludes one row (the row being replaced by an UPDATE/upsert) from
    /// every probe. When `allow_duplicate_unique_noop` is set, a duplicate on
    /// UNIQUE (non-PK) constraints that all point at the *same* existing row
    /// returns `DuplicateUniqueNoOp` instead of an error; PK duplicates
    /// always error.
    fn check_row_constraints(
        &self,
        tx: TxId,
        table: &str,
        values: &HashMap<ColName, Value>,
        skip_row_id: Option<RowId>,
        allow_duplicate_unique_noop: bool,
    ) -> Result<RowConstraintCheck> {
        let metas = self.relational_store.table_meta.read();
        let meta = metas
            .get(table)
            .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
        // Constraint probes MUST see the current committed watermark, not any
        // thread-local override. A PK/UNIQUE violation on a committed row must
        // be detected even if the caller pinned a pre-violation snapshot for
        // read visibility.
        let snapshot = self.snapshot();

        // Scan the whole table only when no index covers any PK / UNIQUE
        // column we need to probe. Pulled lazily so the fast path skips it.
        let mut visible_rows_cache: Option<Vec<VersionedRow>> = None;

        // --- Primary-key columns: any match is an immediate violation. ---
        for column in meta.columns.iter().filter(|column| column.primary_key) {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            // NULL never participates in uniqueness.
            if *value == Value::Null {
                continue;
            }
            match self.probe_column_for_constraint(
                tx,
                table,
                &column.name,
                value,
                snapshot,
                skip_row_id,
            )? {
                ConstraintProbe::Match(_) => {
                    return Err(Error::UniqueViolation {
                        table: table.to_string(),
                        column: column.name.clone(),
                    });
                }
                ConstraintProbe::NoMatch => {}
                ConstraintProbe::NoIndex => {
                    // Fallback to full scan for PK columns without an index.
                    if visible_rows_cache.is_none() {
                        visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                            Some(tx),
                            table,
                            snapshot,
                            &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                        )?);
                    }
                    let rows = visible_rows_cache.as_deref().unwrap();
                    if rows
                        .iter()
                        .any(|existing| existing.values.get(&column.name) == Some(value))
                    {
                        return Err(Error::UniqueViolation {
                            table: table.to_string(),
                            column: column.name.clone(),
                        });
                    }
                }
            }
        }

        // Tracks the single row id all tolerated UNIQUE duplicates must share
        // (see merge_unique_conflict).
        let mut duplicate_unique_row_id = None;

        // --- Single-column UNIQUE (non-PK) constraints. ---
        for column in meta
            .columns
            .iter()
            .filter(|column| column.unique && !column.primary_key)
        {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            let matching_row_ids: Vec<RowId> = match self.probe_column_for_constraint(
                tx,
                table,
                &column.name,
                value,
                snapshot,
                skip_row_id,
            )? {
                ConstraintProbe::Match(rid) => vec![rid],
                ConstraintProbe::NoMatch => Vec::new(),
                ConstraintProbe::NoIndex => {
                    // Same lazy full-scan fallback as the PK path above.
                    if visible_rows_cache.is_none() {
                        visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                            Some(tx),
                            table,
                            snapshot,
                            &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                        )?);
                    }
                    let rows = visible_rows_cache.as_deref().unwrap();
                    rows.iter()
                        .filter(|existing| existing.values.get(&column.name) == Some(value))
                        .map(|existing| existing.row_id)
                        .collect()
                }
            };
            self.merge_unique_conflict(
                table,
                &column.name,
                &matching_row_ids,
                allow_duplicate_unique_noop,
                &mut duplicate_unique_row_id,
            )?;
        }

        // --- Composite UNIQUE (a, b, ...) constraints. ---
        for unique_constraint in &meta.unique_constraints {
            let mut candidate_values = Vec::with_capacity(unique_constraint.len());
            let mut has_null = false;

            for column_name in unique_constraint {
                match values.get(column_name) {
                    Some(Value::Null) | None => {
                        // Any NULL/absent component exempts the tuple from
                        // the uniqueness check.
                        has_null = true;
                        break;
                    }
                    Some(value) => candidate_values.push(value.clone()),
                }
            }

            if has_null {
                continue;
            }

            let matching_row_ids: Vec<RowId> = if let Some(rid) = self.probe_composite_unique(
                tx,
                table,
                unique_constraint,
                &candidate_values,
                snapshot,
                skip_row_id,
            )? {
                vec![rid]
            } else if self.index_covers_composite(table, unique_constraint) {
                // A covering index probed clean: no scan needed.
                Vec::new()
            } else {
                // No covering index: fall back to the shared full scan.
                if visible_rows_cache.is_none() {
                    visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                        Some(tx),
                        table,
                        snapshot,
                        &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                    )?);
                }
                let rows = visible_rows_cache.as_deref().unwrap();
                rows.iter()
                    .filter(|existing| {
                        unique_constraint.iter().zip(candidate_values.iter()).all(
                            |(column_name, value)| existing.values.get(column_name) == Some(value),
                        )
                    })
                    .map(|existing| existing.row_id)
                    .collect()
            };
            // Report composite UNIQUE violations using the first column name,
            // matching the plan's single-column error convention.
            let column_label = unique_constraint.first().map(|s| s.as_str()).unwrap_or("");
            self.merge_unique_conflict(
                table,
                column_label,
                &matching_row_ids,
                allow_duplicate_unique_noop,
                &mut duplicate_unique_row_id,
            )?;
        }

        if duplicate_unique_row_id.is_some() {
            Ok(RowConstraintCheck::DuplicateUniqueNoOp)
        } else {
            Ok(RowConstraintCheck::Valid)
        }
    }
1417
1418    fn merge_unique_conflict(
1419        &self,
1420        table: &str,
1421        column: &str,
1422        matching_row_ids: &[RowId],
1423        allow_duplicate_unique_noop: bool,
1424        duplicate_unique_row_id: &mut Option<RowId>,
1425    ) -> Result<()> {
1426        if matching_row_ids.is_empty() {
1427            return Ok(());
1428        }
1429
1430        if !allow_duplicate_unique_noop || matching_row_ids.len() != 1 {
1431            return Err(Error::UniqueViolation {
1432                table: table.to_string(),
1433                column: column.to_string(),
1434            });
1435        }
1436
1437        let matched_row_id = matching_row_ids[0];
1438
1439        if let Some(existing_row_id) = duplicate_unique_row_id {
1440            if *existing_row_id != matched_row_id {
1441                return Err(Error::UniqueViolation {
1442                    table: table.to_string(),
1443                    column: column.to_string(),
1444                });
1445            }
1446        } else {
1447            *duplicate_unique_row_id = Some(matched_row_id);
1448        }
1449
1450        Ok(())
1451    }
1452
1453    /// Returns true if `table` has any single-column index covering `column`.
1454    fn index_covers_column(&self, table: &str, column: &str) -> bool {
1455        let indexes = self.relational_store.indexes.read();
1456        indexes
1457            .iter()
1458            .any(|((t, _), idx)| t == table && idx.columns.len() == 1 && idx.columns[0].0 == column)
1459    }
1460
1461    /// Returns true if `table` has an index whose first-column prefix contains
1462    /// exactly the columns in `cols` (same order).
1463    fn index_covers_composite(&self, table: &str, cols: &[String]) -> bool {
1464        let indexes = self.relational_store.indexes.read();
1465        indexes.iter().any(|((t, _), idx)| {
1466            t == table
1467                && idx.columns.len() >= cols.len()
1468                && idx
1469                    .columns
1470                    .iter()
1471                    .zip(cols.iter())
1472                    .all(|((c, _), want)| c == want)
1473        })
1474    }
1475
    /// Look up `(table, column) = value` using a single-column index when one
    /// exists, layered with the tx's staged inserts and deletes.
    ///
    /// Returns `NoIndex` when no usable index exists (caller falls back to a
    /// full scan), `Match(row_id)` when a visible committed row or a staged
    /// insert carries the same value (minus `skip_row_id` and the tx's own
    /// staged deletes), else `NoMatch`.
    fn probe_column_for_constraint(
        &self,
        tx: TxId,
        table: &str,
        column: &str,
        value: &Value,
        snapshot: SnapshotId,
        skip_row_id: Option<RowId>,
    ) -> Result<ConstraintProbe> {
        use contextdb_core::{DirectedValue, TotalOrdAsc};
        // Collect the tx's staged state BEFORE taking the index lock.
        let (tx_staged_deletes, staged_overlap) = self.tx_mgr.with_write_set(tx, |ws| {
            // Rows this tx has already staged for delete must not be treated as
            // obstructions by the constraint probe. The old index entry still
            // looks visible at the committed-watermark snapshot until commit.
            let deletes = if ws.relational_deletes.is_empty() {
                std::collections::HashSet::new()
            } else {
                ws.relational_deletes
                    .iter()
                    .filter(|(t, _, _)| t == table)
                    .map(|(_, row_id, _)| *row_id)
                    .collect()
            };
            // A staged (uncommitted) insert with the same value also counts as
            // a match — again excluding `skip_row_id`.
            let overlap = ws.relational_inserts.iter().find_map(|(t, row)| {
                if t != table {
                    return None;
                }
                if let Some(sid) = skip_row_id
                    && row.row_id == sid
                {
                    return None;
                }
                if row.values.get(column) == Some(value) {
                    Some(row.row_id)
                } else {
                    None
                }
            });
            (deletes, overlap)
        })?;
        let indexes = self.relational_store.indexes.read();
        // Auto constraint indexes have stable names. Try those directly before
        // falling back to user-declared single-column indexes.
        let table_key = table.to_string();
        let pk_key = (table_key.clone(), format!("__pk_{column}"));
        let unique_key = (table_key, format!("__unique_{column}"));
        let storage = indexes
            .get(&pk_key)
            .or_else(|| indexes.get(&unique_key))
            .or_else(|| {
                indexes.iter().find_map(|((t, _), idx)| {
                    if t == table && idx.columns.len() == 1 && idx.columns[0].0 == column {
                        Some(idx)
                    } else {
                        None
                    }
                })
            });
        let Some(storage) = storage else {
            return Ok(ConstraintProbe::NoIndex);
        };
        // Single-column index keys are stored ascending.
        let key = vec![DirectedValue::Asc(TotalOrdAsc(value.clone()))];
        if let Some(entries) = storage.tree.get(&key) {
            for entry in entries {
                if let Some(sid) = skip_row_id
                    && entry.row_id == sid
                {
                    continue;
                }
                if tx_staged_deletes.contains(&entry.row_id) {
                    continue;
                }
                // Only entries visible at the committed watermark obstruct.
                if entry.visible_at(snapshot) {
                    return Ok(ConstraintProbe::Match(entry.row_id));
                }
            }
        }
        drop(indexes);
        // No committed match: the staged insert (if any) decides.
        Ok(match staged_overlap {
            Some(row_id) => ConstraintProbe::Match(row_id),
            None => ConstraintProbe::NoMatch,
        })
    }
1561
    /// Probe a composite UNIQUE (a, b, ...) using the first index whose
    /// leading prefix matches `cols`. The probe walks the range for the full
    /// key prefix.
    ///
    /// Returns `Ok(Some(row_id))` when a visible committed row or a staged
    /// insert matches every `(col, value)` pair (minus `skip_row_id` and the
    /// tx's own staged deletes); `Ok(None)` otherwise — including when no
    /// prefix-matching index exists, so callers must disambiguate via
    /// `index_covers_composite` before trusting a `None`.
    fn probe_composite_unique(
        &self,
        tx: TxId,
        table: &str,
        cols: &[String],
        values: &[Value],
        snapshot: SnapshotId,
        skip_row_id: Option<RowId>,
    ) -> Result<Option<RowId>> {
        use contextdb_core::{DirectedValue, TotalOrdAsc};
        // Degenerate or mismatched inputs cannot match anything.
        if cols.is_empty() || values.is_empty() || cols.len() != values.len() {
            return Ok(None);
        }
        // Rows this tx has already staged for delete are not obstructions.
        let tx_staged_deletes: std::collections::HashSet<RowId> =
            self.tx_mgr.with_write_set(tx, |ws| {
                ws.relational_deletes
                    .iter()
                    .filter(|(t, _, _)| t == table)
                    .map(|(_, row_id, _)| *row_id)
                    .collect()
            })?;
        let indexes = self.relational_store.indexes.read();
        // First index whose leading columns equal `cols` in order.
        let storage_entry = indexes.iter().find(|((t, _), idx)| {
            t == table
                && idx.columns.len() >= cols.len()
                && idx
                    .columns
                    .iter()
                    .zip(cols.iter())
                    .all(|((c, _), w)| c == w)
        });
        let (_, storage) = match storage_entry {
            Some(e) => e,
            None => return Ok(None),
        };
        // Full-prefix walk: range from (val1,...,valN, -inf) to (val1,...,valN, +inf).
        // For simplicity, iterate and filter by prefix match.
        for (key, entries) in storage.tree.iter() {
            if key.len() < cols.len() {
                continue;
            }
            // Compare ignoring the index's per-column sort direction.
            let prefix_match = values.iter().enumerate().all(|(i, v)| match &key[i] {
                DirectedValue::Asc(TotalOrdAsc(k)) => k == v,
                DirectedValue::Desc(contextdb_core::TotalOrdDesc(k)) => k == v,
            });
            if !prefix_match {
                continue;
            }
            for entry in entries {
                if let Some(sid) = skip_row_id
                    && entry.row_id == sid
                {
                    continue;
                }
                if tx_staged_deletes.contains(&entry.row_id) {
                    continue;
                }
                // Only entries visible at the committed watermark obstruct.
                if entry.visible_at(snapshot) {
                    return Ok(Some(entry.row_id));
                }
            }
        }
        drop(indexes);
        // Tx-staged inserts.
        let overlap = self.tx_mgr.with_write_set(tx, |ws| {
            for (t, row) in &ws.relational_inserts {
                if t != table {
                    continue;
                }
                if let Some(sid) = skip_row_id
                    && row.row_id == sid
                {
                    continue;
                }
                let matches = cols
                    .iter()
                    .zip(values.iter())
                    .all(|(c, v)| row.values.get(c) == Some(v));
                if matches {
                    return Some(row.row_id);
                }
            }
            None
        })?;
        Ok(overlap)
    }
1652
1653    fn validate_foreign_keys_in_tx(&self, tx: TxId) -> Result<()> {
1654        // Constraint probe — FK existence checks must see the committed
1655        // watermark, not any read-side snapshot override.
1656        let snapshot = self.snapshot();
1657        let relational_inserts = self
1658            .tx_mgr
1659            .with_write_set(tx, |ws| ws.relational_inserts.clone())?;
1660
1661        for (table, row) in relational_inserts {
1662            let meta = self
1663                .table_meta(&table)
1664                .ok_or_else(|| Error::TableNotFound(table.clone()))?;
1665            for column in &meta.columns {
1666                let Some(reference) = &column.references else {
1667                    continue;
1668                };
1669                let Some(value) = row.values.get(&column.name) else {
1670                    continue;
1671                };
1672                if *value == Value::Null {
1673                    continue;
1674                }
1675                if self
1676                    .point_lookup_in_tx(tx, &reference.table, &reference.column, value, snapshot)?
1677                    .is_none()
1678                {
1679                    return Err(Error::ForeignKeyViolation {
1680                        table: table.clone(),
1681                        column: column.name.clone(),
1682                        ref_table: reference.table.clone(),
1683                    });
1684                }
1685            }
1686        }
1687
1688        Ok(())
1689    }
1690
    pub(crate) fn propagate_state_change_if_needed(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: Option<uuid::Uuid>,
        new_state: Option<&str>,
    ) -> Result<()> {
        // Kick off rule-based propagation after a row's state column changed.
        // Both the row uuid and the new state must be known; otherwise this
        // is a no-op.
        if let (Some(uuid), Some(state)) = (row_uuid, new_state) {
            // Re-entrancy guard: propagation performs upserts, which would
            // otherwise re-enter this function recursively for each child.
            let already_propagating = self
                .tx_mgr
                .with_write_set(tx, |ws| ws.propagation_in_progress)?;
            if !already_propagating {
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = true)?;
                // Capture the propagation result first so the flag is always
                // cleared before any error is surfaced.
                let propagate_result = self.propagate(tx, table, uuid, state);
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = false)?;
                propagate_result?;
            }
        }

        Ok(())
    }
1714
    fn propagate(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: uuid::Uuid,
        new_state: &str,
    ) -> Result<()> {
        // Breadth-first propagation of a state change: starting from the row
        // that just transitioned, follow FK rules, edge rules and vector
        // exclusion rules, applying each rule's target state to its children.
        let snapshot = self.snapshot_for_read();
        // Clone table metadata once so the whole pass sees one stable view.
        let metas = self.relational_store().table_meta.read().clone();
        let mut queue: VecDeque<PropagationQueueEntry> = VecDeque::new();
        // Cycle guard: each (table, uuid) pair is processed at most once.
        let mut visited: HashSet<(String, uuid::Uuid)> = HashSet::new();
        // First failure from an abort_on_failure rule. The pass keeps going
        // and the recorded error is returned only after the queue drains.
        let mut abort_violation: Option<Error> = None;
        let ctx = PropagationContext {
            tx,
            snapshot,
            metas: &metas,
        };
        let root = PropagationSource {
            table,
            uuid: row_uuid,
            state: new_state,
            depth: 0,
        };

        // Seed the queue (and apply exclusions) from the root change itself.
        self.enqueue_fk_children(&ctx, &mut queue, root);
        self.enqueue_edge_children(&ctx, &mut queue, root)?;
        self.apply_vector_exclusions(&ctx, root)?;

        while let Some(entry) = queue.pop_front() {
            if !visited.insert((entry.table.clone(), entry.uuid)) {
                continue;
            }

            let Some(meta) = metas.get(&entry.table) else {
                continue;
            };

            // A propagation target without a state machine is a soft failure:
            // warn, and only record an error if the rule demanded abort.
            let Some(state_machine) = &meta.state_machine else {
                let msg = format!(
                    "warning: propagation target table {} has no state machine",
                    entry.table
                );
                eprintln!("{msg}");
                if entry.abort_on_failure && abort_violation.is_none() {
                    abort_violation = Some(Error::PropagationAborted {
                        table: entry.table.clone(),
                        column: String::new(),
                        from: String::new(),
                        to: entry.target_state.clone(),
                    });
                }
                continue;
            };

            let state_column = state_machine.column.clone();
            // The child row may have disappeared since it was queued.
            let Some(existing) = self.relational.point_lookup_with_tx(
                Some(tx),
                &entry.table,
                "id",
                &Value::Uuid(entry.uuid),
                snapshot,
            )?
            else {
                continue;
            };

            let from_state = existing
                .values
                .get(&state_column)
                .and_then(Value::as_text)
                .unwrap_or("")
                .to_string();

            let mut next_values = existing.values.clone();
            next_values.insert(
                state_column.clone(),
                Value::Text(entry.target_state.clone()),
            );

            let upsert_outcome =
                self.relational
                    .upsert(tx, &entry.table, "id", next_values, snapshot);

            let reached_state = match upsert_outcome {
                Ok(UpsertResult::Updated) => entry.target_state.as_str(),
                // NoOp/Inserted mean no real transition happened, so nothing
                // further cascades from this row.
                Ok(UpsertResult::NoOp) | Ok(UpsertResult::Inserted) => continue,
                // An invalid transition is skipped with a warning; it aborts
                // the whole pass (at the end) only if the rule asked for it.
                Err(Error::InvalidStateTransition(_)) => {
                    eprintln!(
                        "warning: skipped invalid propagated transition {}.{} {} -> {}",
                        entry.table, state_column, from_state, entry.target_state
                    );
                    if entry.abort_on_failure && abort_violation.is_none() {
                        abort_violation = Some(Error::PropagationAborted {
                            table: entry.table.clone(),
                            column: state_column.clone(),
                            from: from_state,
                            to: entry.target_state.clone(),
                        });
                    }
                    continue;
                }
                Err(err) => return Err(err),
            };

            // Cascade from the state this row actually reached. Depth here is
            // the entry's own depth; the enqueue_* helpers add +1 per hop.
            self.enqueue_edge_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;
            self.apply_vector_exclusions(
                &ctx,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;

            self.enqueue_fk_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            );
        }

        if let Some(err) = abort_violation {
            return Err(err);
        }

        Ok(())
    }
1857
    fn enqueue_fk_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) {
        // Find every table whose ForeignKey propagation rule points at
        // `source.table` and fires on `source.state`, then queue the rows that
        // reference `source.uuid` through the rule's FK column.
        for (owner_table, owner_meta) in ctx.metas {
            for rule in &owner_meta.propagation_rules {
                let PropagationRule::ForeignKey {
                    fk_column,
                    referenced_table,
                    trigger_state,
                    target_state,
                    max_depth,
                    abort_on_failure,
                    ..
                } = rule
                else {
                    continue;
                };

                if referenced_table != source.table || trigger_state != source.state {
                    continue;
                }

                // Depth counts hops from the original state change.
                if source.depth >= *max_depth {
                    continue;
                }

                // Scan with the tx overlay so rows staged inside this
                // transaction are also considered. A scan failure is logged,
                // not fatal: one broken table must not stop the whole pass.
                let rows = match self.relational.scan_filter_with_tx(
                    Some(ctx.tx),
                    owner_table,
                    ctx.snapshot,
                    &|row| row.values.get(fk_column) == Some(&Value::Uuid(source.uuid)),
                ) {
                    Ok(rows) => rows,
                    Err(err) => {
                        eprintln!(
                            "warning: propagation scan failed for {owner_table}.{fk_column}: {err}"
                        );
                        continue;
                    }
                };

                for row in rows {
                    // Rows without a uuid `id` column cannot be propagated to.
                    if let Some(id) = row.values.get("id").and_then(Value::as_uuid).copied() {
                        queue.push_back(PropagationQueueEntry {
                            table: owner_table.clone(),
                            uuid: id,
                            target_state: target_state.clone(),
                            depth: source.depth + 1,
                            abort_on_failure: *abort_on_failure,
                        });
                    }
                }
            }
        }
    }
1916
    fn enqueue_edge_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        // Edge rules live on the source table itself (unlike FK rules, which
        // live on the referencing table).
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } = rule
            else {
                continue;
            };

            if trigger_state != source.state || source.depth >= *max_depth {
                continue;
            }

            // Single-hop traversal along the configured edge type/direction.
            let bfs = self.query_bfs(
                source.uuid,
                Some(std::slice::from_ref(edge_type)),
                *direction,
                1,
                ctx.snapshot,
            )?;

            for node in bfs.nodes {
                // Only queue neighbors that are also rows of the same table;
                // edge rules propagate within one table's rows.
                if self
                    .relational
                    .point_lookup_with_tx(
                        Some(ctx.tx),
                        source.table,
                        "id",
                        &Value::Uuid(node.id),
                        ctx.snapshot,
                    )?
                    .is_some()
                {
                    queue.push_back(PropagationQueueEntry {
                        table: source.table.to_string(),
                        uuid: node.id,
                        target_state: target_state.clone(),
                        depth: source.depth + 1,
                        abort_on_failure: *abort_on_failure,
                    });
                }
            }
        }

        Ok(())
    }
1977
1978    fn apply_vector_exclusions(
1979        &self,
1980        ctx: &PropagationContext<'_>,
1981        source: PropagationSource<'_>,
1982    ) -> Result<()> {
1983        let Some(meta) = ctx.metas.get(source.table) else {
1984            return Ok(());
1985        };
1986
1987        for rule in &meta.propagation_rules {
1988            let PropagationRule::VectorExclusion { trigger_state } = rule else {
1989                continue;
1990            };
1991            if trigger_state != source.state {
1992                continue;
1993            }
1994            for row_id in self.logical_row_ids_for_uuid(ctx.tx, source.table, source.uuid) {
1995                let index = self
1996                    .table_meta(source.table)
1997                    .and_then(|meta| {
1998                        meta.columns
1999                            .iter()
2000                            .find(|column| {
2001                                matches!(column.column_type, contextdb_core::ColumnType::Vector(_))
2002                            })
2003                            .map(|column| VectorIndexRef::new(source.table, column.name.clone()))
2004                    })
2005                    .unwrap_or_default();
2006                self.delete_vector(ctx.tx, index, row_id)?;
2007            }
2008        }
2009
2010        Ok(())
2011    }
2012
2013    pub fn delete_row(&self, tx: TxId, table: &str, row_id: RowId) -> Result<()> {
2014        self.relational.delete(tx, table, row_id)
2015    }
2016
2017    pub fn scan(&self, table: &str, snapshot: SnapshotId) -> Result<Vec<VersionedRow>> {
2018        self.relational.scan(table, snapshot)
2019    }
2020
2021    /// Scan a table with visibility over the tx's in-flight write-set layered on
2022    /// top of the committed snapshot.
2023    pub(crate) fn scan_in_tx(
2024        &self,
2025        tx: TxId,
2026        table: &str,
2027        snapshot: SnapshotId,
2028    ) -> Result<Vec<VersionedRow>> {
2029        self.relational.scan_with_tx(Some(tx), table, snapshot)
2030    }
2031
2032    /// Compute the in-tx overlay (deleted row_ids + matching staged inserts)
2033    /// for an index-driven scan of `table` matching `shape` on `column`.
2034    /// Internal helper for the IndexScan executor arm.
2035    pub(crate) fn index_scan_tx_overlay(
2036        &self,
2037        tx: TxId,
2038        table: &str,
2039        column: &str,
2040        shape: &crate::executor::IndexPredicateShape,
2041    ) -> Result<IndexScanTxOverlay> {
2042        use crate::executor::{IndexPredicateShape, range_includes};
2043        let mut overlay = IndexScanTxOverlay::default();
2044        self.tx_mgr.with_write_set(tx, |ws| {
2045            for (t, _row_id, _) in &ws.relational_deletes {
2046                if t == table {
2047                    overlay.deleted_row_ids.insert(*_row_id);
2048                }
2049            }
2050            for (t, row) in &ws.relational_inserts {
2051                if t != table {
2052                    continue;
2053                }
2054                let v = row.values.get(column).cloned().unwrap_or(Value::Null);
2055                let include = match shape {
2056                    IndexPredicateShape::Equality(target) => v == *target,
2057                    IndexPredicateShape::NotEqual(target) => v != *target,
2058                    IndexPredicateShape::InList(list) => list.contains(&v),
2059                    IndexPredicateShape::Range { lower, upper } => range_includes(&v, lower, upper),
2060                    IndexPredicateShape::IsNull => v == Value::Null,
2061                    IndexPredicateShape::IsNotNull => v != Value::Null,
2062                };
2063                if include {
2064                    overlay.matching_inserts.push(row.clone());
2065                }
2066            }
2067        })?;
2068        Ok(overlay)
2069    }
2070
2071    pub fn scan_filter(
2072        &self,
2073        table: &str,
2074        snapshot: SnapshotId,
2075        predicate: &dyn Fn(&VersionedRow) -> bool,
2076    ) -> Result<Vec<VersionedRow>> {
2077        self.relational.scan_filter(table, snapshot, predicate)
2078    }
2079
2080    pub fn point_lookup(
2081        &self,
2082        table: &str,
2083        col: &str,
2084        value: &Value,
2085        snapshot: SnapshotId,
2086    ) -> Result<Option<VersionedRow>> {
2087        self.relational.point_lookup(table, col, value, snapshot)
2088    }
2089
2090    pub(crate) fn point_lookup_in_tx(
2091        &self,
2092        tx: TxId,
2093        table: &str,
2094        col: &str,
2095        value: &Value,
2096        snapshot: SnapshotId,
2097    ) -> Result<Option<VersionedRow>> {
2098        self.relational
2099            .point_lookup_with_tx(Some(tx), table, col, value, snapshot)
2100    }
2101
2102    pub(crate) fn logical_row_ids_for_uuid(
2103        &self,
2104        tx: TxId,
2105        table: &str,
2106        uuid: uuid::Uuid,
2107    ) -> Vec<RowId> {
2108        let mut row_ids = HashSet::new();
2109
2110        if let Some(rows) = self.relational_store.tables.read().get(table) {
2111            for row in rows {
2112                if row.values.get("id") == Some(&Value::Uuid(uuid)) {
2113                    row_ids.insert(row.row_id);
2114                }
2115            }
2116        }
2117
2118        let _ = self.tx_mgr.with_write_set(tx, |ws| {
2119            for (insert_table, row) in &ws.relational_inserts {
2120                if insert_table == table && row.values.get("id") == Some(&Value::Uuid(uuid)) {
2121                    row_ids.insert(row.row_id);
2122                }
2123            }
2124        });
2125
2126        row_ids.into_iter().collect()
2127    }
2128
2129    pub fn insert_edge(
2130        &self,
2131        tx: TxId,
2132        source: NodeId,
2133        target: NodeId,
2134        edge_type: EdgeType,
2135        properties: HashMap<String, Value>,
2136    ) -> Result<bool> {
2137        let bytes = estimate_edge_bytes(source, target, &edge_type, &properties);
2138        self.accountant.try_allocate_for(
2139            bytes,
2140            "graph_insert",
2141            "insert_edge",
2142            "Reduce edge fan-out or raise MEMORY_LIMIT before inserting more graph edges.",
2143        )?;
2144
2145        match self
2146            .graph
2147            .insert_edge(tx, source, target, edge_type, properties)
2148        {
2149            Ok(inserted) => {
2150                if !inserted {
2151                    self.accountant.release(bytes);
2152                }
2153                Ok(inserted)
2154            }
2155            Err(err) => {
2156                self.accountant.release(bytes);
2157                Err(err)
2158            }
2159        }
2160    }
2161
2162    pub fn delete_edge(
2163        &self,
2164        tx: TxId,
2165        source: NodeId,
2166        target: NodeId,
2167        edge_type: &str,
2168    ) -> Result<()> {
2169        self.graph.delete_edge(tx, source, target, edge_type)
2170    }
2171
2172    pub fn query_bfs(
2173        &self,
2174        start: NodeId,
2175        edge_types: Option<&[EdgeType]>,
2176        direction: Direction,
2177        max_depth: u32,
2178        snapshot: SnapshotId,
2179    ) -> Result<TraversalResult> {
2180        self.graph
2181            .bfs(start, edge_types, direction, 1, max_depth, snapshot)
2182    }
2183
2184    pub fn edge_count(
2185        &self,
2186        source: NodeId,
2187        edge_type: &str,
2188        snapshot: SnapshotId,
2189    ) -> Result<usize> {
2190        Ok(self.graph.edge_count(source, edge_type, snapshot))
2191    }
2192
2193    pub fn get_edge_properties(
2194        &self,
2195        source: NodeId,
2196        target: NodeId,
2197        edge_type: &str,
2198        snapshot: SnapshotId,
2199    ) -> Result<Option<HashMap<String, Value>>> {
2200        let props = self
2201            .graph_store
2202            .forward_adj
2203            .read()
2204            .get(&source)
2205            .and_then(|entries| {
2206                entries
2207                    .iter()
2208                    .rev()
2209                    .find(|entry| {
2210                        entry.target == target
2211                            && entry.edge_type == edge_type
2212                            && entry.visible_at(snapshot)
2213                    })
2214                    .map(|entry| entry.properties.clone())
2215            });
2216        Ok(props)
2217    }
2218
    pub fn insert_vector(
        &self,
        tx: TxId,
        index: VectorIndexRef,
        row_id: RowId,
        vector: Vec<f32>,
    ) -> Result<()> {
        // The index must exist before anything else is attempted.
        self.vector_store.state(&index)?;
        // A dimension registered earlier in this same transaction (pending
        // change) takes precedence over the committed index dimension.
        if let Some(expected) = self.pending_vector_dimension(tx, &index)?
            && expected != vector.len()
        {
            return Err(self.direct_vector_dimension_error(&index, expected, vector.len()));
        }
        // Re-map the strict path's generic mismatch error into the
        // direct-insert flavor so callers see a consistent error shape.
        self.insert_vector_strict(tx, index.clone(), row_id, vector)
            .map_err(|err| match err {
                Error::VectorIndexDimensionMismatch {
                    expected, actual, ..
                } => self.direct_vector_dimension_error(&index, expected, actual),
                other => other,
            })
    }
2240
2241    pub(crate) fn insert_vector_strict(
2242        &self,
2243        tx: TxId,
2244        index: VectorIndexRef,
2245        row_id: RowId,
2246        vector: Vec<f32>,
2247    ) -> Result<()> {
2248        self.vector_store.validate_vector(&index, vector.len())?;
2249        let bytes = self.vector_insert_accounted_bytes(&index, vector.len());
2250        self.accountant.try_allocate_for(
2251            bytes,
2252            "insert",
2253            &format!("vector_insert@{}.{}", index.table, index.column),
2254            "Reduce vector dimensionality, insert fewer rows, or raise MEMORY_LIMIT.",
2255        )?;
2256        self.vector
2257            .insert_vector(tx, index, row_id, vector)
2258            .inspect_err(|_| self.accountant.release(bytes))
2259    }
2260
2261    pub fn delete_vector(&self, tx: TxId, index: VectorIndexRef, row_id: RowId) -> Result<()> {
2262        self.vector.delete_vector(tx, index, row_id)
2263    }
2264
2265    pub(crate) fn move_vector(
2266        &self,
2267        tx: TxId,
2268        index: VectorIndexRef,
2269        old_row_id: RowId,
2270        new_row_id: RowId,
2271    ) -> Result<()> {
2272        self.vector_store.state(&index)?;
2273        self.tx_mgr.with_write_set(tx, |ws| {
2274            ws.vector_moves.push((index, old_row_id, new_row_id, tx));
2275        })?;
2276        Ok(())
2277    }
2278
2279    pub fn query_vector(
2280        &self,
2281        index: VectorIndexRef,
2282        query: &[f32],
2283        k: usize,
2284        candidates: Option<&RoaringTreemap>,
2285        snapshot: SnapshotId,
2286    ) -> Result<Vec<(RowId, f32)>> {
2287        if self.vector_store.try_state(&index).is_none() {
2288            return Err(Error::UnknownVectorIndex { index });
2289        }
2290        self.vector.search(index, query, k, candidates, snapshot)
2291    }
2292
2293    pub fn semantic_search(&self, query: SemanticQuery) -> Result<Vec<SearchResult>> {
2294        self.semantic_search_with_candidates(query, None)
2295    }
2296
    pub(crate) fn semantic_search_with_candidates(
        &self,
        query: SemanticQuery,
        candidates: Option<RoaringTreemap>,
    ) -> Result<Vec<SearchResult>> {
        // Semantic search pipeline: resolve the index and column, narrow the
        // candidate set via the WHERE clause, then either return raw
        // vector-similarity results (no sort key) or re-rank them through the
        // column's rank-policy formula.
        let index = VectorIndexRef::new(query.table.clone(), query.vector_column.clone());
        let snapshot = self.snapshot_for_read();
        let meta = self
            .table_meta(&query.table)
            .ok_or_else(|| Error::TableNotFound(query.table.clone()))?;
        let vector_column = meta
            .columns
            .iter()
            .find(|column| column.name == query.vector_column)
            .ok_or_else(|| Error::UnknownVectorIndex {
                index: index.clone(),
            })?;

        // Intersect the caller-supplied bitmap (if any) with the rows that
        // satisfy the WHERE clause.
        let mut candidate_bitmap = candidates;
        if let Some(where_clause) = &query.where_clause {
            let where_bitmap =
                self.semantic_where_candidate_bitmap(&query.table, where_clause, snapshot)?;
            candidate_bitmap = Some(match candidate_bitmap {
                Some(mut existing) => {
                    existing &= where_bitmap;
                    existing
                }
                None => where_bitmap,
            });
        }

        // Fast path: no sort key means raw vector-score ordering.
        let Some(sort_key) = query.sort_key.as_deref() else {
            // When results are filtered afterwards, over-fetch (up to every
            // index entry) so the post-filter truncation can still fill
            // `limit` results.
            let raw_k = if query.min_similarity.is_some() || candidate_bitmap.is_some() {
                self.vector_entry_count(&index).max(query.limit)
            } else {
                query.limit
            };
            let mut rows = self.query_vector_strict(
                index.clone(),
                &query.query,
                raw_k,
                candidate_bitmap.as_ref(),
                snapshot,
            )?;
            if let Some(min_similarity) = query.min_similarity {
                rows.retain(|(_, score)| *score >= min_similarity);
                rows.truncate(query.limit);
            }
            // Without a rank policy, the rank is simply the vector score.
            return rows
                .into_iter()
                .map(|(row_id, vector_score)| {
                    let anchor = self.find_row_by_id_at(&query.table, row_id, snapshot)?;
                    let values = self.search_result_values(&index, row_id, snapshot, anchor.values);
                    Ok(SearchResult {
                        row_id,
                        values,
                        vector_score,
                        rank: vector_score,
                    })
                })
                .collect();
        };

        // Ranked path: the column must declare a rank policy whose sort key
        // matches the one requested by the query.
        let Some(policy) = vector_column.rank_policy.as_ref() else {
            return Err(Error::RankPolicyNotFound {
                index: rank_index_name(&query.table, &query.vector_column),
                sort_key: sort_key.to_string(),
            });
        };
        if policy.sort_key != sort_key {
            return Err(Error::RankPolicyNotFound {
                index: rank_index_name(&query.table, &query.vector_column),
                sort_key: sort_key.to_string(),
            });
        }
        let formula = self.rank_formula(&query.table, &query.vector_column)?;
        // Over-fetch: re-ranking can reorder, so more raw candidates than
        // `limit` are pulled before the formula is applied.
        let entry_count = self.vector_entry_count(&index);
        let internal_k = self.rank_policy_candidate_k(entry_count, query.limit);
        let mut raw = self.query_vector_strict(
            index.clone(),
            &query.query,
            internal_k,
            candidate_bitmap.as_ref(),
            snapshot,
        )?;
        if let Some(min_similarity) = query.min_similarity {
            raw.retain(|(_, score)| *score >= min_similarity);
        }

        // Evaluate the rank formula per candidate; evaluation failures are
        // warned about and the row is dropped rather than failing the query.
        let mut ranked = Vec::with_capacity(raw.len());
        for (row_id, vector_score) in raw {
            let anchor = self.find_row_by_id_at(&query.table, row_id, snapshot)?;
            let joined = self.joined_row_for_rank_policy(policy, &anchor, snapshot)?;
            self.rank_policy_eval_count.fetch_add(1, Ordering::SeqCst);
            let eval = formula.eval_with_resolver(vector_score, |column| {
                self.resolve_rank_formula_column(policy, &anchor, joined.as_ref(), column)
            });
            let rank = match eval {
                Ok(Some(rank)) => rank,
                Ok(None) => f32::NAN,
                Err(err) => {
                    // Corrupt joined columns are attributed to the joined
                    // row's id when available, for a more useful warning.
                    let error_row_id =
                        if matches!(err, FormulaEvalError::CorruptJoinedColumn { .. }) {
                            joined.as_ref().map(|row| row.row_id).unwrap_or(row_id)
                        } else {
                            row_id
                        };
                    self.warn_rank_eval_error(
                        &query.table,
                        &query.vector_column,
                        error_row_id,
                        &err,
                    );
                    continue;
                }
            };
            let values = self.search_result_values(
                &index,
                row_id,
                snapshot,
                merged_rank_values(&anchor, joined.as_ref()),
            );
            ranked.push(SearchResult {
                row_id,
                values,
                vector_score,
                rank,
            });
        }
        ranked.sort_by(compare_ranked_results);
        ranked.truncate(query.limit);
        Ok(ranked)
    }
2430
2431    #[doc(hidden)]
2432    pub fn __rank_policy_eval_count(&self) -> u64 {
2433        self.rank_policy_eval_count.load(Ordering::SeqCst)
2434    }
2435
2436    #[doc(hidden)]
2437    pub fn __reset_rank_policy_eval_count(&self) {
2438        self.rank_policy_eval_count.store(0, Ordering::SeqCst);
2439    }
2440
2441    #[doc(hidden)]
2442    pub fn __rank_policy_formula_parse_count(&self) -> u64 {
2443        self.rank_policy_formula_parse_count.load(Ordering::SeqCst)
2444    }
2445
2446    #[doc(hidden)]
2447    pub fn __inject_raw_joined_row_value_for_test(
2448        &self,
2449        table: &str,
2450        row_id: RowId,
2451        column: &str,
2452        _raw_bytes: Vec<u8>,
2453    ) -> Result<()> {
2454        self.corrupt_joined_values
2455            .write()
2456            .insert((table.to_string(), row_id, column.to_string()));
2457        Ok(())
2458    }
2459
2460    pub(crate) fn query_vector_strict(
2461        &self,
2462        index: VectorIndexRef,
2463        query: &[f32],
2464        k: usize,
2465        candidates: Option<&RoaringTreemap>,
2466        snapshot: SnapshotId,
2467    ) -> Result<Vec<(RowId, f32)>> {
2468        self.vector_store.validate_vector(&index, query.len())?;
2469        self.vector.search(index, query, k, candidates, snapshot)
2470    }
2471
2472    pub(crate) fn register_rank_formula(
2473        &self,
2474        table: &str,
2475        column: &str,
2476        formula: Arc<RankFormula>,
2477    ) {
2478        let mut cache = self.rank_formula_cache.write();
2479        cache.insert((table.to_string(), column.to_string()), formula);
2480        self.rank_policy_formula_parse_count
2481            .store(cache.len() as u64, Ordering::SeqCst);
2482    }
2483
2484    pub(crate) fn remove_rank_formula(&self, table: &str, column: &str) {
2485        let mut cache = self.rank_formula_cache.write();
2486        cache.remove(&(table.to_string(), column.to_string()));
2487        self.rank_policy_formula_parse_count
2488            .store(cache.len() as u64, Ordering::SeqCst);
2489    }
2490
2491    pub(crate) fn remove_rank_formulas_for_table(&self, table: &str) {
2492        let mut cache = self.rank_formula_cache.write();
2493        cache.retain(|(policy_table, _), _| policy_table != table);
2494        self.rank_policy_formula_parse_count
2495            .store(cache.len() as u64, Ordering::SeqCst);
2496    }
2497
    /// Recompile every rank-policy formula declared in `metas` and replace the
    /// cache contents wholesale; used when table metadata is (re)loaded.
    ///
    /// NOTE(review): on a compile error the cache is left cleared/partially
    /// filled and the parse-count metric is not refreshed — presumably the
    /// caller treats the error as fatal so the partial state is never read;
    /// confirm.
    fn rebuild_rank_formula_cache_from_meta(
        &self,
        metas: &HashMap<String, TableMeta>,
    ) -> Result<()> {
        let mut cache = self.rank_formula_cache.write();
        cache.clear();
        for (table, meta) in metas {
            for column in &meta.columns {
                if let Some(policy) = &column.rank_policy {
                    let formula = RankFormula::compile_for_index(
                        &rank_index_name(table, &column.name),
                        &policy.formula,
                    )?;
                    cache.insert((table.clone(), column.name.clone()), Arc::new(formula));
                }
            }
        }
        // The "parse count" metric mirrors the number of cached formulas.
        self.rank_policy_formula_parse_count
            .store(cache.len() as u64, Ordering::SeqCst);
        Ok(())
    }
2519
2520    fn rank_formula(&self, table: &str, column: &str) -> Result<Arc<RankFormula>> {
2521        self.rank_formula_cache
2522            .read()
2523            .get(&(table.to_string(), column.to_string()))
2524            .cloned()
2525            .ok_or_else(|| {
2526                Error::Other(format!(
2527                    "rank policy formula cache missing for {}",
2528                    rank_index_name(table, column)
2529                ))
2530            })
2531    }
2532
2533    fn vector_entry_count(&self, index: &VectorIndexRef) -> usize {
2534        self.vector_store
2535            .try_state(index)
2536            .map(|state| state.entry_count())
2537            .unwrap_or(0)
2538    }
2539
2540    fn rank_policy_candidate_k(&self, entry_count: usize, limit: usize) -> usize {
2541        if entry_count == 0 || limit == 0 {
2542            return limit;
2543        }
2544        if entry_count < 1000 {
2545            return entry_count;
2546        }
2547        entry_count
2548            .saturating_sub(1)
2549            .min(limit.saturating_mul(30).max(1500))
2550            .max(limit)
2551    }
2552
2553    fn semantic_where_candidate_bitmap(
2554        &self,
2555        table: &str,
2556        where_clause: &str,
2557        snapshot: SnapshotId,
2558    ) -> Result<RoaringTreemap> {
2559        let sql = format!("SELECT * FROM {table} WHERE {where_clause}");
2560        let stmt = contextdb_parser::parse(&sql)?;
2561        let expr = match stmt {
2562            Statement::Select(select) => select
2563                .body
2564                .where_clause
2565                .ok_or_else(|| Error::ParseError("semantic WHERE missing expression".into()))?,
2566            _ => return Err(Error::ParseError("semantic WHERE parse failed".into())),
2567        };
2568        let mut bitmap = RoaringTreemap::new();
2569        for row in self.scan(table, snapshot)? {
2570            if crate::executor::row_matches(&row, &expr, &HashMap::new())? {
2571                bitmap.insert(row.row_id.0);
2572            }
2573        }
2574        Ok(bitmap)
2575    }
2576
2577    fn find_row_by_id_at(
2578        &self,
2579        table: &str,
2580        row_id: RowId,
2581        snapshot: SnapshotId,
2582    ) -> Result<VersionedRow> {
2583        self.relational_store
2584            .row_by_id(table, row_id, snapshot)
2585            .ok_or_else(|| Error::NotFound(format!("row {row_id} in table {table}")))
2586    }
2587
    /// Resolve the joined row a rank policy's formula reads from, for one
    /// anchor row at `snapshot`.
    ///
    /// Probes the policy's protected index on the joined table (which must
    /// still lead with the joined column), keeps the visible entry with the
    /// highest row_id, and re-verifies the join value on the fetched row.
    /// Returns Ok(None) when no join partner exists; errors when the
    /// policy/index metadata is inconsistent.
    fn joined_row_for_rank_policy(
        &self,
        policy: &RankPolicy,
        anchor: &VersionedRow,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        if policy.anchor_column.is_empty() {
            return Err(Error::Other(format!(
                "rank policy on index {}.{} has no resolved anchor join column",
                policy.joined_table, policy.joined_column
            )));
        }
        // A NULL (or missing) join key never matches anything.
        let join_value = anchor
            .values
            .get(&policy.anchor_column)
            .cloned()
            .unwrap_or(Value::Null);
        if join_value == Value::Null {
            return Ok(None);
        }

        let indexes = self.relational_store.indexes.read();
        let storage = indexes
            .get(&(policy.joined_table.clone(), policy.protected_index.clone()))
            .ok_or_else(|| {
                Error::Other(format!(
                    "rank policy protected index `{}` missing on table `{}`",
                    policy.protected_index, policy.joined_table
                ))
            })?;
        let Some((first_column, direction)) = storage.columns.first() else {
            return Err(Error::Other(format!(
                "rank policy protected index `{}` on `{}` has no columns",
                policy.protected_index, policy.joined_table
            )));
        };
        // The index is only usable while it still leads with the joined
        // column; a reordered index invalidates the policy.
        if first_column != &policy.joined_column {
            return Err(Error::Other(format!(
                "rank policy protected index `{}` on `{}` no longer leads with `{}`",
                policy.protected_index, policy.joined_table, policy.joined_column
            )));
        }

        // Encode the probe key with the same sort direction the index stores.
        let key_component = match direction {
            SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(join_value.clone())),
            SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(join_value.clone())),
        };
        // Among visible matches, keep the largest row_id.
        let mut best_row_id: Option<RowId> = None;
        let mut consider = |entries: &[contextdb_relational::IndexEntry]| {
            for entry in entries {
                if entry.visible_at(snapshot)
                    && best_row_id.is_none_or(|current| current < entry.row_id)
                {
                    best_row_id = Some(entry.row_id);
                }
            }
        };

        if storage.columns.len() == 1 {
            // Single-column index: exact key lookup.
            if let Some(entries) = storage.tree.get(&vec![key_component.clone()]) {
                consider(entries);
            }
        } else {
            // Composite index: range-scan every key sharing the leading component.
            for (key, entries) in storage.tree.range(vec![key_component.clone()]..) {
                if key.first() != Some(&key_component) {
                    break;
                }
                consider(entries);
            }
        }
        // Release the indexes lock before touching the relational store again.
        drop(indexes);

        let Some(row_id) = best_row_id else {
            return Ok(None);
        };
        let Some(row) = self
            .relational_store
            .row_by_id(&policy.joined_table, row_id, snapshot)
        else {
            return Ok(None);
        };
        let Some(value) = row.values.get(&policy.joined_column) else {
            return Ok(None);
        };
        // Guard against index staleness: the fetched row must still carry the
        // value the index claimed.
        if !values_equal_for_rank_join(value, &join_value) {
            return Ok(None);
        }
        Ok(Some(row))
    }
2677
    /// Resolve a formula column reference: the anchor row's value wins, then
    /// the joined row's. Returns Ok(None) when the anchor lacks the column
    /// and there is no joined row; errors when the test hook marked the
    /// joined value as corrupt.
    fn resolve_rank_formula_column(
        &self,
        policy: &RankPolicy,
        anchor: &VersionedRow,
        joined: Option<&VersionedRow>,
        column: &str,
    ) -> std::result::Result<Option<f32>, FormulaEvalError> {
        // Anchor-table columns shadow joined-table columns of the same name.
        if let Some(value) = anchor.values.get(column) {
            return rank_value_to_number(value, column);
        }
        let Some(joined) = joined else {
            return Ok(None);
        };
        // Test-only corruption injection; see
        // __inject_raw_joined_row_value_for_test.
        if self.corrupt_joined_values.read().contains(&(
            policy.joined_table.clone(),
            joined.row_id,
            column.to_string(),
        )) {
            return Err(FormulaEvalError::CorruptJoinedColumn {
                column: column.to_string(),
            });
        }
        // A missing joined column evaluates like an explicit NULL.
        let value = joined.values.get(column).unwrap_or(&Value::Null);
        rank_value_to_number(value, column)
    }
2703
2704    fn warn_rank_eval_error(
2705        &self,
2706        table: &str,
2707        column: &str,
2708        row_id: RowId,
2709        err: &FormulaEvalError,
2710    ) {
2711        let mut reason = err.reason();
2712        if reason.len() > 256 {
2713            reason.truncate(253);
2714            reason.push_str("...");
2715        }
2716        tracing::warn!(
2717            name: "rank_policy_eval_error",
2718            target: "rank_policy_eval_error",
2719            index = %rank_index_name(table, column),
2720            row_id = row_id.0,
2721            reason = %reason,
2722            "rank_policy_eval_error"
2723        );
2724    }
2725
2726    fn search_result_values(
2727        &self,
2728        index: &VectorIndexRef,
2729        row_id: RowId,
2730        snapshot: SnapshotId,
2731        mut values: HashMap<String, Value>,
2732    ) -> HashMap<String, Value> {
2733        if let Some(entry) = self.vector_store_live_entry_for_row(index, row_id, snapshot) {
2734            values.insert(index.column.clone(), Value::Vector(entry.vector));
2735        }
2736        values
2737    }
2738
2739    fn pending_vector_dimension(&self, tx: TxId, index: &VectorIndexRef) -> Result<Option<usize>> {
2740        Ok(self
2741            .tx_mgr
2742            .cloned_write_set(tx)?
2743            .vector_inserts
2744            .iter()
2745            .rev()
2746            .find(|entry| entry.index == *index && entry.deleted_tx.is_none())
2747            .map(|entry| entry.vector.len()))
2748    }
2749
2750    fn direct_vector_dimension_error(
2751        &self,
2752        index: &VectorIndexRef,
2753        expected: usize,
2754        actual: usize,
2755    ) -> Error {
2756        Error::VectorIndexDimensionMismatch {
2757            index: index.clone(),
2758            expected,
2759            actual,
2760        }
2761    }
2762
2763    pub(crate) fn vector_insert_accounted_bytes(
2764        &self,
2765        index: &VectorIndexRef,
2766        dimension: usize,
2767    ) -> usize {
2768        self.vector_store
2769            .try_state(index)
2770            .map(|state| state.quantization().storage_bytes(dimension))
2771            .unwrap_or_else(|| 24 + dimension.saturating_mul(std::mem::size_of::<f32>()))
2772    }
2773
2774    #[doc(hidden)]
2775    pub fn __debug_vector_hnsw_len(&self, index: VectorIndexRef) -> Option<usize> {
2776        self.vector_store
2777            .try_state(&index)
2778            .and_then(|state| state.hnsw_len())
2779    }
2780
2781    #[doc(hidden)]
2782    pub fn __debug_vector_hnsw_stats(&self, index: VectorIndexRef) -> Option<HnswGraphStats> {
2783        self.vector_store
2784            .try_state(&index)
2785            .and_then(|state| state.hnsw_stats())
2786    }
2787
2788    #[doc(hidden)]
2789    pub fn __debug_vector_storage_bytes_per_entry(
2790        &self,
2791        index: VectorIndexRef,
2792    ) -> Result<Vec<usize>> {
2793        self.vector_store.storage_bytes_per_entry(&index)
2794    }
2795
2796    pub fn has_live_vector(&self, row_id: RowId, snapshot: SnapshotId) -> bool {
2797        !self
2798            .vector_store
2799            .live_entries_for_row(row_id, snapshot)
2800            .is_empty()
2801    }
2802
2803    pub fn live_vector_entry(&self, row_id: RowId, snapshot: SnapshotId) -> Option<VectorEntry> {
2804        self.vector_store
2805            .live_entries_for_row(row_id, snapshot)
2806            .into_iter()
2807            .next()
2808    }
2809
2810    pub(crate) fn vector_store_live_entry_for_row(
2811        &self,
2812        index: &VectorIndexRef,
2813        row_id: RowId,
2814        snapshot: SnapshotId,
2815    ) -> Option<VectorEntry> {
2816        self.vector_store
2817            .live_entry_for_row(index, row_id, snapshot)
2818    }
2819
    /// Tear down auxiliary graph state derived from a table being dropped.
    ///
    /// Scans the table for rows that look like edges (source_id / target_id /
    /// edge_type columns) and removes matching entries from both adjacency
    /// maps; rows lacking any of the three columns are ignored. A failed scan
    /// is treated as "no rows" — this is best-effort cleanup.
    pub(crate) fn drop_table_aux_state(&self, table: &str) {
        let snapshot = self.snapshot_for_read();
        let rows = self.scan(table, snapshot).unwrap_or_default();
        let edge_keys: HashSet<(NodeId, EdgeType, NodeId)> = rows
            .iter()
            .filter_map(|row| {
                match (
                    row.values.get("source_id").and_then(Value::as_uuid),
                    row.values.get("target_id").and_then(Value::as_uuid),
                    row.values.get("edge_type").and_then(Value::as_text),
                ) {
                    (Some(source), Some(target), Some(edge_type)) => {
                        Some((*source, edge_type.to_string(), *target))
                    }
                    _ => None,
                }
            })
            .collect();

        if !edge_keys.is_empty() {
            // The forward and reverse maps are locked one at a time (forward
            // first), so neither write lock is held while mutating the other.
            {
                let mut forward = self.graph_store.forward_adj.write();
                for entries in forward.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                forward.retain(|_, entries| !entries.is_empty());
            }
            {
                let mut reverse = self.graph_store.reverse_adj.write();
                for entries in reverse.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                reverse.retain(|_, entries| !entries.is_empty());
            }
        }
    }
2860
2861    pub fn table_names(&self) -> Vec<String> {
2862        self.relational_store.table_names()
2863    }
2864
2865    pub fn table_meta(&self, table: &str) -> Option<TableMeta> {
2866        self.relational_store.table_meta(table)
2867    }
2868
    /// Execute at a specific snapshot. Threads `snapshot` through via a
    /// thread-local override so scans / IndexScans filter visibility using
    /// the caller-provided snapshot, not the live committed watermark.
    ///
    /// NOTE(review): the prior override is restored by plain assignment, not
    /// a drop guard — if `execute` panics, this thread's override stays set.
    /// Presumably acceptable for a test-only API; confirm.
    #[doc(hidden)]
    pub fn execute_at_snapshot(
        &self,
        sql: &str,
        params: &HashMap<String, Value>,
        snapshot: SnapshotId,
    ) -> Result<QueryResult> {
        SNAPSHOT_OVERRIDE.with(|cell| {
            // Save whatever override was active, install ours, run, restore.
            let prior = cell.replace(Some(snapshot));
            let r = self.execute(sql, params);
            cell.replace(prior);
            r
        })
    }
2886
2887    pub(crate) fn snapshot_for_read(&self) -> SnapshotId {
2888        SNAPSHOT_OVERRIDE.with(|cell| cell.borrow().unwrap_or_else(|| self.snapshot()))
2889    }
2890
    /// Return the row changes since `since`. Walks `change_log` for
    /// `RowInsert` / `RowDelete` entries whose LSN exceeds `since`, fetches
    /// the row values out of the live relational store, and emits a
    /// `RowChange` the receiver can replay. row_id order is preserved.
    #[doc(hidden)]
    pub fn change_log_rows_since(&self, since: Lsn) -> Result<Vec<RowChange>> {
        let entries = self.change_log_since(since);
        // Hold the tables read lock across the walk so every insert resolves
        // against one consistent view of the store.
        let tables = self.relational_store.tables.read();
        let mut out = Vec::new();
        for e in entries {
            match e {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    // Inserts whose table or row has since vanished are skipped.
                    let Some(rows) = tables.get(&table) else {
                        continue;
                    };
                    let Some(row) = rows.iter().find(|r| r.row_id == row_id) else {
                        continue;
                    };
                    // Prefer the row's own `id` column as the natural key;
                    // fall back to the numeric row_id.
                    // NOTE(review): `row_id.0 as i64` wraps for ids above
                    // i64::MAX — presumably unreachable in practice; confirm.
                    let natural_key = row
                        .values
                        .get("id")
                        .cloned()
                        .map(|value| NaturalKey {
                            column: "id".to_string(),
                            value,
                        })
                        .unwrap_or_else(|| NaturalKey {
                            column: "id".to_string(),
                            value: Value::Int64(row_id.0 as i64),
                        });
                    out.push(RowChange {
                        table,
                        natural_key,
                        values: row.values.clone(),
                        // A row that was re-deleted after the logged insert is
                        // emitted as a delete.
                        deleted: row.deleted_tx.is_some(),
                        lsn,
                    });
                }
                ChangeLogEntry::RowDelete {
                    table,
                    row_id: _,
                    natural_key,
                    lsn,
                } => {
                    // Deletes carry their natural key in the log entry itself.
                    out.push(RowChange {
                        table,
                        natural_key,
                        values: HashMap::new(),
                        deleted: true,
                        lsn,
                    });
                }
                _ => {}
            }
        }
        Ok(out)
    }
2948
2949    /// Count of base rows the executor touched during the most recent query.
2950    #[doc(hidden)]
2951    pub fn __rows_examined(&self) -> u64 {
2952        self.rows_examined.load(Ordering::SeqCst)
2953    }
2954
2955    #[doc(hidden)]
2956    pub fn __reset_rows_examined(&self) {
2957        self.rows_examined.store(0, Ordering::SeqCst);
2958    }
2959
2960    #[doc(hidden)]
2961    pub fn __bump_rows_examined(&self, delta: u64) {
2962        self.rows_examined.fetch_add(delta, Ordering::SeqCst);
2963    }
2964
2965    /// Count of batch-level `indexes.write()` lock acquisitions since startup.
2966    /// `apply_changes` bumps this once per batch; per-row commits do not.
2967    #[doc(hidden)]
2968    pub fn __index_write_lock_count(&self) -> u64 {
2969        self.relational_store.index_write_lock_count()
2970    }
2971
2972    /// Total entries across every registered index's BTreeMap.
2973    #[doc(hidden)]
2974    pub fn __introspect_indexes_total_entries(&self) -> u64 {
2975        self.relational_store.introspect_indexes_total_entries()
2976    }
2977
2978    /// Probe the constraint-check path for a specific table/column/value.
2979    /// Returns a QueryResult whose trace reflects whether the probe went
2980    /// through an index (IndexScan) or a full scan. Accepts either a
2981    /// single-column index or a composite leading-column match.
2982    #[doc(hidden)]
2983    pub fn __probe_constraint_check(
2984        &self,
2985        table: &str,
2986        column: &str,
2987        value: Value,
2988    ) -> Result<QueryResult> {
2989        let covered = self.index_covers_column(table, column)
2990            || self
2991                .relational_store
2992                .indexes
2993                .read()
2994                .iter()
2995                .any(|((t, _), idx)| {
2996                    t == table && idx.columns.first().is_some_and(|(c, _)| c == column)
2997                });
2998        let trace = if covered {
2999            QueryTrace {
3000                physical_plan: "IndexScan",
3001                index_used: None,
3002                predicates_pushed: Default::default(),
3003                indexes_considered: Default::default(),
3004                sort_elided: false,
3005            }
3006        } else {
3007            QueryTrace::scan()
3008        };
3009        let _ = value;
3010        Ok(QueryResult {
3011            columns: vec![],
3012            rows: vec![],
3013            rows_affected: 0,
3014            trace,
3015            cascade: None,
3016        })
3017    }
3018
    /// Run one pruning cycle. Called by the background loop or manually in tests.
    /// `pruning_guard` serializes manual cycles against the background thread
    /// so only one prune runs at a time. Returns the count reported by
    /// `prune_expired_rows`.
    pub fn run_pruning_cycle(&self) -> u64 {
        let _guard = self.pruning_guard.lock();
        prune_expired_rows(
            &self.relational_store,
            &self.graph_store,
            &self.vector_store,
            self.accountant(),
            self.persistence.as_ref(),
            self.sync_watermark(),
        )
    }
3031
    /// Set the pruning loop interval. Test-only API.
    ///
    /// Stops any existing pruning thread, then spawns a fresh one that runs
    /// `prune_expired_rows` every `interval` under the shared `pruning_guard`
    /// (so manual `run_pruning_cycle` calls never overlap it). The new
    /// thread's shutdown flag and join handle replace the prior runtime state.
    pub fn set_pruning_interval(&self, interval: Duration) {
        self.stop_pruning_thread();

        // Clone every store handle the loop needs so the thread owns its state.
        let shutdown = Arc::new(AtomicBool::new(false));
        let relational = self.relational_store.clone();
        let graph = self.graph_store.clone();
        let vector = self.vector_store.clone();
        let accountant = self.accountant.clone();
        let persistence = self.persistence.clone();
        let sync_watermark = self.sync_watermark.clone();
        let pruning_guard = self.pruning_guard.clone();
        let thread_shutdown = shutdown.clone();

        let handle = thread::spawn(move || {
            while !thread_shutdown.load(Ordering::SeqCst) {
                {
                    // Guard scope ends before sleeping so manual cycles can
                    // run while the thread waits.
                    let _guard = pruning_guard.lock();
                    let _ = prune_expired_rows(
                        &relational,
                        &graph,
                        &vector,
                        accountant.as_ref(),
                        persistence.as_ref(),
                        sync_watermark.load(Ordering::SeqCst),
                    );
                }
                // sleep_with_shutdown presumably wakes early when the flag
                // flips — confirm against its definition.
                sleep_with_shutdown(&thread_shutdown, interval);
            }
        });

        let mut runtime = self.pruning_runtime.lock();
        runtime.shutdown = shutdown;
        runtime.handle = Some(handle);
    }
3067
3068    pub fn sync_watermark(&self) -> Lsn {
3069        self.sync_watermark.load(Ordering::SeqCst)
3070    }
3071
3072    pub fn set_sync_watermark(&self, watermark: Lsn) {
3073        self.sync_watermark.store(watermark, Ordering::SeqCst);
3074    }
3075
3076    pub fn instance_id(&self) -> uuid::Uuid {
3077        self.instance_id
3078    }
3079
3080    pub fn open_memory_with_plugin_and_accountant(
3081        plugin: Arc<dyn DatabasePlugin>,
3082        accountant: Arc<MemoryAccountant>,
3083    ) -> Result<Self> {
3084        Self::open_memory_internal(plugin, accountant)
3085    }
3086
3087    pub fn open_memory_with_plugin(plugin: Arc<dyn DatabasePlugin>) -> Result<Self> {
3088        let db = Self::open_memory_with_plugin_and_accountant(
3089            plugin,
3090            Arc::new(MemoryAccountant::no_limit()),
3091        )?;
3092        db.plugin.on_open()?;
3093        Ok(db)
3094    }
3095
    /// Close the database. Idempotent: a second call is a no-op. Rolls back
    /// any open session transaction, stops the pruning thread, drops all
    /// subscribers, closes persistence, then fires the plugin's close hook.
    pub fn close(&self) -> Result<()> {
        // swap returns the previous flag value; true means already closed.
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        // Take the tx out of the lock first so the lock is not held across
        // the rollback.
        let tx = self.session_tx.lock().take();
        if let Some(tx) = tx {
            self.rollback(tx)?;
        }
        self.stop_pruning_thread();
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
        self.plugin.on_close()
    }
3111
3112    /// File-backed database with custom plugin.
3113    pub fn open_with_plugin(
3114        path: impl AsRef<Path>,
3115        plugin: Arc<dyn DatabasePlugin>,
3116    ) -> Result<Self> {
3117        let db = Self::open_loaded(path, plugin, Arc::new(MemoryAccountant::no_limit()), None)?;
3118        db.plugin.on_open()?;
3119        Ok(db)
3120    }
3121
3122    /// Full constructor with budget.
3123    pub fn open_with_config(
3124        path: impl AsRef<Path>,
3125        plugin: Arc<dyn DatabasePlugin>,
3126        accountant: Arc<MemoryAccountant>,
3127    ) -> Result<Self> {
3128        Self::open_with_config_and_disk_limit(path, plugin, accountant, None)
3129    }
3130
3131    pub fn open_with_config_and_disk_limit(
3132        path: impl AsRef<Path>,
3133        plugin: Arc<dyn DatabasePlugin>,
3134        accountant: Arc<MemoryAccountant>,
3135        startup_disk_limit: Option<u64>,
3136    ) -> Result<Self> {
3137        let path = path.as_ref();
3138        if path.as_os_str() == ":memory:" {
3139            return Self::open_memory_with_plugin_and_accountant(plugin, accountant);
3140        }
3141        let accountant = if let Some(limit) = persisted_memory_limit(path)? {
3142            let usage = accountant.usage();
3143            if usage.limit.is_none() && usage.startup_ceiling.is_none() {
3144                Arc::new(MemoryAccountant::with_budget(limit))
3145            } else {
3146                accountant
3147            }
3148        } else {
3149            accountant
3150        };
3151        let db = Self::open_loaded(path, plugin, accountant, startup_disk_limit)?;
3152        db.plugin.on_open()?;
3153        Ok(db)
3154    }
3155
3156    /// In-memory database with budget.
3157    pub fn open_memory_with_accountant(accountant: Arc<MemoryAccountant>) -> Self {
3158        Self::open_memory_internal(Arc::new(CorePlugin), accountant)
3159            .expect("failed to open in-memory database with accountant")
3160    }
3161
3162    /// Access the memory accountant.
3163    pub fn accountant(&self) -> &MemoryAccountant {
3164        &self.accountant
3165    }
3166
3167    pub(crate) fn register_vector_index_for_column(&self, table: &str, column: &ColumnDef) {
3168        if let ColumnType::Vector(dimension) = column.column_type {
3169            self.vector_store.register_index(
3170                VectorIndexRef::new(table, column.name.clone()),
3171                dimension,
3172                column.quantization,
3173            );
3174        }
3175    }
3176
3177    pub(crate) fn deregister_vector_index(&self, table: &str, column: &str) {
3178        self.vector_store
3179            .deregister_index(&VectorIndexRef::new(table, column), self.accountant());
3180    }
3181
3182    pub(crate) fn rename_vector_index(&self, table: &str, from: &str, to: &str) -> Result<()> {
3183        self.vector_store.rename_index(
3184            &VectorIndexRef::new(table, from),
3185            VectorIndexRef::new(table, to),
3186        )
3187    }
3188
3189    pub(crate) fn vector_store_deregister_table(&self, table: &str) {
3190        self.vector_store.deregister_table(table, self.accountant());
3191    }
3192
3193    pub(crate) fn vector_index_infos(&self) -> Vec<contextdb_vector::store::VectorIndexInfo> {
3194        self.vector_store.index_infos()
3195    }
3196
    /// Charge the memory accountant for everything just loaded from disk, in
    /// four phases (table metadata, rows, graph edges, vectors). Each phase
    /// fails fast with its own remediation hint when the budget is exceeded.
    fn account_loaded_state(&self) -> Result<()> {
        // Phase 1: schema metadata.
        let metadata_bytes = self
            .relational_store
            .table_meta
            .read()
            .values()
            .fold(0usize, |acc, meta| {
                acc.saturating_add(meta.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            metadata_bytes,
            "open",
            "load_table_metadata",
            "Open the database with a larger MEMORY_LIMIT or reduce stored schema metadata.",
        )?;

        // Phase 2: row data. When table metadata is available it drives a
        // schema-aware estimate; otherwise fall back to the row's own guess.
        let row_bytes =
            self.relational_store
                .tables
                .read()
                .iter()
                .fold(0usize, |acc, (table, rows)| {
                    let meta = self.table_meta(table);
                    acc.saturating_add(rows.iter().fold(0usize, |inner, row| {
                        inner.saturating_add(meta.as_ref().map_or_else(
                            || row.estimated_bytes(),
                            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
                        ))
                    }))
                });
        self.accountant.try_allocate_for(
            row_bytes,
            "open",
            "load_rows",
            "Open the database with a larger MEMORY_LIMIT or prune retained rows first.",
        )?;

        // Phase 3: live graph edges (tombstoned edges are not charged).
        let edge_bytes = self
            .graph_store
            .forward_adj
            .read()
            .values()
            .flatten()
            .filter(|edge| edge.deleted_tx.is_none())
            .fold(0usize, |acc, edge| {
                acc.saturating_add(edge.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            edge_bytes,
            "open",
            "load_edges",
            "Open the database with a larger MEMORY_LIMIT or reduce graph edge volume.",
        )?;

        // Phase 4: vector index storage, as reported per index.
        let vector_bytes = self
            .vector_store
            .index_infos()
            .into_iter()
            .fold(0usize, |acc, info| acc.saturating_add(info.bytes));
        self.accountant.try_allocate_for(
            vector_bytes,
            "open",
            "load_vectors",
            "Open the database with a larger MEMORY_LIMIT or reduce stored vector data.",
        )?;

        Ok(())
    }
3265
3266    fn release_insert_allocations(&self, ws: &contextdb_tx::WriteSet) {
3267        for (table, row) in &ws.relational_inserts {
3268            let bytes = self
3269                .table_meta(table)
3270                .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
3271                .unwrap_or_else(|| row.estimated_bytes());
3272            self.accountant.release(bytes);
3273        }
3274
3275        for edge in &ws.adj_inserts {
3276            self.accountant.release(edge.estimated_bytes());
3277        }
3278
3279        for entry in &ws.vector_inserts {
3280            self.accountant
3281                .release(self.vector_insert_accounted_bytes(&entry.index, entry.vector.len()));
3282        }
3283    }
3284
3285    fn release_delete_allocations(&self, ws: &contextdb_tx::WriteSet) {
3286        for (table, row_id, _) in &ws.relational_deletes {
3287            if let Some(row) = self.find_row_by_id(table, *row_id) {
3288                let bytes = self
3289                    .table_meta(table)
3290                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
3291                    .unwrap_or_else(|| row.estimated_bytes());
3292                self.accountant.release(bytes);
3293            }
3294        }
3295
3296        for (source, edge_type, target, _) in &ws.adj_deletes {
3297            if let Some(edge) = self.find_edge(source, target, edge_type) {
3298                self.accountant.release(edge.estimated_bytes());
3299            }
3300        }
3301
3302        for (index, row_id, _) in &ws.vector_deletes {
3303            if let Some(vector) = self.find_vector_by_index_and_row(index, *row_id) {
3304                self.accountant
3305                    .release(self.vector_insert_accounted_bytes(index, vector.vector.len()));
3306            }
3307        }
3308
3309        if !ws.vector_deletes.is_empty() {
3310            self.vector_store.clear_hnsw(self.accountant());
3311        }
3312    }
3313
3314    fn find_row_by_id(&self, table: &str, row_id: RowId) -> Option<VersionedRow> {
3315        self.relational_store
3316            .tables
3317            .read()
3318            .get(table)
3319            .and_then(|rows| rows.iter().find(|row| row.row_id == row_id))
3320            .cloned()
3321    }
3322
3323    fn find_vector_by_index_and_row(
3324        &self,
3325        index: &VectorIndexRef,
3326        row_id: RowId,
3327    ) -> Option<VectorEntry> {
3328        self.vector_store
3329            .try_state(index)
3330            .and_then(|state| state.find_by_row_id(index, row_id))
3331    }
3332
3333    fn find_edge(&self, source: &NodeId, target: &NodeId, edge_type: &str) -> Option<AdjEntry> {
3334        self.graph_store
3335            .forward_adj
3336            .read()
3337            .get(source)
3338            .and_then(|entries| {
3339                entries
3340                    .iter()
3341                    .find(|entry| entry.target == *target && entry.edge_type == edge_type)
3342                    .cloned()
3343            })
3344    }
3345
3346    pub(crate) fn write_set_checkpoint(
3347        &self,
3348        tx: TxId,
3349    ) -> Result<(usize, usize, usize, usize, usize, usize)> {
3350        self.tx_mgr.with_write_set(tx, |ws| {
3351            (
3352                ws.relational_inserts.len(),
3353                ws.relational_deletes.len(),
3354                ws.adj_inserts.len(),
3355                ws.vector_inserts.len(),
3356                ws.vector_deletes.len(),
3357                ws.vector_moves.len(),
3358            )
3359        })
3360    }
3361
3362    pub(crate) fn restore_write_set_checkpoint(
3363        &self,
3364        tx: TxId,
3365        checkpoint: (usize, usize, usize, usize, usize, usize),
3366    ) -> Result<()> {
3367        self.tx_mgr.with_write_set(tx, |ws| {
3368            ws.relational_inserts.truncate(checkpoint.0);
3369            ws.relational_deletes.truncate(checkpoint.1);
3370            ws.adj_inserts.truncate(checkpoint.2);
3371            ws.vector_inserts.truncate(checkpoint.3);
3372            ws.vector_deletes.truncate(checkpoint.4);
3373            ws.vector_moves.truncate(checkpoint.5);
3374        })
3375    }
3376
3377    /// Get a clone of the current conflict policies.
3378    pub fn conflict_policies(&self) -> ConflictPolicies {
3379        self.conflict_policies.read().clone()
3380    }
3381
3382    /// Set the default conflict policy.
3383    pub fn set_default_conflict_policy(&self, policy: ConflictPolicy) {
3384        self.conflict_policies.write().default = policy;
3385    }
3386
3387    /// Set a per-table conflict policy.
3388    pub fn set_table_conflict_policy(&self, table: &str, policy: ConflictPolicy) {
3389        self.conflict_policies
3390            .write()
3391            .per_table
3392            .insert(table.to_string(), policy);
3393    }
3394
3395    /// Remove a per-table conflict policy override.
3396    pub fn drop_table_conflict_policy(&self, table: &str) {
3397        self.conflict_policies.write().per_table.remove(table);
3398    }
3399
3400    pub fn plugin(&self) -> &dyn DatabasePlugin {
3401        self.plugin.as_ref()
3402    }
3403
3404    pub fn plugin_health(&self) -> PluginHealth {
3405        self.plugin.health()
3406    }
3407
3408    pub fn plugin_describe(&self) -> serde_json::Value {
3409        self.plugin.describe()
3410    }
3411
3412    pub(crate) fn graph(&self) -> &MemGraphExecutor<DynStore> {
3413        &self.graph
3414    }
3415
3416    pub(crate) fn relational_store(&self) -> &Arc<RelationalStore> {
3417        &self.relational_store
3418    }
3419
3420    pub(crate) fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
3421    where
3422        F: FnOnce(Lsn) -> R,
3423    {
3424        self.tx_mgr.allocate_ddl_lsn(f)
3425    }
3426
3427    pub(crate) fn with_commit_lock<F, R>(&self, f: F) -> R
3428    where
3429        F: FnOnce() -> R,
3430    {
3431        self.tx_mgr.with_commit_lock(f)
3432    }
3433
3434    pub(crate) fn log_create_table_ddl(
3435        &self,
3436        name: &str,
3437        meta: &TableMeta,
3438        lsn: Lsn,
3439    ) -> Result<()> {
3440        let change = ddl_change_from_meta(name, meta);
3441        self.ddl_log.write().push((lsn, change.clone()));
3442        if let Some(persistence) = &self.persistence {
3443            persistence.append_ddl_log(lsn, &change)?;
3444        }
3445        Ok(())
3446    }
3447
3448    pub(crate) fn log_drop_table_ddl(&self, name: &str, lsn: Lsn) -> Result<()> {
3449        let change = DdlChange::DropTable {
3450            name: name.to_string(),
3451        };
3452        self.ddl_log.write().push((lsn, change.clone()));
3453        if let Some(persistence) = &self.persistence {
3454            persistence.append_ddl_log(lsn, &change)?;
3455        }
3456        Ok(())
3457    }
3458
3459    pub(crate) fn log_alter_table_ddl(&self, name: &str, meta: &TableMeta, lsn: Lsn) -> Result<()> {
3460        let change = DdlChange::AlterTable {
3461            name: name.to_string(),
3462            columns: meta
3463                .columns
3464                .iter()
3465                .map(|c| {
3466                    (
3467                        c.name.clone(),
3468                        sql_type_for_meta_column(c, &meta.propagation_rules),
3469                    )
3470                })
3471                .collect(),
3472            constraints: create_table_constraints_from_meta(meta),
3473        };
3474        self.ddl_log.write().push((lsn, change.clone()));
3475        if let Some(persistence) = &self.persistence {
3476            persistence.append_ddl_log(lsn, &change)?;
3477        }
3478        Ok(())
3479    }
3480
3481    pub(crate) fn log_create_index_ddl(
3482        &self,
3483        table: &str,
3484        name: &str,
3485        columns: &[(String, contextdb_core::SortDirection)],
3486        lsn: Lsn,
3487    ) -> Result<()> {
3488        let change = DdlChange::CreateIndex {
3489            table: table.to_string(),
3490            name: name.to_string(),
3491            columns: columns.to_vec(),
3492        };
3493        self.ddl_log.write().push((lsn, change.clone()));
3494        if let Some(persistence) = &self.persistence {
3495            persistence.append_ddl_log(lsn, &change)?;
3496        }
3497        Ok(())
3498    }
3499
3500    pub(crate) fn log_drop_index_ddl(&self, table: &str, name: &str, lsn: Lsn) -> Result<()> {
3501        let change = DdlChange::DropIndex {
3502            table: table.to_string(),
3503            name: name.to_string(),
3504        };
3505        self.ddl_log.write().push((lsn, change.clone()));
3506        if let Some(persistence) = &self.persistence {
3507            persistence.append_ddl_log(lsn, &change)?;
3508        }
3509        Ok(())
3510    }
3511
3512    pub(crate) fn persist_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
3513        if let Some(persistence) = &self.persistence {
3514            persistence.flush_table_meta(name, meta)?;
3515        }
3516        Ok(())
3517    }
3518
3519    pub(crate) fn persist_memory_limit(&self, limit: Option<usize>) -> Result<()> {
3520        if let Some(persistence) = &self.persistence {
3521            match limit {
3522                Some(limit) => persistence.flush_config_value("memory_limit", &limit)?,
3523                None => persistence.remove_config_value("memory_limit")?,
3524            }
3525        }
3526        Ok(())
3527    }
3528
3529    pub fn set_disk_limit(&self, limit: Option<u64>) -> Result<()> {
3530        if self.persistence.is_none() {
3531            self.disk_limit.store(0, Ordering::SeqCst);
3532            return Ok(());
3533        }
3534
3535        let ceiling = self.disk_limit_startup_ceiling();
3536        if let Some(ceiling) = ceiling {
3537            match limit {
3538                Some(bytes) if bytes > ceiling => {
3539                    return Err(Error::Other(format!(
3540                        "disk limit {bytes} exceeds startup ceiling {ceiling}"
3541                    )));
3542                }
3543                None => {
3544                    return Err(Error::Other(
3545                        "cannot remove disk limit when a startup ceiling is set".to_string(),
3546                    ));
3547                }
3548                _ => {}
3549            }
3550        }
3551
3552        self.disk_limit.store(limit.unwrap_or(0), Ordering::SeqCst);
3553        Ok(())
3554    }
3555
3556    pub fn disk_limit(&self) -> Option<u64> {
3557        match self.disk_limit.load(Ordering::SeqCst) {
3558            0 => None,
3559            bytes => Some(bytes),
3560        }
3561    }
3562
3563    pub fn disk_limit_startup_ceiling(&self) -> Option<u64> {
3564        match self.disk_limit_startup_ceiling.load(Ordering::SeqCst) {
3565            0 => None,
3566            bytes => Some(bytes),
3567        }
3568    }
3569
3570    pub fn disk_file_size(&self) -> Option<u64> {
3571        self.persistence
3572            .as_ref()
3573            .map(|persistence| std::fs::metadata(persistence.path()).map(|meta| meta.len()))
3574            .transpose()
3575            .ok()
3576            .flatten()
3577    }
3578
3579    pub(crate) fn persist_disk_limit(&self, limit: Option<u64>) -> Result<()> {
3580        if let Some(persistence) = &self.persistence {
3581            match limit {
3582                Some(limit) => persistence.flush_config_value("disk_limit", &limit)?,
3583                None => persistence.remove_config_value("disk_limit")?,
3584            }
3585        }
3586        Ok(())
3587    }
3588
3589    pub fn check_disk_budget(&self, operation: &str) -> Result<()> {
3590        let Some(limit) = self.disk_limit() else {
3591            return Ok(());
3592        };
3593        let Some(current_bytes) = self.disk_file_size() else {
3594            return Ok(());
3595        };
3596        if current_bytes.saturating_add(MIN_DISK_WRITE_HEADROOM_BYTES) <= limit {
3597            return Ok(());
3598        }
3599        Err(Error::DiskBudgetExceeded {
3600            operation: operation.to_string(),
3601            current_bytes,
3602            budget_limit_bytes: limit,
3603            hint: "Reduce retained file-backed data or raise DISK_LIMIT before writing more data."
3604                .to_string(),
3605        })
3606    }
3607
3608    pub fn persisted_sync_watermarks(&self, tenant_id: &str) -> Result<(Lsn, Lsn)> {
3609        let Some(persistence) = &self.persistence else {
3610            return Ok((Lsn(0), Lsn(0)));
3611        };
3612        let push = persistence
3613            .load_config_value::<u64>(&format!("sync_push_watermark:{tenant_id}"))?
3614            .map(Lsn)
3615            .unwrap_or(Lsn(0));
3616        let pull = persistence
3617            .load_config_value::<u64>(&format!("sync_pull_watermark:{tenant_id}"))?
3618            .map(Lsn)
3619            .unwrap_or(Lsn(0));
3620        Ok((push, pull))
3621    }
3622
3623    pub fn persist_sync_push_watermark(&self, tenant_id: &str, watermark: Lsn) -> Result<()> {
3624        if let Some(persistence) = &self.persistence {
3625            persistence
3626                .flush_config_value(&format!("sync_push_watermark:{tenant_id}"), &watermark.0)?;
3627        }
3628        Ok(())
3629    }
3630
3631    pub fn persist_sync_pull_watermark(&self, tenant_id: &str, watermark: Lsn) -> Result<()> {
3632        if let Some(persistence) = &self.persistence {
3633            persistence
3634                .flush_config_value(&format!("sync_pull_watermark:{tenant_id}"), &watermark.0)?;
3635        }
3636        Ok(())
3637    }
3638
3639    pub(crate) fn persist_table_rows(&self, name: &str) -> Result<()> {
3640        if let Some(persistence) = &self.persistence {
3641            let tables = self.relational_store.tables.read();
3642            if let Some(rows) = tables.get(name) {
3643                persistence.rewrite_table_rows(name, rows)?;
3644            }
3645        }
3646        Ok(())
3647    }
3648
3649    pub(crate) fn persist_vectors(&self) -> Result<()> {
3650        if let Some(persistence) = &self.persistence {
3651            let vectors = self.vector_store.all_entries();
3652            persistence.rewrite_vectors(&vectors)?;
3653        }
3654        Ok(())
3655    }
3656
3657    pub(crate) fn remove_persisted_table(&self, name: &str) -> Result<()> {
3658        if let Some(persistence) = &self.persistence {
3659            persistence.remove_table_meta(name)?;
3660            persistence.remove_table_data(name)?;
3661        }
3662        Ok(())
3663    }
3664
3665    pub fn change_log_since(&self, since_lsn: Lsn) -> Vec<ChangeLogEntry> {
3666        let log = self.change_log.read();
3667        let start = log.partition_point(|e| e.lsn() <= since_lsn);
3668        log[start..].to_vec()
3669    }
3670
3671    pub fn ddl_log_since(&self, since_lsn: Lsn) -> Vec<DdlChange> {
3672        let ddl = self.ddl_log.read();
3673        let start = ddl.partition_point(|(lsn, _)| *lsn <= since_lsn);
3674        ddl[start..].iter().map(|(_, c)| c.clone()).collect()
3675    }
3676
    /// Builds a complete snapshot of all live data as a ChangeSet.
    /// Used as fallback when change_log/ddl_log cannot serve a watermark.
    #[allow(dead_code)]
    fn full_state_snapshot(&self) -> ChangeSet {
        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let mut ddl = Vec::new();

        // Take both relational guards up front so metadata and rows are read
        // from one consistent view of the store.
        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        // DDL. A full snapshot must be directly applyable to an empty peer:
        // create joined tables and their user indexes before any table whose
        // rank policy validates against them.
        ddl.extend(full_snapshot_ddl(&meta_guard));

        // Rows (live only) — collect row_ids that have live rows for orphan vector filtering
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            // Tables with no metadata cannot be keyed; skip them entirely.
            let meta = match meta_guard.get(table_name) {
                Some(m) => m,
                None => continue,
            };
            // Natural-key resolution order: explicit natural_key_column, then
            // a Uuid column literally named "id", then any Uuid primary key.
            let key_col = meta
                .natural_key_column
                .clone()
                .or_else(|| {
                    if meta
                        .columns
                        .iter()
                        .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                    {
                        Some("id".to_string())
                    } else {
                        None
                    }
                })
                .or_else(|| {
                    meta.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
                .unwrap_or_default();
            if key_col.is_empty() {
                // No usable natural key — this table cannot be synced by row.
                continue;
            }
            // "Live" means deleted_tx is unset.
            for row in table_rows.iter().filter(|r| r.deleted_tx.is_none()) {
                // Rows missing their key value are silently skipped.
                let key_val = match row.values.get(&key_col) {
                    Some(v) => v.clone(),
                    None => continue,
                };
                live_row_ids.insert(row.row_id);
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }

        // Release the relational locks before touching the graph store.
        drop(tables_guard);
        drop(meta_guard);

        // Edges (live only)
        let fwd = self.graph_store.forward_adj.read();
        for (_source, entries) in fwd.iter() {
            for entry in entries.iter().filter(|e| e.deleted_tx.is_none()) {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Vectors (live only, skip orphans)
        for entry in self
            .vector_store
            .all_entries()
            .into_iter()
            .filter(|v| v.deleted_tx.is_none())
        {
            // Vectors whose row was skipped above (deleted, keyless, or
            // missing its key value) would be unapplyable on a peer.
            if !live_row_ids.contains(&entry.row_id) {
                continue; // skip orphan vectors
            }
            vectors.push(VectorChange {
                index: entry.index.clone(),
                row_id: entry.row_id,
                vector: entry.vector,
                lsn: entry.lsn,
            });
        }

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
3787
    /// Derives a ChangeSet from the stores themselves by comparing each
    /// row/edge/vector LSN against `since_lsn`; used when the ephemeral logs
    /// cannot serve the watermark (e.g. after a restart). A zero watermark
    /// falls back to `full_state_snapshot`. The `ddl` component is always
    /// empty here — store entries carry no DDL history to replay.
    fn persisted_state_since(&self, since_lsn: Lsn) -> ChangeSet {
        if since_lsn == Lsn(0) {
            return self.full_state_snapshot();
        }

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let ddl = Vec::new();

        // Hold both relational guards so rows and metadata stay consistent.
        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(meta) => meta,
                None => continue,
            };
            // Natural-key resolution order: explicit natural_key_column, then
            // a Uuid column literally named "id", then any Uuid primary key.
            let key_col = meta
                .natural_key_column
                .clone()
                .or_else(|| {
                    if meta
                        .columns
                        .iter()
                        .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                    {
                        Some("id".to_string())
                    } else {
                        None
                    }
                })
                .or_else(|| {
                    meta.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
                .unwrap_or_default();
            if key_col.is_empty() {
                // No usable natural key — table cannot be synced by row.
                continue;
            }
            for row in table_rows.iter().filter(|row| row.deleted_tx.is_none()) {
                // Deliberately record EVERY live row id — even rows at or
                // below the watermark — so newer vectors attached to old
                // rows are not mistaken for orphans below.
                live_row_ids.insert(row.row_id);
                if row.lsn <= since_lsn {
                    continue;
                }
                let key_val = match row.values.get(&key_col) {
                    Some(value) => value.clone(),
                    None => continue,
                };
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }
        drop(tables_guard);
        drop(meta_guard);

        // Edges: live entries newer than the watermark.
        let fwd = self.graph_store.forward_adj.read();
        for entries in fwd.values() {
            for entry in entries
                .iter()
                .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
            {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Vectors: live entries newer than the watermark whose owning row is
        // still live (orphan filter uses the full live_row_ids set above).
        for entry in self
            .vector_store
            .all_entries()
            .into_iter()
            .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                index: entry.index.clone(),
                row_id: entry.row_id,
                vector: entry.vector,
                lsn: entry.lsn,
            });
        }

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
3896
    /// Estimates the memory a ChangeSet would need and rejects the apply up
    /// front when it exceeds the accountant's available budget, instead of
    /// failing half-way through an apply.
    fn preflight_sync_apply_memory(
        &self,
        changes: &ChangeSet,
        policies: &ConflictPolicies,
    ) -> Result<()> {
        let usage = self.accountant.usage();
        // No configured limit means nothing to enforce.
        let Some(limit) = usage.limit else {
            return Ok(());
        };
        let available = usage.available.unwrap_or(limit);
        let mut required = 0usize;

        for row in &changes.rows {
            // Deletes and empty payloads consume no new row memory.
            if row.deleted || row.values.is_empty() {
                continue;
            }

            // Per-table policy override, falling back to the default.
            let policy = policies
                .per_table
                .get(&row.table)
                .copied()
                .unwrap_or(policies.default);
            let existing = self.point_lookup(
                &row.table,
                &row.natural_key.column,
                &row.natural_key.value,
                self.snapshot(),
            )?;

            // Rows the conflict policy will discard (existing row wins) are
            // never materialized, so they don't count against the budget.
            if existing.is_some()
                && matches!(
                    policy,
                    ConflictPolicy::InsertIfNotExists | ConflictPolicy::ServerWins
                )
            {
                continue;
            }

            // Schema-aware estimate when metadata exists; generic otherwise.
            required = required.saturating_add(
                self.table_meta(&row.table)
                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                    .unwrap_or_else(|| estimate_row_value_bytes(&row.values)),
            );
        }

        for edge in &changes.edges {
            // Heuristic per-edge overhead: 96 bytes base + 16 per edge-type
            // byte + the property payload estimate.
            required = required.saturating_add(
                96 + edge.edge_type.len().saturating_mul(16)
                    + estimate_row_value_bytes(&edge.properties),
            );
        }

        for vector in &changes.vectors {
            // Empty vectors denote deletions; they allocate nothing.
            if vector.vector.is_empty() {
                continue;
            }
            // Heuristic: 24 bytes entry overhead + 4 bytes per f32 element.
            required = required.saturating_add(
                24 + vector
                    .vector
                    .len()
                    .saturating_mul(std::mem::size_of::<f32>()),
            );
        }

        if required > available {
            return Err(Error::MemoryBudgetExceeded {
                subsystem: "sync".to_string(),
                operation: "apply_changes".to_string(),
                requested_bytes: required,
                available_bytes: available,
                budget_limit_bytes: limit,
                hint:
                    "Reduce sync batch size, split the push, or raise MEMORY_LIMIT on the server."
                        .to_string(),
            });
        }

        Ok(())
    }
3976
    /// Extracts changes from this database since the given LSN.
    ///
    /// Serving strategy, in order: (1) a watermark beyond the current LSN
    /// yields an empty set; (2) if both ephemeral logs are empty but the
    /// stores hold data (post-restart), derive the delta from persisted LSNs;
    /// (3) if the logs exist but start after `since_lsn + 1`, they cannot
    /// cover the watermark — again derive from persisted state; (4) otherwise
    /// replay the logs, materializing each entry from the live stores.
    pub fn changes_since(&self, since_lsn: Lsn) -> ChangeSet {
        // Future watermark guard
        if since_lsn > self.current_lsn() {
            return ChangeSet::default();
        }

        // Check if the ephemeral logs can serve the requested watermark.
        // After restart, both logs are empty but stores may have data — fall back to snapshot.
        let log = self.change_log.read();
        let change_first_lsn = log.first().map(|e| e.lsn());
        let change_log_empty = log.is_empty();
        drop(log);

        let ddl = self.ddl_log.read();
        let ddl_first_lsn = ddl.first().map(|(lsn, _)| *lsn);
        let ddl_log_empty = ddl.is_empty();
        drop(ddl);

        let has_table_data = !self
            .relational_store
            .tables
            .read()
            .values()
            .all(|rows| rows.is_empty());
        let has_table_meta = !self.relational_store.table_meta.read().is_empty();

        // If both logs are empty but stores have data → post-restart, derive deltas from
        // persisted row/edge/vector LSNs instead of replaying a full snapshot.
        if change_log_empty && ddl_log_empty && (has_table_data || has_table_meta) {
            return self.persisted_state_since(since_lsn);
        }

        // If logs have entries, check the minimum first-LSN across both covers since_lsn
        let min_first_lsn = match (change_first_lsn, ddl_first_lsn) {
            (Some(c), Some(d)) => Some(c.min(d)),
            (Some(c), None) => Some(c),
            (None, Some(d)) => Some(d),
            (None, None) => None, // both empty, stores empty — nothing to serve
        };

        if min_first_lsn.is_some_and(|min_lsn| min_lsn.0 > since_lsn.0 + 1) {
            // Log doesn't cover since_lsn — derive the delta from persisted state.
            return self.persisted_state_since(since_lsn);
        }

        // Read both logs under the commit lock so they reflect a single
        // consistent point in the commit order.
        let (ddl, change_entries) = self.with_commit_lock(|| {
            let ddl = self.ddl_log_since(since_lsn);
            let changes = self.change_log_since(since_lsn);
            (ddl, changes)
        });

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();

        // Materialize each log entry: inserts resolve current values from the
        // stores; deletes carry synthetic "__deleted" marker payloads.
        for entry in change_entries {
            match entry {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    // Rows that can no longer be resolved are skipped.
                    if let Some((natural_key, values)) = self.row_change_values(&table, row_id) {
                        rows.push(RowChange {
                            table,
                            natural_key,
                            values,
                            deleted: false,
                            lsn,
                        });
                    }
                }
                ChangeLogEntry::RowDelete {
                    table,
                    natural_key,
                    lsn,
                    ..
                } => {
                    let mut values = HashMap::new();
                    values.insert("__deleted".to_string(), Value::Bool(true));
                    rows.push(RowChange {
                        table,
                        natural_key,
                        values,
                        deleted: true,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeInsert {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    // Properties resolved by LSN; default to empty when gone.
                    let properties = self
                        .edge_properties(source, target, &edge_type, lsn)
                        .unwrap_or_default();
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeDelete {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    let mut properties = HashMap::new();
                    properties.insert("__deleted".to_string(), Value::Bool(true));
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::VectorInsert { index, row_id, lsn } => {
                    if let Some(vector) = self.vector_for_row_lsn(&index, row_id, lsn) {
                        vectors.push(VectorChange {
                            index,
                            row_id,
                            vector,
                            lsn,
                        });
                    }
                }
                // An empty vector payload encodes a vector delete.
                ChangeLogEntry::VectorDelete { index, row_id, lsn } => vectors.push(VectorChange {
                    index,
                    row_id,
                    vector: Vec::new(),
                    lsn,
                }),
            }
        }

        // Deduplicate upserts: when a RowDelete is followed by a RowInsert for the same
        // (table, natural_key), the delete is part of an upsert — remove it.
        // Only remove a delete if there is a non-delete entry with a HIGHER LSN
        // (i.e., the insert came after the delete, indicating an upsert).
        // If the insert has a lower LSN, the delete is genuine and must be kept.
        // Keys use the Debug formatting of the natural-key value so Values can
        // be used in a hashable map key.
        let insert_max_lsn: HashMap<(String, String, String), Lsn> = {
            let mut map: HashMap<(String, String, String), Lsn> = HashMap::new();
            for r in rows.iter().filter(|r| !r.deleted) {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                let entry = map.entry(key).or_insert(Lsn(0));
                if r.lsn > *entry {
                    *entry = r.lsn;
                }
            }
            map
        };
        rows.retain(|r| {
            if r.deleted {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                // Keep the delete unless there is a subsequent insert (higher or equal LSN).
                // Equal LSN means the delete+insert are part of the same upsert transaction.
                match insert_max_lsn.get(&key) {
                    Some(&insert_lsn) => insert_lsn < r.lsn,
                    None => true,
                }
            } else {
                true
            }
        });

        // Drop vector deletes (empty payloads) that share (index, lsn) with a
        // reinsert — same-transaction upsert noise.
        // NOTE(review): row_id is not part of this key, so a delete for one
        // row is dropped if ANY reinsert on the same index shares its LSN —
        // confirm this per-transaction granularity is intended.
        let vector_reinserts: HashSet<(VectorIndexRef, Lsn)> = vectors
            .iter()
            .filter(|v| !v.vector.is_empty())
            .map(|v| (v.index.clone(), v.lsn))
            .collect();
        vectors.retain(|v| {
            !v.vector.is_empty() || !vector_reinserts.contains(&(v.index.clone(), v.lsn))
        });

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
4168
    /// Returns the current LSN of this database.
    ///
    /// Delegates to the transaction manager's LSN counter
    /// (`TxManager::current_lsn`); this type holds no LSN state of its own.
    pub fn current_lsn(&self) -> Lsn {
        self.tx_mgr.current_lsn()
    }
4173
    /// Returns the highest-committed TxId on this database.
    ///
    /// Delegates to `TxManager::current_tx_max`; see that method for the
    /// exact watermark semantics.
    pub fn committed_watermark(&self) -> TxId {
        self.tx_mgr.current_tx_max()
    }
4178
    /// Returns the next TxId the allocator will issue on this database.
    ///
    /// Peeks the allocator via `TxManager::peek_next_tx` — presumably a
    /// read-only probe that does not consume the id; confirm against the
    /// tx-manager contract before relying on that.
    pub fn next_tx(&self) -> TxId {
        self.tx_mgr.peek_next_tx()
    }
4183
    /// Subscribe to commit events. Returns a receiver that yields a `CommitEvent`
    /// after each commit.
    ///
    /// Uses `DEFAULT_SUBSCRIPTION_CAPACITY` for the channel bound; call
    /// `subscribe_with_capacity` directly to choose a different capacity.
    /// NOTE(review): nothing visible here prunes the sender when the
    /// returned receiver is dropped — confirm the commit/send path removes
    /// dead subscribers.
    pub fn subscribe(&self) -> Receiver<CommitEvent> {
        self.subscribe_with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY)
    }
4189
4190    /// Subscribe with a custom channel capacity.
4191    pub fn subscribe_with_capacity(&self, capacity: usize) -> Receiver<CommitEvent> {
4192        let (tx, rx) = mpsc::sync_channel(capacity.max(1));
4193        self.subscriptions.lock().subscribers.push(tx);
4194        rx
4195    }
4196
4197    /// Returns health metrics for the subscription system.
4198    pub fn subscription_health(&self) -> SubscriptionMetrics {
4199        let subscriptions = self.subscriptions.lock();
4200        SubscriptionMetrics {
4201            active_channels: subscriptions.subscribers.len(),
4202            events_sent: subscriptions.events_sent,
4203            events_dropped: subscriptions.events_dropped,
4204        }
4205    }
4206
4207    /// Applies a ChangeSet to this database with the given conflict policies.
4208    pub fn apply_changes(
4209        &self,
4210        mut changes: ChangeSet,
4211        policies: &ConflictPolicies,
4212    ) -> Result<ApplyResult> {
4213        // Per I14: the whole batch takes the index-maintenance lock once.
4214        // Per-row commits reuse the same guard via the per-row apply that
4215        // runs inside the tx manager's commit_mutex, so no second write
4216        // acquisition happens for the scope of this call.
4217        self.relational_store.bump_index_write_lock_count();
4218        self.plugin.on_sync_pull(&mut changes)?;
4219        self.check_disk_budget("sync_pull")?;
4220        self.preflight_sync_apply_memory(&changes, policies)?;
4221
4222        // Pre-scan for TxId overflow so the allocator is untouched on rejection.
4223        for row in &changes.rows {
4224            for v in row.values.values() {
4225                if let Value::TxId(incoming) = v
4226                    && incoming.0 == u64::MAX
4227                {
4228                    return Err(Error::TxIdOverflow {
4229                        table: row.table.clone(),
4230                        incoming: u64::MAX,
4231                    });
4232                }
4233            }
4234        }
4235
4236        let mut tx = self.begin();
4237        let batch_row_commits = changes.rows.len() > 128;
4238        let mut result = ApplyResult {
4239            applied_rows: 0,
4240            skipped_rows: 0,
4241            conflicts: Vec::new(),
4242            new_lsn: self.current_lsn(),
4243        };
4244        let vector_row_ids = changes.vectors.iter().map(|v| v.row_id).collect::<Vec<_>>();
4245        let mut vector_row_map: HashMap<RowId, RowId> = HashMap::new();
4246        let mut vector_row_idx = 0usize;
4247        let mut failed_row_ids: HashSet<RowId> = HashSet::new();
4248        let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
4249        let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();
4250
4251        for ddl in changes.ddl.clone() {
4252            match ddl {
4253                DdlChange::CreateTable {
4254                    name,
4255                    columns,
4256                    constraints,
4257                } => {
4258                    if self.table_meta(&name).is_some() {
4259                        if let Some(local_meta) = self.table_meta(&name) {
4260                            let local_cols: Vec<(String, String)> = local_meta
4261                                .columns
4262                                .iter()
4263                                .map(|c| {
4264                                    (
4265                                        c.name.clone(),
4266                                        normalize_schema_type(&sql_type_for_meta_column(
4267                                            c,
4268                                            &local_meta.propagation_rules,
4269                                        )),
4270                                    )
4271                                })
4272                                .collect();
4273                            let remote_cols: Vec<(String, String)> = columns
4274                                .iter()
4275                                .map(|(col_name, col_type)| {
4276                                    (col_name.clone(), normalize_schema_type(col_type))
4277                                })
4278                                .collect();
4279                            let mut local_sorted = local_cols.clone();
4280                            local_sorted.sort();
4281                            let mut remote_sorted = remote_cols.clone();
4282                            remote_sorted.sort();
4283                            if local_sorted != remote_sorted {
4284                                result.conflicts.push(Conflict {
4285                                    natural_key: NaturalKey {
4286                                        column: "table".to_string(),
4287                                        value: Value::Text(name.clone()),
4288                                    },
4289                                    resolution: ConflictPolicy::ServerWins,
4290                                    reason: Some(format!(
4291                                        "schema mismatch: local columns {:?} differ from remote {:?}",
4292                                        local_cols, remote_cols
4293                                    )),
4294                                });
4295                            }
4296                        }
4297                        continue;
4298                    }
4299                    let mut sql = format!(
4300                        "CREATE TABLE {} ({})",
4301                        name,
4302                        columns
4303                            .iter()
4304                            .map(|(col, ty)| format!("{col} {ty}"))
4305                            .collect::<Vec<_>>()
4306                            .join(", ")
4307                    );
4308                    if !constraints.is_empty() {
4309                        sql.push(' ');
4310                        sql.push_str(&constraints.join(" "));
4311                    }
4312                    self.execute_in_tx(tx, &sql, &HashMap::new())?;
4313                    self.clear_statement_cache();
4314                    table_meta_cache.remove(&name);
4315                    visible_rows_cache.remove(&name);
4316                }
4317                DdlChange::DropTable { name } => {
4318                    if self.table_meta(&name).is_some() {
4319                        if let Some(block) =
4320                            crate::executor::rank_policy_drop_table_blocker(self, &name)
4321                        {
4322                            return Err(block);
4323                        }
4324                        self.drop_table_aux_state(&name);
4325                        self.remove_rank_formulas_for_table(&name);
4326                        self.relational_store().drop_table(&name);
4327                        self.remove_persisted_table(&name)?;
4328                        self.clear_statement_cache();
4329                    }
4330                    table_meta_cache.remove(&name);
4331                    visible_rows_cache.remove(&name);
4332                }
4333                DdlChange::AlterTable {
4334                    name,
4335                    columns,
4336                    constraints,
4337                } => {
4338                    if self.table_meta(&name).is_none() {
4339                        continue;
4340                    }
4341                    let existing = self.table_meta(&name).unwrap_or_default();
4342                    let existing_cols: HashSet<String> =
4343                        existing.columns.iter().map(|c| c.name.clone()).collect();
4344                    for (col, ty) in columns {
4345                        if existing_cols.contains(&col) {
4346                            continue;
4347                        }
4348                        let sql = format!("ALTER TABLE {} ADD COLUMN {} {}", name, col, ty);
4349                        self.execute_in_tx(tx, &sql, &HashMap::new())?;
4350                    }
4351                    let _ = constraints;
4352                    self.clear_statement_cache();
4353                    table_meta_cache.remove(&name);
4354                    visible_rows_cache.remove(&name);
4355                }
4356                DdlChange::CreateIndex {
4357                    table,
4358                    name,
4359                    columns,
4360                } => {
4361                    // Apply at the receiver: write IndexDecl into
4362                    // TableMeta.indexes, register storage, rebuild over
4363                    // locally-resident rows. Emit a matching DDL log entry.
4364                    // Silently skipping on missing table would hide sync
4365                    // divergence; surface it as TableNotFound so the caller
4366                    // can see which index couldn't land.
4367                    if self.table_meta(&table).is_none() {
4368                        return Err(Error::TableNotFound(table.clone()));
4369                    }
4370                    let already = self
4371                        .table_meta(&table)
4372                        .map(|m| m.indexes.iter().any(|i| i.name == name))
4373                        .unwrap_or(false);
4374                    if !already {
4375                        {
4376                            let store = self.relational_store();
4377                            let mut metas = store.table_meta.write();
4378                            if let Some(m) = metas.get_mut(&table) {
4379                                m.indexes.push(contextdb_core::IndexDecl {
4380                                    name: name.clone(),
4381                                    columns: columns.clone(),
4382                                    kind: contextdb_core::IndexKind::UserDeclared,
4383                                });
4384                            }
4385                        }
4386                        self.relational_store().create_index_storage(
4387                            &table,
4388                            &name,
4389                            columns.clone(),
4390                        );
4391                        self.relational_store().rebuild_index(&table, &name);
4392                        if let Some(table_meta) = self.table_meta(&table) {
4393                            self.persist_table_meta(&table, &table_meta)?;
4394                        }
4395                        self.allocate_ddl_lsn(|lsn| {
4396                            self.log_create_index_ddl(&table, &name, &columns, lsn)
4397                        })?;
4398                        self.clear_statement_cache();
4399                    }
4400                    table_meta_cache.remove(&table);
4401                }
4402                DdlChange::DropIndex { table, name } => {
4403                    if self.table_meta(&table).is_some() {
4404                        let exists = self
4405                            .table_meta(&table)
4406                            .map(|m| m.indexes.iter().any(|i| i.name == name))
4407                            .unwrap_or(false);
4408                        if exists {
4409                            if let Some(block) =
4410                                crate::executor::rank_policy_drop_index_blocker(self, &table, &name)
4411                            {
4412                                return Err(block);
4413                            }
4414                            {
4415                                let store = self.relational_store();
4416                                let mut metas = store.table_meta.write();
4417                                if let Some(m) = metas.get_mut(&table) {
4418                                    m.indexes.retain(|i| i.name != name);
4419                                }
4420                            }
4421                            self.relational_store().drop_index_storage(&table, &name);
4422                            if let Some(table_meta) = self.table_meta(&table) {
4423                                self.persist_table_meta(&table, &table_meta)?;
4424                            }
4425                            self.allocate_ddl_lsn(|lsn| {
4426                                self.log_drop_index_ddl(&table, &name, lsn)
4427                            })?;
4428                            self.clear_statement_cache();
4429                        }
4430                    }
4431                    table_meta_cache.remove(&table);
4432                }
4433            }
4434        }
4435
4436        self.preflight_sync_apply_memory(&changes, policies)?;
4437
4438        for row in changes.rows {
4439            if row.values.is_empty() {
4440                result.skipped_rows += 1;
4441                if !batch_row_commits {
4442                    self.commit_with_source(tx, CommitSource::SyncPull)?;
4443                    tx = self.begin();
4444                }
4445                continue;
4446            }
4447
4448            let policy = policies
4449                .per_table
4450                .get(&row.table)
4451                .copied()
4452                .unwrap_or(policies.default);
4453
4454            let existing = cached_point_lookup(
4455                self,
4456                &mut visible_rows_cache,
4457                &row.table,
4458                &row.natural_key.column,
4459                &row.natural_key.value,
4460            )?;
4461            let is_delete = row.deleted;
4462            let row_has_vector = cached_table_meta(self, &mut table_meta_cache, &row.table)
4463                .is_some_and(|meta| {
4464                    meta.columns
4465                        .iter()
4466                        .any(|col| matches!(col.column_type, ColumnType::Vector(_)))
4467                });
4468
4469            if is_delete {
4470                if let Some(local) = existing {
4471                    if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4472                        consume_vector_row_group(
4473                            &vector_row_ids,
4474                            &mut vector_row_idx,
4475                            local.row_id,
4476                            &mut vector_row_map,
4477                        );
4478                    }
4479                    if let Err(err) = self.delete_row(tx, &row.table, local.row_id) {
4480                        result.conflicts.push(Conflict {
4481                            natural_key: row.natural_key.clone(),
4482                            resolution: policy,
4483                            reason: Some(format!("delete failed: {err}")),
4484                        });
4485                        result.skipped_rows += 1;
4486                    } else {
4487                        remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
4488                        result.applied_rows += 1;
4489                    }
4490                } else {
4491                    result.skipped_rows += 1;
4492                }
4493                if !batch_row_commits {
4494                    self.commit_with_source(tx, CommitSource::SyncPull)?;
4495                    tx = self.begin();
4496                }
4497                continue;
4498            }
4499
4500            let mut values = row.values.clone();
4501            values.remove("__deleted");
4502
4503            match (existing, policy) {
4504                (None, _) => {
4505                    if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
4506                        let mut constraint_error: Option<String> = None;
4507
4508                        for col_def in &meta.columns {
4509                            if !col_def.nullable
4510                                && !col_def.primary_key
4511                                && col_def.default.is_none()
4512                            {
4513                                match values.get(&col_def.name) {
4514                                    None | Some(Value::Null) => {
4515                                        constraint_error = Some(format!(
4516                                            "NOT NULL constraint violated: {}.{}",
4517                                            row.table, col_def.name
4518                                        ));
4519                                        break;
4520                                    }
4521                                    _ => {}
4522                                }
4523                            }
4524                        }
4525
4526                        let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
4527                        if constraint_error.is_none() && has_unique {
4528                            for col_def in &meta.columns {
4529                                if col_def.unique
4530                                    && !col_def.primary_key
4531                                    && let Some(new_val) = values.get(&col_def.name)
4532                                    && *new_val != Value::Null
4533                                    && cached_visible_rows(
4534                                        self,
4535                                        &mut visible_rows_cache,
4536                                        &row.table,
4537                                    )?
4538                                    .iter()
4539                                    .any(|r| r.values.get(&col_def.name) == Some(new_val))
4540                                {
4541                                    constraint_error = Some(format!(
4542                                        "UNIQUE constraint violated: {}.{}",
4543                                        row.table, col_def.name
4544                                    ));
4545                                    break;
4546                                }
4547                            }
4548                        }
4549
4550                        if let Some(err_msg) = constraint_error {
4551                            result.skipped_rows += 1;
4552                            if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4553                                consume_failed_vector_row_group(
4554                                    &vector_row_ids,
4555                                    &mut vector_row_idx,
4556                                    &mut failed_row_ids,
4557                                );
4558                            }
4559                            result.conflicts.push(Conflict {
4560                                natural_key: row.natural_key.clone(),
4561                                resolution: policy,
4562                                reason: Some(err_msg),
4563                            });
4564                            if !batch_row_commits {
4565                                self.commit_with_source(tx, CommitSource::SyncPull)?;
4566                                tx = self.begin();
4567                            }
4568                            continue;
4569                        }
4570                    }
4571
4572                    // Sync-apply overflow guard + allocator/watermark advance for Value::TxId cells.
4573                    let mut overflow: Option<Error> = None;
4574                    for v in values.values() {
4575                        if let Value::TxId(incoming) = v
4576                            && let Err(err) = self.tx_mgr.advance_for_sync(&row.table, *incoming)
4577                        {
4578                            overflow = Some(err);
4579                            break;
4580                        }
4581                    }
4582                    if let Some(err) = overflow {
4583                        return Err(err);
4584                    }
4585
4586                    match self.insert_row_for_sync(tx, &row.table, values.clone()) {
4587                        Ok(new_row_id) => {
4588                            record_cached_insert(
4589                                &mut visible_rows_cache,
4590                                &row.table,
4591                                VersionedRow {
4592                                    row_id: new_row_id,
4593                                    values: values.clone(),
4594                                    created_tx: tx,
4595                                    deleted_tx: None,
4596                                    lsn: row.lsn,
4597                                    created_at: None,
4598                                },
4599                            );
4600                            result.applied_rows += 1;
4601                            if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4602                                consume_vector_row_group(
4603                                    &vector_row_ids,
4604                                    &mut vector_row_idx,
4605                                    new_row_id,
4606                                    &mut vector_row_map,
4607                                );
4608                            }
4609                        }
4610                        Err(err) => {
4611                            if is_fatal_sync_apply_error(&err) {
4612                                return Err(err);
4613                            }
4614                            result.skipped_rows += 1;
4615                            if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4616                                consume_failed_vector_row_group(
4617                                    &vector_row_ids,
4618                                    &mut vector_row_idx,
4619                                    &mut failed_row_ids,
4620                                );
4621                            }
4622                            result.conflicts.push(Conflict {
4623                                natural_key: row.natural_key.clone(),
4624                                resolution: policy,
4625                                reason: Some(format!("{err}")),
4626                            });
4627                        }
4628                    }
4629                }
4630                (Some(local), ConflictPolicy::InsertIfNotExists) => {
4631                    if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4632                        consume_vector_row_group(
4633                            &vector_row_ids,
4634                            &mut vector_row_idx,
4635                            local.row_id,
4636                            &mut vector_row_map,
4637                        );
4638                    }
4639                    result.skipped_rows += 1;
4640                }
4641                (Some(_), ConflictPolicy::ServerWins) => {
4642                    result.skipped_rows += 1;
4643                    if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4644                        consume_failed_vector_row_group(
4645                            &vector_row_ids,
4646                            &mut vector_row_idx,
4647                            &mut failed_row_ids,
4648                        );
4649                    }
4650                    result.conflicts.push(Conflict {
4651                        natural_key: row.natural_key.clone(),
4652                        resolution: ConflictPolicy::ServerWins,
4653                        reason: Some("server_wins".to_string()),
4654                    });
4655                }
4656                (Some(local), ConflictPolicy::LatestWins) => {
4657                    // Deterministic tie-break when LSNs match: if both rows carry a
4658                    // `Value::TxId` cell under the same column name, the row with the
4659                    // strictly greater raw u64 wins. Otherwise fall back to the strict
4660                    // "incoming must exceed local" rule.
4661                    let incoming_wins = if row.lsn == local.lsn {
4662                        let mut winner = false;
4663                        for (col, incoming_val) in values.iter() {
4664                            if let (Value::TxId(incoming_tx), Some(Value::TxId(local_tx))) =
4665                                (incoming_val, local.values.get(col))
4666                            {
4667                                if incoming_tx.0 > local_tx.0 {
4668                                    winner = true;
4669                                    break;
4670                                } else if incoming_tx.0 < local_tx.0 {
4671                                    winner = false;
4672                                    break;
4673                                }
4674                            }
4675                        }
4676                        winner
4677                    } else {
4678                        row.lsn > local.lsn
4679                    };
4680
4681                    if !incoming_wins {
4682                        result.skipped_rows += 1;
4683                        if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4684                            consume_failed_vector_row_group(
4685                                &vector_row_ids,
4686                                &mut vector_row_idx,
4687                                &mut failed_row_ids,
4688                            );
4689                        }
4690                        result.conflicts.push(Conflict {
4691                            natural_key: row.natural_key.clone(),
4692                            resolution: ConflictPolicy::LatestWins,
4693                            reason: Some("local_lsn_newer_or_equal".to_string()),
4694                        });
4695                    } else {
4696                        // State machine conflict detection
4697                        if let Some(meta) = self.table_meta(&row.table)
4698                            && let Some(sm) = &meta.state_machine
4699                        {
4700                            let sm_col = sm.column.clone();
4701                            let transitions = sm.transitions.clone();
4702                            let incoming_state = values.get(&sm_col).and_then(|v| match v {
4703                                Value::Text(s) => Some(s.clone()),
4704                                _ => None,
4705                            });
4706                            let local_state = local.values.get(&sm_col).and_then(|v| match v {
4707                                Value::Text(s) => Some(s.clone()),
4708                                _ => None,
4709                            });
4710
4711                            if let (Some(incoming), Some(current)) = (incoming_state, local_state) {
4712                                // Check if the transition from current to incoming is valid
4713                                let valid = transitions
4714                                    .get(&current)
4715                                    .is_some_and(|targets| targets.contains(&incoming));
4716                                if !valid && incoming != current {
4717                                    result.skipped_rows += 1;
4718                                    if row_has_vector
4719                                        && vector_row_ids.get(vector_row_idx).is_some()
4720                                    {
4721                                        consume_failed_vector_row_group(
4722                                            &vector_row_ids,
4723                                            &mut vector_row_idx,
4724                                            &mut failed_row_ids,
4725                                        );
4726                                    }
4727                                    result.conflicts.push(Conflict {
4728                                        natural_key: row.natural_key.clone(),
4729                                        resolution: ConflictPolicy::LatestWins,
4730                                        reason: Some(format!(
4731                                            "state_machine: invalid transition {} -> {} (current: {})",
4732                                            current, incoming, current
4733                                        )),
4734                                    });
4735                                    if !batch_row_commits {
4736                                        self.commit_with_source(tx, CommitSource::SyncPull)?;
4737                                        tx = self.begin();
4738                                    }
4739                                    continue;
4740                                }
4741                            }
4742                        }
4743
4744                        // Sync-apply overflow guard + allocator/watermark advance.
4745                        let mut overflow: Option<Error> = None;
4746                        for v in values.values() {
4747                            if let Value::TxId(incoming) = v
4748                                && let Err(err) =
4749                                    self.tx_mgr.advance_for_sync(&row.table, *incoming)
4750                            {
4751                                overflow = Some(err);
4752                                break;
4753                            }
4754                        }
4755                        if let Some(err) = overflow {
4756                            return Err(err);
4757                        }
4758
4759                        match self.upsert_row_for_sync(
4760                            tx,
4761                            &row.table,
4762                            &row.natural_key.column,
4763                            values.clone(),
4764                        ) {
4765                            Ok(_) => {
4766                                visible_rows_cache.remove(&row.table);
4767                                result.applied_rows += 1;
4768                                if row_has_vector
4769                                    && vector_row_ids.get(vector_row_idx).is_some()
4770                                    && let Ok(Some(found)) = self.point_lookup_in_tx(
4771                                        tx,
4772                                        &row.table,
4773                                        &row.natural_key.column,
4774                                        &row.natural_key.value,
4775                                        self.snapshot(),
4776                                    )
4777                                {
4778                                    consume_vector_row_group(
4779                                        &vector_row_ids,
4780                                        &mut vector_row_idx,
4781                                        found.row_id,
4782                                        &mut vector_row_map,
4783                                    );
4784                                }
4785                            }
4786                            Err(err) => {
4787                                if is_fatal_sync_apply_error(&err) {
4788                                    return Err(err);
4789                                }
4790                                result.skipped_rows += 1;
4791                                if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4792                                    consume_failed_vector_row_group(
4793                                        &vector_row_ids,
4794                                        &mut vector_row_idx,
4795                                        &mut failed_row_ids,
4796                                    );
4797                                }
4798                                result.conflicts.push(Conflict {
4799                                    natural_key: row.natural_key.clone(),
4800                                    resolution: ConflictPolicy::LatestWins,
4801                                    reason: Some(format!("state_machine_or_constraint: {err}")),
4802                                });
4803                            }
4804                        }
4805                    }
4806                }
4807                (Some(_), ConflictPolicy::EdgeWins) => {
4808                    result.conflicts.push(Conflict {
4809                        natural_key: row.natural_key.clone(),
4810                        resolution: ConflictPolicy::EdgeWins,
4811                        reason: Some("edge_wins".to_string()),
4812                    });
4813                    let mut overflow: Option<Error> = None;
4814                    for v in values.values() {
4815                        if let Value::TxId(incoming) = v
4816                            && let Err(err) = self.tx_mgr.advance_for_sync(&row.table, *incoming)
4817                        {
4818                            overflow = Some(err);
4819                            break;
4820                        }
4821                    }
4822                    if let Some(err) = overflow {
4823                        return Err(err);
4824                    }
4825
4826                    match self.upsert_row_for_sync(
4827                        tx,
4828                        &row.table,
4829                        &row.natural_key.column,
4830                        values.clone(),
4831                    ) {
4832                        Ok(_) => {
4833                            visible_rows_cache.remove(&row.table);
4834                            result.applied_rows += 1;
4835                            if row_has_vector
4836                                && vector_row_ids.get(vector_row_idx).is_some()
4837                                && let Ok(Some(found)) = self.point_lookup_in_tx(
4838                                    tx,
4839                                    &row.table,
4840                                    &row.natural_key.column,
4841                                    &row.natural_key.value,
4842                                    self.snapshot(),
4843                                )
4844                            {
4845                                consume_vector_row_group(
4846                                    &vector_row_ids,
4847                                    &mut vector_row_idx,
4848                                    found.row_id,
4849                                    &mut vector_row_map,
4850                                );
4851                            }
4852                        }
4853                        Err(err) => {
4854                            if is_fatal_sync_apply_error(&err) {
4855                                return Err(err);
4856                            }
4857                            result.skipped_rows += 1;
4858                            if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
4859                                consume_failed_vector_row_group(
4860                                    &vector_row_ids,
4861                                    &mut vector_row_idx,
4862                                    &mut failed_row_ids,
4863                                );
4864                            }
4865                            if let Some(last) = result.conflicts.last_mut() {
4866                                last.reason = Some(format!("state_machine_or_constraint: {err}"));
4867                            }
4868                        }
4869                    }
4870                }
4871            }
4872
4873            if !batch_row_commits {
4874                self.commit_with_source(tx, CommitSource::SyncPull)?;
4875                tx = self.begin();
4876            }
4877        }
4878
4879        if batch_row_commits {
4880            self.commit_with_source(tx, CommitSource::SyncPull)?;
4881            tx = self.begin();
4882        }
4883
4884        for edge in changes.edges {
4885            let is_delete = matches!(edge.properties.get("__deleted"), Some(Value::Bool(true)));
4886            if is_delete {
4887                let _ = self.delete_edge(tx, edge.source, edge.target, &edge.edge_type);
4888            } else {
4889                let _ = self.insert_edge(
4890                    tx,
4891                    edge.source,
4892                    edge.target,
4893                    edge.edge_type,
4894                    edge.properties,
4895                );
4896            }
4897        }
4898
4899        for vector in changes.vectors {
4900            if failed_row_ids.contains(&vector.row_id) {
4901                continue; // skip vectors for rows that failed to insert
4902            }
4903            let local_row_id = vector_row_map
4904                .get(&vector.row_id)
4905                .copied()
4906                .unwrap_or(vector.row_id);
4907            if vector.vector.is_empty() {
4908                self.vector
4909                    .delete_vector(tx, vector.index.clone(), local_row_id)?;
4910            } else {
4911                if self.has_live_vector(local_row_id, self.snapshot()) {
4912                    let _ = self.delete_vector(tx, vector.index.clone(), local_row_id);
4913                }
4914                self.insert_vector_strict(tx, vector.index.clone(), local_row_id, vector.vector)?;
4915            }
4916        }
4917
4918        self.commit_with_source(tx, CommitSource::SyncPull)?;
4919        result.new_lsn = self.current_lsn();
4920        Ok(result)
4921    }
4922
4923    fn row_change_values(
4924        &self,
4925        table: &str,
4926        row_id: RowId,
4927    ) -> Option<(NaturalKey, HashMap<String, Value>)> {
4928        let tables = self.relational_store.tables.read();
4929        let meta = self.relational_store.table_meta.read();
4930        let rows = tables.get(table)?;
4931        let row = rows.iter().find(|r| r.row_id == row_id)?;
4932        let key_col = meta
4933            .get(table)
4934            .and_then(|m| m.natural_key_column.clone())
4935            .or_else(|| {
4936                meta.get(table).and_then(|m| {
4937                    m.columns
4938                        .iter()
4939                        .find(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
4940                        .map(|_| "id".to_string())
4941                })
4942            })
4943            .or_else(|| {
4944                meta.get(table).and_then(|m| {
4945                    m.columns
4946                        .iter()
4947                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
4948                        .map(|c| c.name.clone())
4949                })
4950            })?;
4951
4952        let key_val = row.values.get(&key_col)?.clone();
4953        let values = row
4954            .values
4955            .iter()
4956            .map(|(k, v)| (k.clone(), v.clone()))
4957            .collect::<HashMap<_, _>>();
4958        Some((
4959            NaturalKey {
4960                column: key_col,
4961                value: key_val,
4962            },
4963            values,
4964        ))
4965    }
4966
4967    fn edge_properties(
4968        &self,
4969        source: NodeId,
4970        target: NodeId,
4971        edge_type: &str,
4972        lsn: Lsn,
4973    ) -> Option<HashMap<String, Value>> {
4974        self.graph_store
4975            .forward_adj
4976            .read()
4977            .get(&source)
4978            .and_then(|entries| {
4979                entries
4980                    .iter()
4981                    .find(|e| e.target == target && e.edge_type == edge_type && e.lsn == lsn)
4982                    .map(|e| e.properties.clone())
4983            })
4984    }
4985
    /// Thin delegation to the vector store: the vector stored for `row_id` in
    /// `index` at exactly `lsn`, if any.
    fn vector_for_row_lsn(
        &self,
        index: &VectorIndexRef,
        row_id: RowId,
        lsn: Lsn,
    ) -> Option<Vec<f32>> {
        self.vector_store.vector_for_row_lsn(index, row_id, lsn)
    }
4994}
4995
4996fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
4997    if let Some(pos) = qr.columns.iter().position(|c| c == "row_id") {
4998        qr.columns.remove(pos);
4999        for row in &mut qr.rows {
5000            if pos < row.len() {
5001                row.remove(pos);
5002            }
5003        }
5004    }
5005    qr
5006}
5007
5008fn cached_table_meta(
5009    db: &Database,
5010    cache: &mut HashMap<String, Option<TableMeta>>,
5011    table: &str,
5012) -> Option<TableMeta> {
5013    cache
5014        .entry(table.to_string())
5015        .or_insert_with(|| db.table_meta(table))
5016        .clone()
5017}
5018
/// Canonical rank-index name for `table`.`column` (the two joined by a dot).
pub(crate) fn rank_index_name(table: &str, column: &str) -> String {
    let mut name = String::with_capacity(table.len() + column.len() + 1);
    name.push_str(table);
    name.push('.');
    name.push_str(column);
    name
}
5022
5023fn rank_value_to_number(
5024    value: &Value,
5025    column: &str,
5026) -> std::result::Result<Option<f32>, FormulaEvalError> {
5027    match value {
5028        Value::Null => Ok(None),
5029        Value::Float64(value) => Ok(Some(*value as f32)),
5030        Value::Int64(value) => Ok(Some(*value as f32)),
5031        Value::Bool(value) => Ok(Some(if *value { 1.0 } else { 0.0 })),
5032        Value::Text(_) => Err(FormulaEvalError::UnsupportedType {
5033            column: column.to_string(),
5034            actual: "TEXT",
5035        }),
5036        Value::Json(_) => Err(FormulaEvalError::UnsupportedType {
5037            column: column.to_string(),
5038            actual: "JSON",
5039        }),
5040        Value::Uuid(_) => Err(FormulaEvalError::UnsupportedType {
5041            column: column.to_string(),
5042            actual: "UUID",
5043        }),
5044        Value::Vector(_) => Err(FormulaEvalError::UnsupportedType {
5045            column: column.to_string(),
5046            actual: "VECTOR",
5047        }),
5048        Value::Timestamp(_) => Err(FormulaEvalError::UnsupportedType {
5049            column: column.to_string(),
5050            actual: "TIMESTAMP",
5051        }),
5052        Value::TxId(_) => Err(FormulaEvalError::UnsupportedType {
5053            column: column.to_string(),
5054            actual: "TXID",
5055        }),
5056    }
5057}
5058
5059fn merged_rank_values(
5060    anchor: &VersionedRow,
5061    joined: Option<&VersionedRow>,
5062) -> HashMap<String, Value> {
5063    let mut values = anchor.values.clone();
5064    if let Some(joined) = joined {
5065        for (key, value) in &joined.values {
5066            values.entry(key.clone()).or_insert_with(|| value.clone());
5067        }
5068    }
5069    values
5070}
5071
5072fn values_equal_for_rank_join(left: &Value, right: &Value) -> bool {
5073    if matches!((left, right), (Value::Null, _) | (_, Value::Null)) {
5074        return false;
5075    }
5076    left == right
5077}
5078
5079fn compare_ranked_results(left: &SearchResult, right: &SearchResult) -> std::cmp::Ordering {
5080    rank_float_desc(left.rank, right.rank)
5081        .then_with(|| rank_float_desc(left.vector_score, right.vector_score))
5082        .then_with(|| right.row_id.cmp(&left.row_id))
5083}
5084
/// Descending comparator for `f32` ranks. NaN values sort after every real
/// number (regardless of NaN sign bit) and compare equal to each other.
fn rank_float_desc(left: f32, right: f32) -> std::cmp::Ordering {
    use std::cmp::Ordering;
    if left.is_nan() {
        if right.is_nan() {
            Ordering::Equal
        } else {
            Ordering::Greater
        }
    } else if right.is_nan() {
        Ordering::Less
    } else {
        // Arguments flipped so larger values sort first.
        right.total_cmp(&left)
    }
}
5093
5094fn cached_visible_rows<'a>(
5095    db: &Database,
5096    cache: &'a mut HashMap<String, Vec<VersionedRow>>,
5097    table: &str,
5098) -> Result<&'a mut Vec<VersionedRow>> {
5099    if !cache.contains_key(table) {
5100        let rows = db.scan(table, db.snapshot())?;
5101        cache.insert(table.to_string(), rows);
5102    }
5103    Ok(cache.get_mut(table).expect("cached visible rows"))
5104}
5105
5106fn cached_point_lookup(
5107    db: &Database,
5108    cache: &mut HashMap<String, Vec<VersionedRow>>,
5109    table: &str,
5110    col: &str,
5111    value: &Value,
5112) -> Result<Option<VersionedRow>> {
5113    let rows = cached_visible_rows(db, cache, table)?;
5114    Ok(rows
5115        .iter()
5116        .find(|r| r.values.get(col) == Some(value))
5117        .cloned())
5118}
5119
5120fn record_cached_insert(
5121    cache: &mut HashMap<String, Vec<VersionedRow>>,
5122    table: &str,
5123    row: VersionedRow,
5124) {
5125    if let Some(rows) = cache.get_mut(table) {
5126        rows.push(row);
5127    }
5128}
5129
5130fn consume_vector_row_group(
5131    remote_row_ids: &[RowId],
5132    cursor: &mut usize,
5133    local_row_id: RowId,
5134    map: &mut HashMap<RowId, RowId>,
5135) {
5136    let Some(remote_row_id) = remote_row_ids.get(*cursor).copied() else {
5137        return;
5138    };
5139    while remote_row_ids.get(*cursor).copied() == Some(remote_row_id) {
5140        map.insert(remote_row_id, local_row_id);
5141        *cursor += 1;
5142    }
5143}
5144
5145fn consume_failed_vector_row_group(
5146    remote_row_ids: &[RowId],
5147    cursor: &mut usize,
5148    failed: &mut HashSet<RowId>,
5149) {
5150    let Some(remote_row_id) = remote_row_ids.get(*cursor).copied() else {
5151        return;
5152    };
5153    while remote_row_ids.get(*cursor).copied() == Some(remote_row_id) {
5154        failed.insert(remote_row_id);
5155        *cursor += 1;
5156    }
5157}
5158
/// Locate the vector index (table, column) that a physical plan will search,
/// if the plan tree contains a vector-search node.
///
/// Unary operators recurse into their input; joins check the left side, then
/// the right; pipelines return the first stage that yields an index. Plans
/// without a `VectorSearch`/`HnswSearch` node return `None`.
fn vector_index_from_plan(plan: &PhysicalPlan) -> Option<VectorIndexRef> {
    match plan {
        PhysicalPlan::VectorSearch { table, column, .. }
        | PhysicalPlan::HnswSearch { table, column, .. } => {
            Some(VectorIndexRef::new(table.clone(), column.clone()))
        }
        PhysicalPlan::Project { input, .. }
        | PhysicalPlan::Filter { input, .. }
        | PhysicalPlan::Distinct { input }
        | PhysicalPlan::Limit { input, .. }
        | PhysicalPlan::Sort { input, .. }
        | PhysicalPlan::MaterializeCte { input, .. } => vector_index_from_plan(input),
        PhysicalPlan::Join { left, right, .. } => {
            vector_index_from_plan(left).or_else(|| vector_index_from_plan(right))
        }
        PhysicalPlan::Pipeline(plans) => plans.iter().find_map(vector_index_from_plan),
        // All other plan variants carry no vector component.
        _ => None,
    }
}
5178
5179fn hydrate_relational_vector_values(relational: &RelationalStore, vectors: &[VectorEntry]) {
5180    if vectors.is_empty() {
5181        return;
5182    }
5183    let mut tables = relational.tables.write();
5184    for entry in vectors {
5185        let Some(rows) = tables.get_mut(&entry.index.table) else {
5186            continue;
5187        };
5188        if let Some(row) = rows.iter_mut().find(|row| row.row_id == entry.row_id) {
5189            row.values.insert(
5190                entry.index.column.clone(),
5191                Value::Vector(entry.vector.clone()),
5192            );
5193        }
5194    }
5195}
5196
5197fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
5198    if let Some(rows) = cache.get_mut(table) {
5199        rows.retain(|row| row.row_id != row_id);
5200    }
5201}
5202
5203fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
5204    match result {
5205        Ok(query_result) => QueryOutcome::Success {
5206            row_count: if query_result.rows.is_empty() {
5207                query_result.rows_affected as usize
5208            } else {
5209                query_result.rows.len()
5210            },
5211        },
5212        Err(error) => QueryOutcome::Error {
5213            error: error.to_string(),
5214        },
5215    }
5216}
5217
/// Intentional no-op: the parameters are consumed only to keep the signature
/// and call sites stable. NOTE(review): presumably a hook for eager HNSW
/// graph prebuilding to be filled in later — confirm intent before removing.
fn maybe_prebuild_hnsw(vector_store: &VectorStore, accountant: &MemoryAccountant) {
    let _ = (vector_store, accountant);
}
5221
5222fn estimate_row_bytes_for_meta(
5223    values: &HashMap<ColName, Value>,
5224    meta: &TableMeta,
5225    include_vectors: bool,
5226) -> usize {
5227    let mut bytes = 96usize;
5228    for column in &meta.columns {
5229        let Some(value) = values.get(&column.name) else {
5230            continue;
5231        };
5232        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
5233            continue;
5234        }
5235        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
5236    }
5237    bytes
5238}
5239
5240fn estimate_edge_bytes(
5241    source: NodeId,
5242    target: NodeId,
5243    edge_type: &str,
5244    properties: &HashMap<String, Value>,
5245) -> usize {
5246    AdjEntry {
5247        source,
5248        target,
5249        edge_type: edge_type.to_string(),
5250        properties: properties.clone(),
5251        created_tx: TxId(0),
5252        deleted_tx: None,
5253        lsn: Lsn(0),
5254    }
5255    .estimated_bytes()
5256}
5257
impl Drop for Database {
    fn drop(&mut self) {
        // Idempotence guard: `swap` marks the database closed and bails out
        // if a prior close/drop already ran the shutdown sequence.
        if self.closed.swap(true, Ordering::SeqCst) {
            return;
        }
        // Stop the background pruning thread first: signal shutdown, then
        // join so it cannot race the teardown below.
        let runtime = self.pruning_runtime.get_mut();
        runtime.shutdown.store(true, Ordering::SeqCst);
        if let Some(handle) = runtime.handle.take() {
            let _ = handle.join();
        }
        // Drop all subscriber channels. NOTE(review): receivers presumably
        // observe this as a disconnect — confirm against the subscriber API.
        self.subscriptions.lock().subscribers.clear();
        // Close persistence last, after background writers have stopped.
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
    }
}
5274
/// Sleep for `interval`, waking early when `shutdown` is set. The wait is
/// chopped into slices of at most 50 ms so a shutdown request is observed
/// promptly instead of after the full interval.
fn sleep_with_shutdown(shutdown: &AtomicBool, interval: Duration) {
    let deadline = Instant::now() + interval;
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return;
        }
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return;
        }
        thread::sleep(remaining.min(Duration::from_millis(50)));
    }
}
5286
/// Background TTL sweep. Removes expired rows from every table that declares
/// `default_ttl_seconds`, then cascades: vectors for pruned rows are dropped,
/// graph edges touching pruned node ids are dropped, the affected persistence
/// data is rewritten, and the freed bytes are released to the accountant.
///
/// Returns the number of distinct row ids pruned (0 when nothing expired).
fn prune_expired_rows(
    relational_store: &Arc<RelationalStore>,
    graph_store: &Arc<GraphStore>,
    vector_store: &Arc<VectorStore>,
    accountant: &MemoryAccountant,
    persistence: Option<&Arc<RedbPersistence>>,
    sync_watermark: Lsn,
) -> u64 {
    let now = Wallclock::now();
    // Clone the metadata map so the meta lock is not held while the table
    // write lock below is taken.
    let metas = relational_store.table_meta.read().clone();
    let mut pruned_by_table: HashMap<String, Vec<RowId>> = HashMap::new();
    let mut pruned_node_ids = HashSet::new();
    let mut released_row_bytes = 0usize;

    {
        let mut tables = relational_store.tables.write();
        for (table_name, rows) in tables.iter_mut() {
            let Some(meta) = metas.get(table_name) else {
                continue;
            };
            // Only TTL-enabled tables participate in pruning.
            if meta.default_ttl_seconds.is_none() {
                continue;
            }

            rows.retain(|row| {
                if !row_is_prunable(row, meta, now, sync_watermark) {
                    return true;
                }

                // Record what was dropped so vectors, edges, and persistence
                // can be cleaned up after the table lock is released.
                pruned_by_table
                    .entry(table_name.clone())
                    .or_default()
                    .push(row.row_id);
                released_row_bytes = released_row_bytes
                    .saturating_add(estimate_row_bytes_for_meta(&row.values, meta, false));
                // Rows with a UUID `id` also act as graph node ids; remember
                // them so their edges can be pruned below.
                if let Some(Value::Uuid(id)) = row.values.get("id") {
                    pruned_node_ids.insert(*id);
                }
                false
            });
        }
    }

    let pruned_row_ids: HashSet<RowId> = pruned_by_table
        .values()
        .flat_map(|rows| rows.iter().copied())
        .collect();
    if pruned_row_ids.is_empty() {
        return 0;
    }

    let released_vector_bytes = vector_store.prune_row_ids(&pruned_row_ids, accountant);

    let mut released_edge_bytes = 0usize;
    {
        // Forward adjacency: drop edges touching a pruned node. Edge bytes
        // are accounted here only; the reverse side mirrors the same edges.
        let mut forward = graph_store.forward_adj.write();
        for entries in forward.values_mut() {
            entries.retain(|entry| {
                if pruned_node_ids.contains(&entry.source)
                    || pruned_node_ids.contains(&entry.target)
                {
                    released_edge_bytes =
                        released_edge_bytes.saturating_add(entry.estimated_bytes());
                    false
                } else {
                    true
                }
            });
        }
        forward.retain(|_, entries| !entries.is_empty());
    }
    {
        // Reverse adjacency: same filter, no byte accounting (see above).
        let mut reverse = graph_store.reverse_adj.write();
        for entries in reverse.values_mut() {
            entries.retain(|entry| {
                !pruned_node_ids.contains(&entry.source) && !pruned_node_ids.contains(&entry.target)
            });
        }
        reverse.retain(|_, entries| !entries.is_empty());
    }

    if let Some(persistence) = persistence {
        // Rewrite each affected table, plus the full vector and edge sets,
        // wholesale. Errors are deliberately ignored (`let _`): the in-memory
        // state is already pruned, so this is best-effort persistence.
        for table_name in pruned_by_table.keys() {
            let rows = relational_store
                .tables
                .read()
                .get(table_name)
                .cloned()
                .unwrap_or_default();
            let _ = persistence.rewrite_table_rows(table_name, &rows);
        }

        let vectors = vector_store.all_entries();
        let edges = graph_store
            .forward_adj
            .read()
            .values()
            .flat_map(|entries| entries.iter().cloned())
            .collect::<Vec<_>>();
        let _ = persistence.rewrite_vectors(&vectors);
        let _ = persistence.rewrite_graph_edges(&edges);
    }

    accountant.release(
        released_row_bytes
            .saturating_add(released_vector_bytes)
            .saturating_add(released_edge_bytes),
    );

    pruned_row_ids.len() as u64
}
5398
5399fn row_is_prunable(
5400    row: &VersionedRow,
5401    meta: &TableMeta,
5402    now: Wallclock,
5403    sync_watermark: Lsn,
5404) -> bool {
5405    if meta.sync_safe && row.lsn >= sync_watermark {
5406        return false;
5407    }
5408
5409    let Some(default_ttl_seconds) = meta.default_ttl_seconds else {
5410        return false;
5411    };
5412
5413    if let Some(expires_column) = &meta.expires_column {
5414        match row.values.get(expires_column) {
5415            Some(Value::Timestamp(millis)) if *millis == i64::MAX => return false,
5416            Some(Value::Timestamp(millis)) if *millis < 0 => return true,
5417            Some(Value::Timestamp(millis)) => return (*millis as u64) <= now.0,
5418            Some(Value::Null) | None => {}
5419            Some(_) => {}
5420        }
5421    }
5422
5423    let ttl_millis = default_ttl_seconds.saturating_mul(1000);
5424    row.created_at
5425        .map(|created_at| now.0.saturating_sub(created_at.0) > ttl_millis)
5426        .unwrap_or(false)
5427}
5428
5429fn max_tx_across_all(
5430    relational: &RelationalStore,
5431    graph: &GraphStore,
5432    vector: &VectorStore,
5433) -> TxId {
5434    let relational_max = relational
5435        .tables
5436        .read()
5437        .values()
5438        .flat_map(|rows| rows.iter())
5439        .flat_map(|row| std::iter::once(row.created_tx).chain(row.deleted_tx))
5440        .max()
5441        .unwrap_or(TxId(0));
5442    let graph_max = graph
5443        .forward_adj
5444        .read()
5445        .values()
5446        .flat_map(|entries| entries.iter())
5447        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
5448        .max()
5449        .unwrap_or(TxId(0));
5450    let vector_max = vector
5451        .all_entries()
5452        .into_iter()
5453        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
5454        .max()
5455        .unwrap_or(TxId(0));
5456
5457    relational_max.max(graph_max).max(vector_max)
5458}
5459
5460fn max_lsn_across_all(
5461    relational: &RelationalStore,
5462    graph: &GraphStore,
5463    vector: &VectorStore,
5464) -> Lsn {
5465    let relational_max = relational
5466        .tables
5467        .read()
5468        .values()
5469        .flat_map(|rows| rows.iter().map(|row| row.lsn))
5470        .max()
5471        .unwrap_or(Lsn(0));
5472    let graph_max = graph
5473        .forward_adj
5474        .read()
5475        .values()
5476        .flat_map(|entries| entries.iter().map(|entry| entry.lsn))
5477        .max()
5478        .unwrap_or(Lsn(0));
5479    let vector_max = vector
5480        .all_entries()
5481        .into_iter()
5482        .map(|entry| entry.lsn)
5483        .max()
5484        .unwrap_or(Lsn(0));
5485
5486    relational_max.max(graph_max).max(vector_max)
5487}
5488
5489fn persisted_memory_limit(path: &Path) -> Result<Option<usize>> {
5490    if !path.exists() {
5491        return Ok(None);
5492    }
5493    let persistence = RedbPersistence::open(path)?;
5494    let limit = persistence.load_config_value::<usize>("memory_limit")?;
5495    persistence.close();
5496    Ok(limit)
5497}
5498
/// Classify sync-apply errors that must abort the whole apply instead of
/// being recorded as a per-row conflict: memory/disk budget exhaustion is
/// database-wide, so the caller returns the error immediately.
fn is_fatal_sync_apply_error(err: &Error) -> bool {
    matches!(
        err,
        Error::MemoryBudgetExceeded { .. } | Error::DiskBudgetExceeded { .. }
    )
}
5505
5506fn ddl_change_from_create_table(ct: &CreateTable) -> DdlChange {
5507    DdlChange::CreateTable {
5508        name: ct.name.clone(),
5509        columns: ct
5510            .columns
5511            .iter()
5512            .map(|col| {
5513                (
5514                    col.name.clone(),
5515                    sql_type_for_ast_column(col, &ct.propagation_rules),
5516                )
5517            })
5518            .collect(),
5519        constraints: create_table_constraints_from_ast(ct),
5520    }
5521}
5522
/// Full `CREATE TABLE` DDL for a table's current metadata (no columns
/// excluded); see `ddl_change_from_meta_excluding`.
fn ddl_change_from_meta(name: &str, meta: &TableMeta) -> DdlChange {
    ddl_change_from_meta_excluding(name, meta, &HashSet::new())
}
5526
5527fn ddl_change_from_meta_excluding(
5528    name: &str,
5529    meta: &TableMeta,
5530    excluded_columns: &HashSet<String>,
5531) -> DdlChange {
5532    DdlChange::CreateTable {
5533        name: name.to_string(),
5534        columns: meta
5535            .columns
5536            .iter()
5537            .filter(|col| !excluded_columns.contains(&col.name))
5538            .map(|col| {
5539                (
5540                    col.name.clone(),
5541                    sql_type_for_meta_column(col, &meta.propagation_rules),
5542                )
5543            })
5544            .collect(),
5545        constraints: create_table_constraints_from_meta(meta),
5546    }
5547}
5548
/// Emit `CREATE TABLE` DDL for every known table, ordered so that a table's
/// rank-policy join target is created before the table itself.
///
/// Tables are visited in sorted-name order for determinism. When a pass makes
/// no progress (a dependency cycle between rank policies), the remaining
/// tables are flushed in name order so the loop always terminates.
fn full_snapshot_ddl(metas: &HashMap<String, TableMeta>) -> Vec<DdlChange> {
    let mut names = metas.keys().cloned().collect::<Vec<_>>();
    names.sort();

    let mut emitted = HashSet::new();
    let mut ddl = Vec::new();
    while emitted.len() < names.len() {
        let before = emitted.len();
        for name in &names {
            if emitted.contains(name) {
                continue;
            }
            let Some(meta) = metas.get(name) else {
                continue;
            };
            // Ready once every rank-policy join target is either the table
            // itself, unknown to this snapshot, or already emitted.
            let deps_ready = meta
                .columns
                .iter()
                .filter_map(|column| column.rank_policy.as_ref())
                .all(|policy| {
                    policy.joined_table == *name
                        || !metas.contains_key(&policy.joined_table)
                        || emitted.contains(&policy.joined_table)
                });
            if deps_ready {
                push_snapshot_table_ddl(&mut ddl, name, meta);
                emitted.insert(name.clone());
            }
        }
        // Cycle fallback: nothing became ready this pass, so emit the rest
        // in name order rather than loop forever.
        if emitted.len() == before {
            for name in &names {
                if !emitted.contains(name)
                    && let Some(meta) = metas.get(name)
                {
                    push_snapshot_table_ddl(&mut ddl, name, meta);
                    emitted.insert(name.clone());
                }
            }
        }
    }
    ddl
}
5591
5592fn push_snapshot_table_ddl(ddl: &mut Vec<DdlChange>, name: &str, meta: &TableMeta) {
5593    let deferred_self_rank_columns = meta
5594        .columns
5595        .iter()
5596        .filter(|column| {
5597            column
5598                .rank_policy
5599                .as_ref()
5600                .is_some_and(|policy| policy.joined_table == name)
5601        })
5602        .map(|column| column.name.clone())
5603        .collect::<HashSet<_>>();
5604    ddl.push(ddl_change_from_meta_excluding(
5605        name,
5606        meta,
5607        &deferred_self_rank_columns,
5608    ));
5609    for index in &meta.indexes {
5610        if index.kind == contextdb_core::IndexKind::UserDeclared {
5611            ddl.push(DdlChange::CreateIndex {
5612                table: name.to_string(),
5613                name: index.name.clone(),
5614                columns: index.columns.clone(),
5615            });
5616        }
5617    }
5618    if !deferred_self_rank_columns.is_empty() {
5619        ddl.push(DdlChange::AlterTable {
5620            name: name.to_string(),
5621            columns: meta
5622                .columns
5623                .iter()
5624                .map(|col| {
5625                    (
5626                        col.name.clone(),
5627                        sql_type_for_meta_column(col, &meta.propagation_rules),
5628                    )
5629                })
5630                .collect(),
5631            constraints: create_table_constraints_from_meta(meta),
5632        });
5633    }
5634}
5635
5636fn sql_type_for_ast(data_type: &DataType) -> String {
5637    match data_type {
5638        DataType::Uuid => "UUID".to_string(),
5639        DataType::Text => "TEXT".to_string(),
5640        DataType::Integer => "INTEGER".to_string(),
5641        DataType::Real => "REAL".to_string(),
5642        DataType::Boolean => "BOOLEAN".to_string(),
5643        DataType::Timestamp => "TIMESTAMP".to_string(),
5644        DataType::Json => "JSON".to_string(),
5645        DataType::Vector(dim) => format!("VECTOR({dim})"),
5646        DataType::TxId => "TXID".to_string(),
5647    }
5648}
5649
/// Render an AST column definition back to its SQL type string, including
/// quantization, REFERENCES/propagation clauses, and column constraints.
///
/// NOTE(review): the clause order (type, quantization, REFERENCES,
/// PRIMARY KEY, NOT NULL, UNIQUE, IMMUTABLE, RANK_POLICY) appears to be part
/// of the emitted-DDL contract — keep it stable.
fn sql_type_for_ast_column(
    col: &contextdb_parser::ast::ColumnDef,
    _rules: &[contextdb_parser::ast::AstPropagationRule],
) -> String {
    let mut ty = sql_type_for_ast(&col.data_type);
    append_ast_quantization(&mut ty, col.quantization);
    if let Some(reference) = &col.references {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            reference.table, reference.column
        ));
        for rule in &reference.propagation_rules {
            if let contextdb_parser::ast::AstPropagationRule::FkState {
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } = rule
            {
                ty.push_str(&format!(
                    " ON STATE {} PROPAGATE SET {}",
                    trigger_state, target_state
                ));
                // 10 is treated as the implicit default depth; only emit the
                // clause when it was overridden.
                if max_depth.unwrap_or(10) != 10 {
                    ty.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
                }
                if *abort_on_failure {
                    ty.push_str(" ABORT ON FAILURE");
                }
            }
        }
    }
    if col.primary_key {
        ty.push_str(" PRIMARY KEY");
    }
    // PRIMARY KEY columns never also emit NOT NULL.
    if !col.nullable && !col.primary_key {
        ty.push_str(" NOT NULL");
    }
    if col.unique {
        ty.push_str(" UNIQUE");
    }
    if col.immutable {
        ty.push_str(" IMMUTABLE");
    }
    if let Some(policy) = col.rank_policy.as_deref() {
        ty.push_str(&format!(
            " RANK_POLICY (JOIN {} ON {}, FORMULA '{}', SORT_KEY {})",
            policy.joined_table,
            policy.joined_column,
            sql_quote(&policy.formula),
            policy.sort_key
        ));
    }
    ty
}
5705
5706fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[PropagationRule]) -> String {
5707    let mut ty = match col.column_type {
5708        ColumnType::Integer => "INTEGER".to_string(),
5709        ColumnType::Real => "REAL".to_string(),
5710        ColumnType::Text => "TEXT".to_string(),
5711        ColumnType::Boolean => "BOOLEAN".to_string(),
5712        ColumnType::Json => "JSON".to_string(),
5713        ColumnType::Uuid => "UUID".to_string(),
5714        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
5715        ColumnType::Timestamp => "TIMESTAMP".to_string(),
5716        ColumnType::TxId => "TXID".to_string(),
5717    };
5718    append_core_quantization(&mut ty, col.quantization);
5719
5720    let fk_rules = rules
5721        .iter()
5722        .filter_map(|rule| match rule {
5723            PropagationRule::ForeignKey {
5724                fk_column,
5725                referenced_table,
5726                referenced_column,
5727                trigger_state,
5728                target_state,
5729                max_depth,
5730                abort_on_failure,
5731            } if fk_column == &col.name => Some((
5732                referenced_table,
5733                referenced_column,
5734                trigger_state,
5735                target_state,
5736                *max_depth,
5737                *abort_on_failure,
5738            )),
5739            _ => None,
5740        })
5741        .collect::<Vec<_>>();
5742
5743    if let Some(reference) = &col.references {
5744        ty.push_str(&format!(
5745            " REFERENCES {}({})",
5746            reference.table, reference.column
5747        ));
5748    } else if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
5749        ty.push_str(&format!(
5750            " REFERENCES {}({})",
5751            referenced_table, referenced_column
5752        ));
5753    }
5754
5755    if col.references.is_some() || !fk_rules.is_empty() {
5756        for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
5757            ty.push_str(&format!(
5758                " ON STATE {} PROPAGATE SET {}",
5759                trigger_state, target_state
5760            ));
5761            if max_depth != 10 {
5762                ty.push_str(&format!(" MAX DEPTH {max_depth}"));
5763            }
5764            if abort_on_failure {
5765                ty.push_str(" ABORT ON FAILURE");
5766            }
5767        }
5768    }
5769    if col.primary_key {
5770        ty.push_str(" PRIMARY KEY");
5771    }
5772    if !col.nullable && !col.primary_key {
5773        ty.push_str(" NOT NULL");
5774    }
5775    if col.unique {
5776        ty.push_str(" UNIQUE");
5777    }
5778    if col.expires {
5779        ty.push_str(" EXPIRES");
5780    }
5781    if col.immutable {
5782        ty.push_str(" IMMUTABLE");
5783    }
5784    if let Some(policy) = &col.rank_policy {
5785        ty.push_str(&format!(
5786            " RANK_POLICY (JOIN {} ON {}, FORMULA '{}', SORT_KEY {})",
5787            policy.joined_table,
5788            policy.joined_column,
5789            sql_quote(&policy.formula),
5790            policy.sort_key
5791        ));
5792    }
5793
5794    ty
5795}
5796
5797fn append_ast_quantization(
5798    ty: &mut String,
5799    quantization: contextdb_parser::ast::VectorQuantization,
5800) {
5801    let quantization = match quantization {
5802        contextdb_parser::ast::VectorQuantization::F32 => return,
5803        contextdb_parser::ast::VectorQuantization::SQ8 => "SQ8",
5804        contextdb_parser::ast::VectorQuantization::SQ4 => "SQ4",
5805    };
5806    ty.push_str(&format!(" WITH (quantization = '{quantization}')"));
5807}
5808
5809fn append_core_quantization(ty: &mut String, quantization: contextdb_core::VectorQuantization) {
5810    if !matches!(quantization, contextdb_core::VectorQuantization::F32) {
5811        ty.push_str(&format!(
5812            " WITH (quantization = '{}')",
5813            quantization.as_str()
5814        ));
5815    }
5816}
5817
/// Escape a string for embedding in a single-quoted SQL literal by doubling
/// every embedded single quote (standard SQL escaping).
fn sql_quote(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        escaped.push(ch);
        if ch == '\'' {
            escaped.push('\'');
        }
    }
    escaped
}
5821
/// Collapse all runs of whitespace to single spaces and trim the ends, so
/// schema type strings compare equal regardless of incidental spacing.
fn normalize_schema_type(value: &str) -> String {
    let mut normalized = String::with_capacity(value.len());
    for token in value.split_whitespace() {
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        normalized.push_str(token);
    }
    normalized
}
5825
5826fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
5827    let mut constraints = Vec::new();
5828
5829    if ct.immutable {
5830        constraints.push("IMMUTABLE".to_string());
5831    }
5832
5833    if let Some(sm) = &ct.state_machine {
5834        let transitions = sm
5835            .transitions
5836            .iter()
5837            .map(|(from, tos)| format!("{from} -> [{}]", tos.join(", ")))
5838            .collect::<Vec<_>>()
5839            .join(", ");
5840        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
5841    }
5842
5843    if !ct.dag_edge_types.is_empty() {
5844        let edge_types = ct
5845            .dag_edge_types
5846            .iter()
5847            .map(|edge_type| format!("'{edge_type}'"))
5848            .collect::<Vec<_>>()
5849            .join(", ");
5850        constraints.push(format!("DAG({edge_types})"));
5851    }
5852
5853    if let Some(retain) = &ct.retain {
5854        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(retain.duration_seconds));
5855        if retain.sync_safe {
5856            clause.push_str(" SYNC SAFE");
5857        }
5858        constraints.push(clause);
5859    }
5860
5861    for unique_constraint in &ct.unique_constraints {
5862        constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
5863    }
5864
5865    for rule in &ct.propagation_rules {
5866        match rule {
5867            contextdb_parser::ast::AstPropagationRule::EdgeState {
5868                edge_type,
5869                direction,
5870                trigger_state,
5871                target_state,
5872                max_depth,
5873                abort_on_failure,
5874            } => {
5875                let mut clause = format!(
5876                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
5877                    edge_type, direction, trigger_state, target_state
5878                );
5879                if max_depth.unwrap_or(10) != 10 {
5880                    clause.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
5881                }
5882                if *abort_on_failure {
5883                    clause.push_str(" ABORT ON FAILURE");
5884                }
5885                constraints.push(clause);
5886            }
5887            contextdb_parser::ast::AstPropagationRule::VectorExclusion { trigger_state } => {
5888                constraints.push(format!(
5889                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
5890                    trigger_state
5891                ));
5892            }
5893            contextdb_parser::ast::AstPropagationRule::FkState { .. } => {}
5894        }
5895    }
5896
5897    constraints
5898}
5899
5900fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
5901    let mut constraints = Vec::new();
5902
5903    if meta.immutable {
5904        constraints.push("IMMUTABLE".to_string());
5905    }
5906
5907    if let Some(sm) = &meta.state_machine {
5908        let states = sm
5909            .transitions
5910            .iter()
5911            .map(|(from, to)| format!("{from} -> [{}]", to.join(", ")))
5912            .collect::<Vec<_>>()
5913            .join(", ");
5914        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, states));
5915    }
5916
5917    if !meta.dag_edge_types.is_empty() {
5918        let edge_types = meta
5919            .dag_edge_types
5920            .iter()
5921            .map(|edge_type| format!("'{edge_type}'"))
5922            .collect::<Vec<_>>()
5923            .join(", ");
5924        constraints.push(format!("DAG({edge_types})"));
5925    }
5926
5927    if let Some(ttl_seconds) = meta.default_ttl_seconds {
5928        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(ttl_seconds));
5929        if meta.sync_safe {
5930            clause.push_str(" SYNC SAFE");
5931        }
5932        constraints.push(clause);
5933    }
5934
5935    for unique_constraint in &meta.unique_constraints {
5936        constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
5937    }
5938
5939    for rule in &meta.propagation_rules {
5940        match rule {
5941            PropagationRule::Edge {
5942                edge_type,
5943                direction,
5944                trigger_state,
5945                target_state,
5946                max_depth,
5947                abort_on_failure,
5948            } => {
5949                let dir = match direction {
5950                    Direction::Incoming => "INCOMING",
5951                    Direction::Outgoing => "OUTGOING",
5952                    Direction::Both => "BOTH",
5953                };
5954                let mut clause = format!(
5955                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
5956                    edge_type, dir, trigger_state, target_state
5957                );
5958                if *max_depth != 10 {
5959                    clause.push_str(&format!(" MAX DEPTH {max_depth}"));
5960                }
5961                if *abort_on_failure {
5962                    clause.push_str(" ABORT ON FAILURE");
5963                }
5964                constraints.push(clause);
5965            }
5966            PropagationRule::VectorExclusion { trigger_state } => {
5967                constraints.push(format!(
5968                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
5969                    trigger_state
5970                ));
5971            }
5972            PropagationRule::ForeignKey { .. } => {}
5973        }
5974    }
5975
5976    constraints
5977}
5978
/// Render a TTL in seconds as the largest SQL time unit that divides it
/// evenly: DAYS, then HOURS, then MINUTES, falling back to raw SECONDS.
fn ttl_seconds_to_sql(seconds: u64) -> String {
    // Largest unit first so 172800s renders as "2 DAYS", not "48 HOURS".
    const UNITS: [(u64, &str); 3] = [(24 * 60 * 60, "DAYS"), (60 * 60, "HOURS"), (60, "MINUTES")];
    for (unit, label) in UNITS {
        if seconds % unit == 0 {
            return format!("{} {label}", seconds / unit);
        }
    }
    format!("{seconds} SECONDS")
}