//! contextdb_engine/database.rs — top-level `Database` facade wiring the
//! relational, graph and vector stores behind a single transaction manager.

1use crate::composite_store::{ChangeLogEntry, CompositeStore};
2use crate::executor::execute_plan;
3use crate::persistence::RedbPersistence;
4use crate::persistent_store::PersistentCompositeStore;
5use crate::plugin::{
6    CommitEvent, CommitSource, CorePlugin, DatabasePlugin, PluginHealth, QueryOutcome,
7    SubscriptionMetrics,
8};
9use crate::schema_enforcer::validate_dml;
10use crate::sync_types::{
11    ApplyResult, ChangeSet, Conflict, ConflictPolicies, ConflictPolicy, DdlChange, EdgeChange,
12    NaturalKey, RowChange, VectorChange,
13};
14use contextdb_core::*;
15use contextdb_graph::{GraphStore, MemGraphExecutor};
16use contextdb_parser::Statement;
17use contextdb_parser::ast::{AlterAction, CreateTable, DataType};
18use contextdb_planner::PhysicalPlan;
19use contextdb_relational::{MemRelationalExecutor, RelationalStore};
20use contextdb_tx::{TxManager, WriteSetApplicator};
21use contextdb_vector::{HnswIndex, MemVectorExecutor, VectorStore};
22use parking_lot::{Mutex, RwLock};
23use roaring::RoaringTreemap;
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::path::Path;
26use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
27use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
28use std::sync::{Arc, OnceLock};
29use std::thread::{self, JoinHandle};
30use std::time::{Duration, Instant};
31
/// Boxed store object the transaction manager applies write sets through.
type DynStore = Box<dyn WriteSetApplicator>;
// Bounded channel capacity per commit-event subscriber — presumably used
// when subscription channels are created (creation site not in this chunk).
const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 64;
// Hard cap on cached parsed statements; once full, new entries are simply
// not cached (no eviction — see `cache_statement_if_eligible`).
const MAX_STATEMENT_CACHE_ENTRIES: usize = 1024;
35
/// An index the planner considered but did not pick, together with the
/// human-readable reason it was rejected (surfaced via `QueryTrace`).
#[derive(Debug, Clone)]
pub struct IndexCandidate {
    pub name: String,
    pub rejected_reason: std::borrow::Cow<'static, str>,
}
41
/// Per-query plan diagnostics: which physical plan ran, which index (if any)
/// was used, which predicates were pushed down, and whether a sort was
/// elided. `Default` yields an empty trace with `physical_plan: ""`.
#[derive(Debug, Clone, Default)]
pub struct QueryTrace {
    pub physical_plan: &'static str,
    pub index_used: Option<String>,
    pub predicates_pushed: smallvec::SmallVec<[std::borrow::Cow<'static, str>; 4]>,
    pub indexes_considered: smallvec::SmallVec<[IndexCandidate; 4]>,
    pub sort_elided: bool,
}
50
51impl QueryTrace {
52    /// Stub default: scan-labeled trace with no plan data. The no-op writes
53    /// this everywhere. Impl must replace construction sites with real plan
54    /// inspection.
55    pub fn scan() -> Self {
56        Self {
57            physical_plan: "Scan",
58            ..Default::default()
59        }
60    }
61}
62
/// Side effects of a cascading DDL operation: the names of indexes that
/// were dropped along with their table/column.
#[derive(Debug, Clone)]
pub struct CascadeReport {
    pub dropped_indexes: Vec<String>,
}
67
/// Result of executing one statement: a column header, the result rows,
/// a DML-affected-row count, the plan trace, and (for cascading DDL) a
/// report of what else was dropped.
#[derive(Debug, Clone)]
pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<Value>>,
    pub rows_affected: u64,
    pub trace: QueryTrace,
    pub cascade: Option<CascadeReport>,
}
76
/// A parsed statement plus its physical plan, cached by exact SQL text so
/// repeat executions skip parsing and planning.
#[derive(Debug, Clone)]
struct CachedStatement {
    stmt: Statement,
    plan: PhysicalPlan,
}
82
83impl QueryResult {
84    pub fn empty() -> Self {
85        Self {
86            columns: vec![],
87            rows: vec![],
88            rows_affected: 0,
89            trace: QueryTrace::scan(),
90            cascade: None,
91        }
92    }
93
94    pub fn empty_with_affected(rows_affected: u64) -> Self {
95        Self {
96            columns: vec![],
97            rows: vec![],
98            rows_affected,
99            trace: QueryTrace::scan(),
100            cascade: None,
101        }
102    }
103}
104
thread_local! {
    // Per-thread snapshot pin: when `Some`, read paths presumably use this
    // SnapshotId instead of taking a fresh snapshot — readers are outside
    // this chunk, so confirm at the consumption sites.
    static SNAPSHOT_OVERRIDE: std::cell::RefCell<Option<SnapshotId>> =
        const { std::cell::RefCell::new(None) };
}
109
/// The engine facade: owns the three stores (relational / graph / vector),
/// their executors, the shared transaction manager, optional redb-backed
/// persistence, plugin hooks, and per-session / subscription state.
pub struct Database {
    tx_mgr: Arc<TxManager<DynStore>>,
    relational_store: Arc<RelationalStore>,
    graph_store: Arc<GraphStore>,
    vector_store: Arc<VectorStore>,
    // Append-only logs shared with the composite store for sync/replication.
    change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
    // `None` for purely in-memory databases.
    persistence: Option<Arc<RedbPersistence>>,
    relational: MemRelationalExecutor<DynStore>,
    graph: MemGraphExecutor<DynStore>,
    vector: MemVectorExecutor<DynStore>,
    // Transaction opened by SQL `BEGIN` on this session, if any.
    session_tx: Mutex<Option<TxId>>,
    instance_id: uuid::Uuid,
    plugin: Arc<dyn DatabasePlugin>,
    accountant: Arc<MemoryAccountant>,
    conflict_policies: RwLock<ConflictPolicies>,
    subscriptions: Mutex<SubscriptionState>,
    pruning_runtime: Mutex<PruningRuntime>,
    pruning_guard: Arc<Mutex<()>>,
    // 0 encodes "no limit" for both disk fields (see `build_db`).
    disk_limit: AtomicU64,
    disk_limit_startup_ceiling: AtomicU64,
    sync_watermark: Arc<AtomicLsn>,
    closed: AtomicBool,
    // Per-query scan counter, reset at statement entry points.
    rows_examined: AtomicU64,
    // Exact-SQL-text → parsed statement + plan; capped, never evicted.
    statement_cache: RwLock<HashMap<String, Arc<CachedStatement>>>,
}
136
/// Outcome of a single row insert: the new row id, or `NoOp` when the
/// insert was suppressed (e.g. duplicate on a unique constraint).
pub(crate) enum InsertRowResult {
    Inserted(RowId),
    NoOp,
}
141
/// Uncommitted-transaction overlay for an index scan: rows deleted in the
/// open transaction (to hide) and rows inserted in it that match the scan
/// predicate (to surface).
#[derive(Debug, Default)]
pub(crate) struct IndexScanTxOverlay {
    pub deleted_row_ids: std::collections::HashSet<RowId>,
    pub matching_inserts: Vec<VersionedRow>,
}
147
/// Result of constraint validation for an insert: either the row may be
/// written, or it duplicates a unique key and the insert becomes a no-op.
enum RowConstraintCheck {
    Valid,
    DuplicateUniqueNoOp,
}
152
/// Result of probing an index for a constraint key: no usable index exists,
/// no row matched, or a row matched (carrying its id).
enum ConstraintProbe {
    NoIndex,
    NoMatch,
    Match(RowId),
}
158
// Manual Debug: print only the instance id and elide everything else
// (stores, executors, locks) via `finish_non_exhaustive`.
impl std::fmt::Debug for Database {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Database")
            .field("instance_id", &self.instance_id)
            .finish_non_exhaustive()
    }
}
166
/// Work item for state propagation: drive the row identified by
/// (`table`, `uuid`) toward `target_state`. `depth` tracks propagation
/// distance from the source; `abort_on_failure` presumably makes a failed
/// hop fail the whole operation — confirm at the processing site (outside
/// this chunk).
#[derive(Debug, Clone)]
struct PropagationQueueEntry {
    table: String,
    uuid: uuid::Uuid,
    target_state: String,
    depth: u32,
    abort_on_failure: bool,
}
175
/// Borrowed view of the row a propagation step originates from (its table,
/// uuid, current state, and depth already traversed).
#[derive(Debug, Clone, Copy)]
struct PropagationSource<'a> {
    table: &'a str,
    uuid: uuid::Uuid,
    state: &'a str,
    depth: u32,
}
183
/// Shared context threaded through a propagation run: the transaction and
/// snapshot to operate under, plus pre-fetched table metadata by name.
#[derive(Debug, Clone, Copy)]
struct PropagationContext<'a> {
    tx: TxId,
    snapshot: SnapshotId,
    metas: &'a HashMap<String, TableMeta>,
}
190
/// Commit-event subscription bookkeeping: the live subscriber channels and
/// delivered/dropped counters (dropped = channel was full on `try_send`).
#[derive(Debug)]
struct SubscriptionState {
    subscribers: Vec<SyncSender<CommitEvent>>,
    events_sent: u64,
    events_dropped: u64,
}
197
198impl SubscriptionState {
199    fn new() -> Self {
200        Self {
201            subscribers: Vec::new(),
202            events_sent: 0,
203            events_dropped: 0,
204        }
205    }
206}
207
/// Background pruning thread handle plus the shared shutdown flag it polls.
#[derive(Debug)]
struct PruningRuntime {
    shutdown: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}
213
214impl PruningRuntime {
215    fn new() -> Self {
216        Self {
217            shutdown: Arc::new(AtomicBool::new(false)),
218            handle: None,
219        }
220    }
221}
222
223impl Database {
    #[allow(clippy::too_many_arguments)]
    /// Assemble a `Database` from pre-built stores and infrastructure.
    ///
    /// Shared between the persistent (`open_loaded`) and in-memory
    /// (`open_memory_internal`) construction paths. The executors each get
    /// their own `Arc` clone of the store and tx manager. `None` disk
    /// limits are encoded as `0` in the atomics.
    fn build_db(
        tx_mgr: Arc<TxManager<DynStore>>,
        relational: Arc<RelationalStore>,
        graph: Arc<GraphStore>,
        vector_store: Arc<VectorStore>,
        hnsw: Arc<OnceLock<parking_lot::RwLock<Option<HnswIndex>>>>,
        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
        ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
        persistence: Option<Arc<RedbPersistence>>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        disk_limit: Option<u64>,
        disk_limit_startup_ceiling: Option<u64>,
    ) -> Self {
        Self {
            tx_mgr: tx_mgr.clone(),
            relational_store: relational.clone(),
            graph_store: graph.clone(),
            vector_store: vector_store.clone(),
            change_log,
            ddl_log,
            persistence,
            relational: MemRelationalExecutor::new(relational, tx_mgr.clone()),
            graph: MemGraphExecutor::new(graph, tx_mgr.clone()),
            vector: MemVectorExecutor::new_with_accountant(
                vector_store,
                tx_mgr.clone(),
                hnsw,
                accountant.clone(),
            ),
            session_tx: Mutex::new(None),
            instance_id: uuid::Uuid::new_v4(),
            plugin,
            accountant,
            conflict_policies: RwLock::new(ConflictPolicies::uniform(ConflictPolicy::LatestWins)),
            subscriptions: Mutex::new(SubscriptionState::new()),
            pruning_runtime: Mutex::new(PruningRuntime::new()),
            pruning_guard: Arc::new(Mutex::new(())),
            // 0 means "no limit" for both disk-limit atomics.
            disk_limit: AtomicU64::new(disk_limit.unwrap_or(0)),
            disk_limit_startup_ceiling: AtomicU64::new(disk_limit_startup_ceiling.unwrap_or(0)),
            sync_watermark: Arc::new(AtomicLsn::new(Lsn(0))),
            closed: AtomicBool::new(false),
            rows_examined: AtomicU64::new(0),
            statement_cache: RwLock::new(HashMap::new()),
        }
    }
271
272    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
273        Self::open_with_config(
274            path,
275            Arc::new(CorePlugin),
276            Arc::new(MemoryAccountant::no_limit()),
277        )
278    }
279
280    pub fn open_memory() -> Self {
281        Self::open_memory_with_plugin_and_accountant(
282            Arc::new(CorePlugin),
283            Arc::new(MemoryAccountant::no_limit()),
284        )
285        .expect("failed to open in-memory database")
286    }
287
    /// Open a persistent database at `path`, reloading all state from redb.
    ///
    /// Recovery sequence: open/create the redb file, resolve memory and
    /// disk limits from persisted config (the startup limit acts as a
    /// ceiling over the persisted one), rebuild the relational / graph /
    /// vector stores from persisted rows, then restart the tx manager with
    /// counters one past the highest recovered TxId/Lsn.
    fn open_loaded(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        mut accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        let persistence = if path.exists() {
            Arc::new(RedbPersistence::open(path)?)
        } else {
            Arc::new(RedbPersistence::create(path)?)
        };
        // A persisted memory limit only applies when the caller did not
        // already configure one.
        if accountant.usage().limit.is_none()
            && let Some(limit) = persistence.load_config_value::<usize>("memory_limit")?
        {
            accountant = Arc::new(MemoryAccountant::with_budget(limit));
        }
        let persisted_disk_limit = persistence.load_config_value::<u64>("disk_limit")?;
        let startup_disk_ceiling = startup_disk_limit;
        // Effective limit is the tighter of persisted vs startup ceiling.
        let effective_disk_limit = match (persisted_disk_limit, startup_disk_limit) {
            (Some(persisted), Some(ceiling)) => Some(persisted.min(ceiling)),
            (Some(persisted), None) => Some(persisted),
            (None, Some(ceiling)) => Some(ceiling),
            (None, None) => None,
        };

        let all_meta = persistence.load_all_table_meta()?;

        let relational = Arc::new(RelationalStore::new());
        for (name, meta) in &all_meta {
            relational.create_table(name, meta.clone());
            // Register EVERY index declared in TableMeta.indexes — this
            // includes auto-indexes (kind=Auto) synthesized at CREATE TABLE
            // time AND user-declared indexes (kind=UserDeclared).
            for decl in &meta.indexes {
                relational.create_index_storage(name, &decl.name, decl.columns.clone());
            }
            for row in persistence.load_relational_table(name)? {
                relational.insert_loaded_row(name, row);
            }
        }

        let graph = Arc::new(GraphStore::new());
        for edge in persistence.load_forward_edges()? {
            graph.insert_loaded_edge(edge);
        }

        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        // Adopt the dimension of the first vector column found in any table.
        for meta in all_meta.values() {
            for column in &meta.columns {
                if let ColumnType::Vector(dimension) = column.column_type {
                    vector.set_dimension(dimension);
                    break;
                }
            }
        }
        for entry in persistence.load_vectors()? {
            vector.insert_loaded_vector(entry);
        }

        // Resume counters strictly above anything recovered from disk.
        let max_row_id = relational.max_row_id();
        let max_tx = max_tx_across_all(&relational, &graph, &vector);
        let max_lsn = max_lsn_across_all(&relational, &graph, &vector);
        relational.set_next_row_id(RowId(max_row_id.0.saturating_add(1)));

        let change_log = Arc::new(RwLock::new(persistence.load_change_log()?));
        let ddl_log = Arc::new(RwLock::new(persistence.load_ddl_log()?));
        let composite = CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        );
        let persistent = PersistentCompositeStore::new(composite, persistence.clone());
        let store: DynStore = Box::new(persistent);
        let tx_mgr = Arc::new(TxManager::new_with_counters(
            store,
            TxId(max_tx.0.saturating_add(1)),
            Lsn(max_lsn.0.saturating_add(1)),
            max_tx,
        ));

        let db = Self::build_db(
            tx_mgr,
            relational,
            graph,
            vector,
            hnsw,
            change_log,
            ddl_log,
            Some(persistence),
            plugin,
            accountant,
            effective_disk_limit,
            startup_disk_ceiling,
        );

        // Re-register DAG edge-type constraints declared on tables.
        for meta in all_meta.values() {
            if !meta.dag_edge_types.is_empty() {
                db.graph.register_dag_edge_types(&meta.dag_edge_types);
            }
        }

        db.account_loaded_state()?;
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());

        Ok(db)
    }
398
    /// Build a fresh, empty in-memory database (no persistence layer): new
    /// stores, empty logs, and a composite store behind a new tx manager.
    fn open_memory_internal(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        let relational = Arc::new(RelationalStore::new());
        let graph = Arc::new(GraphStore::new());
        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        let change_log = Arc::new(RwLock::new(Vec::new()));
        let ddl_log = Arc::new(RwLock::new(Vec::new()));
        let store: DynStore = Box::new(CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        ));
        let tx_mgr = Arc::new(TxManager::new(store));

        // No persistence, no disk limits for the in-memory path.
        let db = Self::build_db(
            tx_mgr, relational, graph, vector, hnsw, change_log, ddl_log, None, plugin, accountant,
            None, None,
        );
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());
        Ok(db)
    }
425
    /// Start a new transaction and return its id.
    pub fn begin(&self) -> TxId {
        self.tx_mgr.begin()
    }
429
    /// Commit `tx` as a user-initiated commit (full `commit_with_source`
    /// pipeline: FK validation, plugin hooks, event publication).
    pub fn commit(&self, tx: TxId) -> Result<()> {
        self.commit_with_source(tx, CommitSource::User)
    }
433
    /// Roll back `tx`, first releasing allocations reserved for the
    /// transaction's pending inserts (read from its write set).
    pub fn rollback(&self, tx: TxId) -> Result<()> {
        let ws = self.tx_mgr.cloned_write_set(tx)?;
        self.release_insert_allocations(&ws);
        self.tx_mgr.rollback(tx)
    }
439
    /// Take a read snapshot from the transaction manager.
    pub fn snapshot(&self) -> SnapshotId {
        self.tx_mgr.snapshot()
    }
443
    /// Parse and execute `sql` against this session.
    ///
    /// Fast path: a statement-cache hit skips parsing and planning.
    /// `BEGIN`/`COMMIT`/`ROLLBACK` manage the per-session transaction here
    /// and never reach the planner; everything else runs under the open
    /// session transaction if there is one, otherwise autocommits.
    pub fn execute(&self, sql: &str, params: &HashMap<String, Value>) -> Result<QueryResult> {
        if let Some(cached) = self.cached_statement(sql) {
            let active_tx = *self.session_tx.lock();
            return self.execute_statement_with_plan(
                &cached.stmt,
                sql,
                params,
                active_tx,
                Some(&cached.plan),
            );
        }

        let stmt = contextdb_parser::parse(sql)?;

        match &stmt {
            Statement::Begin => {
                let mut session = self.session_tx.lock();
                // Nested BEGIN is a no-op: keep the existing transaction.
                if session.is_none() {
                    *session = Some(self.begin());
                }
                return Ok(QueryResult::empty());
            }
            Statement::Commit => {
                let mut session = self.session_tx.lock();
                // COMMIT with no open transaction is a no-op.
                if let Some(tx) = session.take() {
                    self.commit_with_source(tx, CommitSource::User)?;
                }
                return Ok(QueryResult::empty());
            }
            Statement::Rollback => {
                let mut session = self.session_tx.lock();
                // ROLLBACK with no open transaction is a no-op.
                if let Some(tx) = *session {
                    self.rollback(tx)?;
                    *session = None;
                }
                return Ok(QueryResult::empty());
            }
            _ => {}
        }

        let active_tx = *self.session_tx.lock();
        self.execute_statement(&stmt, sql, params, active_tx)
    }
487
488    fn cached_statement(&self, sql: &str) -> Option<Arc<CachedStatement>> {
489        self.statement_cache.read().get(sql).cloned()
490    }
491
492    fn cache_statement_if_eligible(&self, sql: &str, stmt: &Statement, plan: &PhysicalPlan) {
493        if !Self::is_statement_cache_eligible(stmt, plan) {
494            return;
495        }
496
497        let mut cache = self.statement_cache.write();
498        if cache.contains_key(sql) {
499            return;
500        }
501        if cache.len() >= MAX_STATEMENT_CACHE_ENTRIES {
502            return;
503        }
504        cache.insert(
505            sql.to_string(),
506            Arc::new(CachedStatement {
507                stmt: stmt.clone(),
508                plan: plan.clone(),
509            }),
510        );
511    }
512
513    fn is_statement_cache_eligible(stmt: &Statement, plan: &PhysicalPlan) -> bool {
514        matches!((stmt, plan), (Statement::Insert(ins), PhysicalPlan::Insert(_))
515            if !ins.table.eq_ignore_ascii_case("GRAPH")
516                && !ins.table.eq_ignore_ascii_case("__edges"))
517    }
518
    /// Drop every cached parse+plan — presumably invoked when DDL makes
    /// cached plans stale; confirm at call sites (outside this chunk).
    pub(crate) fn clear_statement_cache(&self) {
        self.statement_cache.write().clear();
    }
522
    /// Hidden test hook: current number of cached statements.
    #[doc(hidden)]
    pub fn __statement_cache_len(&self) -> usize {
        self.statement_cache.read().len()
    }
527
    /// Execute `plan` outside any session transaction.
    ///
    /// DML plans (insert/delete/update) get an implicit transaction that is
    /// committed on success and rolled back (best-effort) on failure; pure
    /// reads run with no transaction at all.
    fn execute_autocommit(
        &self,
        plan: &PhysicalPlan,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        // Reset per-query rows_examined once at the entry point so every
        // sub-plan (union, CTE, subquery IndexScan) accumulates into the
        // shared counter rather than overwriting prior counts.
        self.__reset_rows_examined();
        match plan {
            PhysicalPlan::Insert(_) | PhysicalPlan::Delete(_) | PhysicalPlan::Update(_) => {
                let tx = self.begin();
                let result = execute_plan(self, plan, params, Some(tx));
                match result {
                    Ok(qr) => {
                        self.commit_with_source(tx, CommitSource::AutoCommit)?;
                        Ok(qr)
                    }
                    Err(e) => {
                        // Best-effort rollback; the original error wins.
                        let _ = self.rollback(tx);
                        Err(e)
                    }
                }
            }
            _ => execute_plan(self, plan, params, None),
        }
    }
555
556    pub fn explain(&self, sql: &str) -> Result<String> {
557        let stmt = contextdb_parser::parse(sql)?;
558        let plan = contextdb_planner::plan(&stmt)?;
559        if self.vector_store.vector_count() >= 1000 && !self.vector_store.has_hnsw_index() {
560            maybe_prebuild_hnsw(&self.vector_store, self.accountant());
561        }
562        let mut output = plan.explain();
563        if self.vector_store.has_hnsw_index() {
564            output = output.replace("VectorSearch(", "HNSWSearch(");
565            output = output.replace("VectorSearch {", "HNSWSearch {");
566        } else {
567            output = output.replace("VectorSearch(", "VectorSearch(strategy=BruteForce, ");
568            output = output.replace("VectorSearch {", "VectorSearch { strategy: BruteForce,");
569        }
570        Ok(output)
571    }
572
    /// Parse and execute `sql` inside the caller-supplied transaction `tx`
    /// (no autocommit; the caller owns commit/rollback).
    pub fn execute_in_tx(
        &self,
        tx: TxId,
        sql: &str,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        let stmt = contextdb_parser::parse(sql)?;
        self.execute_statement(&stmt, sql, params, Some(tx))
    }
582
    /// Commit pipeline shared by user and autocommit paths.
    ///
    /// Order matters: FK validation and the plugin's `pre_commit` both run
    /// before the actual commit, and either failing rolls the transaction
    /// back. After the commit, delete allocations are released, the write
    /// set is stamped with the commit LSN, and post-commit hooks plus any
    /// subscriber events fire. Empty write sets skip all hooks.
    fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
        let mut ws = self.tx_mgr.cloned_write_set(tx)?;

        if !ws.is_empty()
            && let Err(err) = self.validate_foreign_keys_in_tx(tx)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        if !ws.is_empty()
            && let Err(err) = self.plugin.pre_commit(&ws, source)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        let lsn = self.tx_mgr.commit_with_lsn(tx)?;

        if !ws.is_empty() {
            self.release_delete_allocations(&ws);
            // Stamp the LSN so post-commit consumers see the final value.
            ws.stamp_lsn(lsn);
            self.plugin.post_commit(&ws, source);
            self.publish_commit_event_if_subscribers(&ws, source, lsn);
        }

        Ok(())
    }
611
612    fn build_commit_event(
613        ws: &contextdb_tx::WriteSet,
614        source: CommitSource,
615        lsn: Lsn,
616    ) -> CommitEvent {
617        let mut tables_changed: Vec<String> = ws
618            .relational_inserts
619            .iter()
620            .map(|(table, _)| table.clone())
621            .chain(
622                ws.relational_deletes
623                    .iter()
624                    .map(|(table, _, _)| table.clone()),
625            )
626            .collect::<HashSet<_>>()
627            .into_iter()
628            .collect();
629        tables_changed.sort();
630
631        CommitEvent {
632            source,
633            lsn,
634            tables_changed,
635            row_count: ws.relational_inserts.len()
636                + ws.relational_deletes.len()
637                + ws.adj_inserts.len()
638                + ws.adj_deletes.len()
639                + ws.vector_inserts.len()
640                + ws.vector_deletes.len(),
641        }
642    }
643
    /// Fan a commit event out to all subscribers without blocking.
    ///
    /// Uses `try_send`: a full channel counts as a drop but keeps the
    /// subscriber; a disconnected receiver is silently pruned. The
    /// subscriber list is taken out of the state and rebuilt so pruning
    /// happens in one pass; the lock is held for the whole loop, so no
    /// concurrent subscribe can be lost.
    fn publish_commit_event_if_subscribers(
        &self,
        ws: &contextdb_tx::WriteSet,
        source: CommitSource,
        lsn: Lsn,
    ) {
        let mut subscriptions = self.subscriptions.lock();
        if subscriptions.subscribers.is_empty() {
            // No subscribers: skip building the event entirely.
            return;
        }
        let event = Self::build_commit_event(ws, source, lsn);
        let subscribers = std::mem::take(&mut subscriptions.subscribers);
        let mut live_subscribers = Vec::with_capacity(subscribers.len());

        for sender in subscribers {
            match sender.try_send(event.clone()) {
                Ok(()) => {
                    subscriptions.events_sent += 1;
                    live_subscribers.push(sender);
                }
                Err(TrySendError::Full(_)) => {
                    // Slow consumer: drop this event but keep the channel.
                    subscriptions.events_dropped += 1;
                    live_subscribers.push(sender);
                }
                // Receiver gone: prune by simply not re-adding the sender.
                Err(TrySendError::Disconnected(_)) => {}
            }
        }

        subscriptions.subscribers = live_subscribers;
    }
674
    /// Signal the background pruning thread to stop and join it.
    ///
    /// The shutdown flag is set and the handle taken while holding the
    /// runtime lock; a fresh (cleared) flag is installed so a later restart
    /// does not observe the old `true`. The join happens after the lock is
    /// released to avoid blocking other runtime access.
    fn stop_pruning_thread(&self) {
        let handle = {
            let mut runtime = self.pruning_runtime.lock();
            runtime.shutdown.store(true, Ordering::SeqCst);
            let handle = runtime.handle.take();
            runtime.shutdown = Arc::new(AtomicBool::new(false));
            handle
        };

        if let Some(handle) = handle {
            let _ = handle.join();
        }
    }
688
    /// Execute an already-parsed statement with no pre-computed plan
    /// (thin wrapper over `execute_statement_with_plan`).
    fn execute_statement(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        self.execute_statement_with_plan(stmt, sql, params, tx, None)
    }
698
    /// Core statement dispatch: plugin query/DDL hooks, virtual-table
    /// routing for graph inserts, then planning (or reuse of `cached_plan`)
    /// and execution, with query timing reported to the plugin.
    fn execute_statement_with_plan(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
        cached_plan: Option<&PhysicalPlan>,
    ) -> Result<QueryResult> {
        self.plugin.on_query(sql)?;

        // DDL statements are announced to the plugin before they run.
        if let Some(change) = self.ddl_change_for_statement(stmt).as_ref() {
            self.plugin.on_ddl(change)?;
        }

        // Handle INSERT INTO GRAPH / __edges as a virtual table routing to the graph store.
        if let Statement::Insert(ins) = stmt
            && (ins.table.eq_ignore_ascii_case("GRAPH")
                || ins.table.eq_ignore_ascii_case("__edges"))
        {
            return self.execute_graph_insert(ins, params, tx);
        }

        let started = Instant::now();
        // Closure keeps the timing/post_query bookkeeping on every exit path.
        let result = (|| {
            if let Some(plan) = cached_plan {
                return self.run_planned_statement(stmt, plan, params, tx);
            }

            let (stmt, plan) = {
                // Pre-resolve InSubquery expressions with CTE context before planning.
                let stmt = self.pre_resolve_cte_subqueries(stmt, params, tx)?;
                let plan = contextdb_planner::plan(&stmt)?;
                self.cache_statement_if_eligible(sql, &stmt, &plan);
                (stmt, plan)
            };
            self.run_planned_statement(&stmt, &plan, params, tx)
        })();
        let duration = started.elapsed();
        let outcome = query_outcome_from_result(&result);
        self.plugin.post_query(sql, duration, &outcome);
        result.map(strip_internal_row_id)
    }
741
    /// Validate a planned DML statement and run it (in `tx` if given,
    /// otherwise via the autocommit path). After a successful CREATE TABLE
    /// that declares DAG edge types, those types are registered with the
    /// graph executor.
    fn run_planned_statement(
        &self,
        stmt: &Statement,
        plan: &PhysicalPlan,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        validate_dml(plan, self, params)?;
        let result = match tx {
            Some(tx) => {
                // Reset rows_examined at the top of an in-tx statement so
                // sub-plans accumulate rather than overwrite.
                self.__reset_rows_examined();
                execute_plan(self, plan, params, Some(tx))
            }
            None => self.execute_autocommit(plan, params),
        };
        if result.is_ok()
            && let Statement::CreateTable(ct) = stmt
            && !ct.dag_edge_types.is_empty()
        {
            self.graph.register_dag_edge_types(&ct.dag_edge_types);
        }
        result
    }
767
768    /// Handle `INSERT INTO GRAPH (source_id, target_id, edge_type) VALUES (...)`.
769    fn execute_graph_insert(
770        &self,
771        ins: &contextdb_parser::ast::Insert,
772        params: &HashMap<String, Value>,
773        tx: Option<TxId>,
774    ) -> Result<QueryResult> {
775        use crate::executor::resolve_expr;
776
777        let col_index = |name: &str| {
778            ins.columns
779                .iter()
780                .position(|c| c.eq_ignore_ascii_case(name))
781        };
782        let source_idx = col_index("source_id")
783            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires source_id column".into()))?;
784        let target_idx = col_index("target_id")
785            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires target_id column".into()))?;
786        let edge_type_idx = col_index("edge_type")
787            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires edge_type column".into()))?;
788
789        let auto_commit = tx.is_none();
790        let tx = tx.unwrap_or_else(|| self.begin());
791        let mut count = 0u64;
792        for row_exprs in &ins.values {
793            let source = resolve_expr(&row_exprs[source_idx], params)?;
794            let target = resolve_expr(&row_exprs[target_idx], params)?;
795            let edge_type = resolve_expr(&row_exprs[edge_type_idx], params)?;
796
797            let source_uuid = match &source {
798                Value::Uuid(u) => *u,
799                Value::Text(t) => uuid::Uuid::parse_str(t)
800                    .map_err(|e| Error::PlanError(format!("invalid source_id uuid: {e}")))?,
801                _ => return Err(Error::PlanError("source_id must be UUID".into())),
802            };
803            let target_uuid = match &target {
804                Value::Uuid(u) => *u,
805                Value::Text(t) => uuid::Uuid::parse_str(t)
806                    .map_err(|e| Error::PlanError(format!("invalid target_id uuid: {e}")))?,
807                _ => return Err(Error::PlanError("target_id must be UUID".into())),
808            };
809            let edge_type_str = match &edge_type {
810                Value::Text(t) => t.clone(),
811                _ => return Err(Error::PlanError("edge_type must be TEXT".into())),
812            };
813
814            self.insert_edge(
815                tx,
816                source_uuid,
817                target_uuid,
818                edge_type_str,
819                Default::default(),
820            )?;
821            count += 1;
822        }
823
824        if auto_commit {
825            self.commit_with_source(tx, CommitSource::AutoCommit)?;
826        }
827
828        Ok(QueryResult::empty_with_affected(count))
829    }
830
    /// Translate a DDL statement into the `DdlChange` it would produce,
    /// without applying it. For ALTER TABLE this simulates the action on a
    /// clone of the current table meta so the change carries the
    /// post-alteration column list. Returns `None` for non-DDL statements.
    fn ddl_change_for_statement(&self, stmt: &Statement) -> Option<DdlChange> {
        match stmt {
            Statement::CreateTable(ct) => Some(ddl_change_from_create_table(ct)),
            Statement::DropTable(dt) => Some(DdlChange::DropTable {
                name: dt.name.clone(),
            }),
            Statement::AlterTable(at) => {
                // Missing table falls back to an empty meta; the executor
                // will surface the real error when the ALTER runs.
                let mut meta = self.table_meta(&at.table).unwrap_or_default();
                // Simulate the alter action on a cloned meta to get post-alteration columns
                match &at.action {
                    AlterAction::AddColumn(col) => {
                        meta.columns.push(contextdb_core::ColumnDef {
                            name: col.name.clone(),
                            column_type: crate::executor::map_column_type(&col.data_type),
                            nullable: col.nullable,
                            primary_key: col.primary_key,
                            unique: col.unique,
                            default: col
                                .default
                                .as_ref()
                                .map(crate::executor::stored_default_expr),
                            references: col.references.as_ref().map(|reference| {
                                contextdb_core::ForeignKeyReference {
                                    table: reference.table.clone(),
                                    column: reference.column.clone(),
                                }
                            }),
                            expires: col.expires,
                            immutable: col.immutable,
                        });
                        if col.expires {
                            meta.expires_column = Some(col.name.clone());
                        }
                    }
                    AlterAction::DropColumn {
                        column: name,
                        cascade: _,
                    } => {
                        meta.columns.retain(|c| c.name != *name);
                        // Dropping the expires column clears the TTL marker.
                        if meta.expires_column.as_deref() == Some(name.as_str()) {
                            meta.expires_column = None;
                        }
                    }
                    AlterAction::RenameColumn { from, to } => {
                        if let Some(c) = meta.columns.iter_mut().find(|c| c.name == *from) {
                            c.name = to.clone();
                        }
                        // Keep the expires marker pointing at the renamed column.
                        if meta.expires_column.as_deref() == Some(from.as_str()) {
                            meta.expires_column = Some(to.clone());
                        }
                    }
                    AlterAction::SetRetain {
                        duration_seconds,
                        sync_safe,
                    } => {
                        meta.default_ttl_seconds = Some(*duration_seconds);
                        meta.sync_safe = *sync_safe;
                    }
                    AlterAction::DropRetain => {
                        meta.default_ttl_seconds = None;
                        meta.sync_safe = false;
                    }
                    AlterAction::SetSyncConflictPolicy(_) | AlterAction::DropSyncConflictPolicy => { /* handled in executor */
                    }
                }
                Some(DdlChange::AlterTable {
                    name: at.table.clone(),
                    columns: meta
                        .columns
                        .iter()
                        .map(|c| {
                            (
                                c.name.clone(),
                                sql_type_for_meta_column(c, &meta.propagation_rules),
                            )
                        })
                        .collect(),
                    constraints: create_table_constraints_from_meta(&meta),
                })
            }
            _ => None,
        }
    }
914
915    /// Pre-resolve InSubquery expressions within SELECT statements that have CTEs.
916    /// This allows CTE-backed subqueries in WHERE clauses to be evaluated before planning.
917    fn pre_resolve_cte_subqueries(
918        &self,
919        stmt: &Statement,
920        params: &HashMap<String, Value>,
921        tx: Option<TxId>,
922    ) -> Result<Statement> {
923        if let Statement::Select(sel) = stmt
924            && !sel.ctes.is_empty()
925            && sel.body.where_clause.is_some()
926        {
927            use crate::executor::resolve_in_subqueries_with_ctes;
928            let resolved_where = sel
929                .body
930                .where_clause
931                .as_ref()
932                .map(|expr| resolve_in_subqueries_with_ctes(self, expr, params, tx, &sel.ctes))
933                .transpose()?;
934            let mut new_body = sel.body.clone();
935            new_body.where_clause = resolved_where;
936            Ok(Statement::Select(contextdb_parser::ast::SelectStatement {
937                ctes: sel.ctes.clone(),
938                body: new_body,
939            }))
940        } else {
941            Ok(stmt.clone())
942        }
943    }
944
945    pub fn insert_row(
946        &self,
947        tx: TxId,
948        table: &str,
949        values: HashMap<ColName, Value>,
950    ) -> Result<RowId> {
951        // Statement-scoped bound: `Value::TxId(n)` must satisfy
952        // `n <= max(committed_watermark, tx)` so writes inside an active
953        // transaction can reference their own allocated TxId. The error,
954        // when fired, still reports `committed_watermark` per plan B7.
955        let values =
956            self.coerce_row_for_insert(table, values, Some(self.committed_watermark()), Some(tx))?;
957        self.validate_row_constraints(tx, table, &values, None)?;
958        self.relational.insert(tx, table, values)
959    }
960
961    /// UPDATE-aware insert: the UPDATE path first deletes the old row and
962    /// then re-inserts. The constraint probe must skip the old row_id so the
963    /// same PK does not self-collide. The old row's index entry still looks
964    /// visible at the committed-watermark snapshot because its `deleted_tx`
965    /// equals the current (uncommitted) `tx`.
966    pub(crate) fn insert_row_replacing(
967        &self,
968        tx: TxId,
969        table: &str,
970        values: HashMap<ColName, Value>,
971        old_row_id: RowId,
972    ) -> Result<RowId> {
973        let values =
974            self.coerce_row_for_insert(table, values, Some(self.committed_watermark()), Some(tx))?;
975        self.validate_row_constraints(tx, table, &values, Some(old_row_id))?;
976        self.relational.insert(tx, table, values)
977    }
978
979    /// Internal variant used by sync-apply: skips the TXID bound check because
980    /// peer TxIds may legitimately exceed the local watermark. Still enforces
981    /// wrong-variant + reverse-direction TXID column rules.
982    pub(crate) fn insert_row_for_sync(
983        &self,
984        tx: TxId,
985        table: &str,
986        values: HashMap<ColName, Value>,
987    ) -> Result<RowId> {
988        let values = self.coerce_row_for_insert(table, values, None, None)?;
989        self.validate_row_constraints(tx, table, &values, None)?;
990        self.relational.insert(tx, table, values)
991    }
992
993    pub(crate) fn upsert_row_for_sync(
994        &self,
995        tx: TxId,
996        table: &str,
997        conflict_col: &str,
998        values: HashMap<ColName, Value>,
999    ) -> Result<UpsertResult> {
1000        let values = self.coerce_row_for_insert(table, values, None, None)?;
1001        self.upsert_row(tx, table, conflict_col, values)
1002    }
1003
1004    /// Route each row cell through `coerce_value_for_column` for variant
1005    /// compatibility. The one concession to historical `insert_row` behavior
1006    /// is that `Value::Vector` payloads are accepted regardless of declared
1007    /// dimension — prior integration suites (e.g. the 3-component probe into
1008    /// a VECTOR(384) embedding column) depend on the library API NOT enforcing
1009    /// dim equality there. SQL execution (`exec_insert`/`exec_update`) still
1010    /// performs the full dim check because it always threads through the
1011    /// executor module's coercion helpers.
1012    fn coerce_row_for_insert(
1013        &self,
1014        table: &str,
1015        values: HashMap<ColName, Value>,
1016        current_tx_max: Option<TxId>,
1017        active_tx: Option<TxId>,
1018    ) -> Result<HashMap<ColName, Value>> {
1019        let meta = self.table_meta(table);
1020        let mut out: HashMap<ColName, Value> = HashMap::with_capacity(values.len());
1021        for (col, v) in values {
1022            // Vector + Value::Vector: pass straight through (dim check happens on SQL path).
1023            let is_vector_bypass = matches!(&v, Value::Vector(_))
1024                && meta
1025                    .as_ref()
1026                    .and_then(|m| m.columns.iter().find(|c| c.name == col))
1027                    .map(|c| matches!(c.column_type, contextdb_core::ColumnType::Vector(_)))
1028                    .unwrap_or(false);
1029
1030            let coerced = if is_vector_bypass {
1031                v
1032            } else {
1033                crate::executor::coerce_into_column(
1034                    self,
1035                    table,
1036                    &col,
1037                    v,
1038                    current_tx_max,
1039                    active_tx,
1040                )?
1041            };
1042            out.insert(col, coerced);
1043        }
1044        Ok(out)
1045    }
1046
1047    pub(crate) fn insert_row_with_unique_noop(
1048        &self,
1049        tx: TxId,
1050        table: &str,
1051        values: HashMap<ColName, Value>,
1052    ) -> Result<InsertRowResult> {
1053        match self.check_row_constraints(tx, table, &values, None, true)? {
1054            RowConstraintCheck::Valid => self
1055                .relational
1056                .insert(tx, table, values)
1057                .map(InsertRowResult::Inserted),
1058            RowConstraintCheck::DuplicateUniqueNoOp => Ok(InsertRowResult::NoOp),
1059        }
1060    }
1061
1062    pub fn upsert_row(
1063        &self,
1064        tx: TxId,
1065        table: &str,
1066        conflict_col: &str,
1067        values: HashMap<ColName, Value>,
1068    ) -> Result<UpsertResult> {
1069        let snapshot = self.snapshot_for_read();
1070        let existing_row = values
1071            .get(conflict_col)
1072            .map(|conflict_value| {
1073                self.point_lookup_in_tx(tx, table, conflict_col, conflict_value, snapshot)
1074            })
1075            .transpose()?
1076            .flatten();
1077        let existing_row_id = existing_row.as_ref().map(|row| row.row_id);
1078        // Diff-respecting column-level IMMUTABLE check: reject any upsert whose
1079        // flagged-column value differs from the existing local value. Idempotent
1080        // replay (same-value) succeeds; new rows (no existing match) apply normally.
1081        if let (Some(existing), Some(meta)) = (existing_row.as_ref(), self.table_meta(table)) {
1082            for col_def in meta.columns.iter().filter(|c| c.immutable) {
1083                let Some(incoming) = values.get(&col_def.name) else {
1084                    continue;
1085                };
1086                let existing_value = existing.values.get(&col_def.name);
1087                if existing_value != Some(incoming) {
1088                    return Err(Error::ImmutableColumn {
1089                        table: table.to_string(),
1090                        column: col_def.name.clone(),
1091                    });
1092                }
1093            }
1094        }
1095        self.validate_row_constraints(tx, table, &values, existing_row_id)?;
1096
1097        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
1098        let meta = self.table_meta(table);
1099        let new_state = meta
1100            .as_ref()
1101            .and_then(|m| m.state_machine.as_ref())
1102            .and_then(|sm| values.get(&sm.column))
1103            .and_then(Value::as_text)
1104            .map(std::borrow::ToOwned::to_owned);
1105
1106        let result = self
1107            .relational
1108            .upsert(tx, table, conflict_col, values, snapshot)?;
1109
1110        if let (Some(uuid), Some(state), Some(_meta)) =
1111            (row_uuid, new_state.as_deref(), meta.as_ref())
1112            && matches!(result, UpsertResult::Updated)
1113        {
1114            self.propagate_state_change_if_needed(tx, table, Some(uuid), Some(state))?;
1115        }
1116
1117        Ok(result)
1118    }
1119
1120    fn validate_row_constraints(
1121        &self,
1122        tx: TxId,
1123        table: &str,
1124        values: &HashMap<ColName, Value>,
1125        skip_row_id: Option<RowId>,
1126    ) -> Result<()> {
1127        match self.check_row_constraints(tx, table, values, skip_row_id, false)? {
1128            RowConstraintCheck::Valid => Ok(()),
1129            RowConstraintCheck::DuplicateUniqueNoOp => {
1130                unreachable!("strict constraint validation cannot return no-op")
1131            }
1132        }
1133    }
1134
    /// Probe PK, single-column UNIQUE, and composite UNIQUE constraints for a
    /// candidate row before insert/upsert.
    ///
    /// `skip_row_id` excludes one row from every probe (used by the UPDATE
    /// delete-then-reinsert path). With `allow_duplicate_unique_noop`, a
    /// UNIQUE conflict that maps to exactly one existing row — and the same
    /// row across all conflicting constraints — yields `DuplicateUniqueNoOp`
    /// instead of an error; PK conflicts always error. `Value::Null` cells
    /// never participate in any uniqueness check.
    fn check_row_constraints(
        &self,
        tx: TxId,
        table: &str,
        values: &HashMap<ColName, Value>,
        skip_row_id: Option<RowId>,
        allow_duplicate_unique_noop: bool,
    ) -> Result<RowConstraintCheck> {
        let metas = self.relational_store.table_meta.read();
        let meta = metas
            .get(table)
            .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
        // Constraint probes MUST see the current committed watermark, not any
        // thread-local override. A PK/UNIQUE violation on a committed row must
        // be detected even if the caller pinned a pre-violation snapshot for
        // read visibility.
        let snapshot = self.snapshot();

        // Scan the whole table only when no index covers any PK / UNIQUE
        // column we need to probe. Pulled lazily so the fast path skips it.
        let mut visible_rows_cache: Option<Vec<VersionedRow>> = None;

        // --- Primary-key columns: any visible match is a hard violation. ---
        for column in meta.columns.iter().filter(|column| column.primary_key) {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            match self.probe_column_for_constraint(
                tx,
                table,
                &column.name,
                value,
                snapshot,
                skip_row_id,
            )? {
                ConstraintProbe::Match(_) => {
                    return Err(Error::UniqueViolation {
                        table: table.to_string(),
                        column: column.name.clone(),
                    });
                }
                ConstraintProbe::NoMatch => {}
                ConstraintProbe::NoIndex => {
                    // Fallback to full scan for PK columns without an index.
                    if visible_rows_cache.is_none() {
                        visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                            Some(tx),
                            table,
                            snapshot,
                            &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                        )?);
                    }
                    let rows = visible_rows_cache.as_deref().unwrap();
                    if rows
                        .iter()
                        .any(|existing| existing.values.get(&column.name) == Some(value))
                    {
                        return Err(Error::UniqueViolation {
                            table: table.to_string(),
                            column: column.name.clone(),
                        });
                    }
                }
            }
        }

        // Single row (if any) that all UNIQUE conflicts are allowed to map to
        // in no-op mode; `merge_unique_conflict` errors on any disagreement.
        let mut duplicate_unique_row_id = None;

        // --- Single-column UNIQUE (non-PK) columns. ---
        for column in meta
            .columns
            .iter()
            .filter(|column| column.unique && !column.primary_key)
        {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            let matching_row_ids: Vec<RowId> = match self.probe_column_for_constraint(
                tx,
                table,
                &column.name,
                value,
                snapshot,
                skip_row_id,
            )? {
                ConstraintProbe::Match(rid) => vec![rid],
                ConstraintProbe::NoMatch => Vec::new(),
                ConstraintProbe::NoIndex => {
                    // No index covers this column: fall back to the (cached)
                    // full scan and collect every matching row id.
                    if visible_rows_cache.is_none() {
                        visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                            Some(tx),
                            table,
                            snapshot,
                            &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                        )?);
                    }
                    let rows = visible_rows_cache.as_deref().unwrap();
                    rows.iter()
                        .filter(|existing| existing.values.get(&column.name) == Some(value))
                        .map(|existing| existing.row_id)
                        .collect()
                }
            };
            self.merge_unique_conflict(
                table,
                &column.name,
                &matching_row_ids,
                allow_duplicate_unique_noop,
                &mut duplicate_unique_row_id,
            )?;
        }

        // --- Composite UNIQUE constraints (multi-column). ---
        for unique_constraint in &meta.unique_constraints {
            let mut candidate_values = Vec::with_capacity(unique_constraint.len());
            let mut has_null = false;

            // A NULL (or absent) component disables this composite check.
            for column_name in unique_constraint {
                match values.get(column_name) {
                    Some(Value::Null) | None => {
                        has_null = true;
                        break;
                    }
                    Some(value) => candidate_values.push(value.clone()),
                }
            }

            if has_null {
                continue;
            }

            let matching_row_ids: Vec<RowId> = if let Some(rid) = self.probe_composite_unique(
                tx,
                table,
                unique_constraint,
                &candidate_values,
                snapshot,
                skip_row_id,
            )? {
                vec![rid]
            } else if self.index_covers_composite(table, unique_constraint) {
                // An index covers the constraint and reported no match, so
                // the probe's None is authoritative.
                Vec::new()
            } else {
                // No covering index: fall back to the (cached) full scan and
                // match every component of the composite key.
                if visible_rows_cache.is_none() {
                    visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                        Some(tx),
                        table,
                        snapshot,
                        &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                    )?);
                }
                let rows = visible_rows_cache.as_deref().unwrap();
                rows.iter()
                    .filter(|existing| {
                        unique_constraint.iter().zip(candidate_values.iter()).all(
                            |(column_name, value)| existing.values.get(column_name) == Some(value),
                        )
                    })
                    .map(|existing| existing.row_id)
                    .collect()
            };
            // Report composite UNIQUE violations using the first column name,
            // matching the plan's single-column error convention.
            let column_label = unique_constraint.first().map(|s| s.as_str()).unwrap_or("");
            self.merge_unique_conflict(
                table,
                column_label,
                &matching_row_ids,
                allow_duplicate_unique_noop,
                &mut duplicate_unique_row_id,
            )?;
        }

        if duplicate_unique_row_id.is_some() {
            Ok(RowConstraintCheck::DuplicateUniqueNoOp)
        } else {
            Ok(RowConstraintCheck::Valid)
        }
    }
1317
1318    fn merge_unique_conflict(
1319        &self,
1320        table: &str,
1321        column: &str,
1322        matching_row_ids: &[RowId],
1323        allow_duplicate_unique_noop: bool,
1324        duplicate_unique_row_id: &mut Option<RowId>,
1325    ) -> Result<()> {
1326        if matching_row_ids.is_empty() {
1327            return Ok(());
1328        }
1329
1330        if !allow_duplicate_unique_noop || matching_row_ids.len() != 1 {
1331            return Err(Error::UniqueViolation {
1332                table: table.to_string(),
1333                column: column.to_string(),
1334            });
1335        }
1336
1337        let matched_row_id = matching_row_ids[0];
1338
1339        if let Some(existing_row_id) = duplicate_unique_row_id {
1340            if *existing_row_id != matched_row_id {
1341                return Err(Error::UniqueViolation {
1342                    table: table.to_string(),
1343                    column: column.to_string(),
1344                });
1345            }
1346        } else {
1347            *duplicate_unique_row_id = Some(matched_row_id);
1348        }
1349
1350        Ok(())
1351    }
1352
1353    /// Returns true if `table` has any single-column index covering `column`.
1354    fn index_covers_column(&self, table: &str, column: &str) -> bool {
1355        let indexes = self.relational_store.indexes.read();
1356        indexes
1357            .iter()
1358            .any(|((t, _), idx)| t == table && idx.columns.len() == 1 && idx.columns[0].0 == column)
1359    }
1360
1361    /// Returns true if `table` has an index whose first-column prefix contains
1362    /// exactly the columns in `cols` (same order).
1363    fn index_covers_composite(&self, table: &str, cols: &[String]) -> bool {
1364        let indexes = self.relational_store.indexes.read();
1365        indexes.iter().any(|((t, _), idx)| {
1366            t == table
1367                && idx.columns.len() >= cols.len()
1368                && idx
1369                    .columns
1370                    .iter()
1371                    .zip(cols.iter())
1372                    .all(|((c, _), want)| c == want)
1373        })
1374    }
1375
    /// Look up `(table, column) = value` using a single-column index when one
    /// exists, layered with the tx's staged inserts and deletes.
    ///
    /// Returns `Match(row_id)` for the first conflicting row (committed-and-
    /// visible at `snapshot`, or staged in this tx), `NoMatch` when an index
    /// exists but holds no conflict, and `NoIndex` when no single-column index
    /// covers `column` — the caller then falls back to a full table scan.
    /// `skip_row_id` is excluded from every candidate set.
    fn probe_column_for_constraint(
        &self,
        tx: TxId,
        table: &str,
        column: &str,
        value: &Value,
        snapshot: SnapshotId,
        skip_row_id: Option<RowId>,
    ) -> Result<ConstraintProbe> {
        use contextdb_core::{DirectedValue, TotalOrdAsc};
        let (tx_staged_deletes, staged_overlap) = self.tx_mgr.with_write_set(tx, |ws| {
            // Rows this tx has already staged for delete must not be treated as
            // obstructions by the constraint probe. The old index entry still
            // looks visible at the committed-watermark snapshot until commit.
            let deletes = if ws.relational_deletes.is_empty() {
                std::collections::HashSet::new()
            } else {
                ws.relational_deletes
                    .iter()
                    .filter(|(t, _, _)| t == table)
                    .map(|(_, row_id, _)| *row_id)
                    .collect()
            };
            // Staged inserts are not in the index yet, so a same-value overlap
            // must be detected directly from the write set.
            let overlap = ws.relational_inserts.iter().find_map(|(t, row)| {
                if t != table {
                    return None;
                }
                if let Some(sid) = skip_row_id
                    && row.row_id == sid
                {
                    return None;
                }
                if row.values.get(column) == Some(value) {
                    Some(row.row_id)
                } else {
                    None
                }
            });
            (deletes, overlap)
        })?;
        let indexes = self.relational_store.indexes.read();
        // Auto constraint indexes have stable names. Try those directly before
        // falling back to user-declared single-column indexes.
        let table_key = table.to_string();
        let pk_key = (table_key.clone(), format!("__pk_{column}"));
        let unique_key = (table_key, format!("__unique_{column}"));
        let storage = indexes
            .get(&pk_key)
            .or_else(|| indexes.get(&unique_key))
            .or_else(|| {
                indexes.iter().find_map(|((t, _), idx)| {
                    if t == table && idx.columns.len() == 1 && idx.columns[0].0 == column {
                        Some(idx)
                    } else {
                        None
                    }
                })
            });
        let Some(storage) = storage else {
            return Ok(ConstraintProbe::NoIndex);
        };
        // Point lookup: single-value key wrapped in the ascending direction.
        let key = vec![DirectedValue::Asc(TotalOrdAsc(value.clone()))];
        if let Some(entries) = storage.tree.get(&key) {
            for entry in entries {
                if let Some(sid) = skip_row_id
                    && entry.row_id == sid
                {
                    continue;
                }
                // Rows this tx deleted are not obstructions (see above).
                if tx_staged_deletes.contains(&entry.row_id) {
                    continue;
                }
                if entry.visible_at(snapshot) {
                    return Ok(ConstraintProbe::Match(entry.row_id));
                }
            }
        }
        drop(indexes);
        // No committed conflict in the index — report a staged-insert overlap,
        // if the write set produced one.
        Ok(match staged_overlap {
            Some(row_id) => ConstraintProbe::Match(row_id),
            None => ConstraintProbe::NoMatch,
        })
    }
1461
    /// Probe a composite UNIQUE (a, b, ...) using the first index whose
    /// leading prefix matches `cols`. The probe walks the range for the full
    /// key prefix.
    ///
    /// Returns the first conflicting row id (committed-and-visible at
    /// `snapshot`, excluding `skip_row_id` and rows this tx staged for delete,
    /// or a tx-staged insert matching every component) or `None`. A `None` is
    /// only authoritative when a covering index exists — the caller checks
    /// coverage separately via `index_covers_composite`.
    fn probe_composite_unique(
        &self,
        tx: TxId,
        table: &str,
        cols: &[String],
        values: &[Value],
        snapshot: SnapshotId,
        skip_row_id: Option<RowId>,
    ) -> Result<Option<RowId>> {
        use contextdb_core::{DirectedValue, TotalOrdAsc};
        // Degenerate or mismatched input: nothing to probe.
        if cols.is_empty() || values.is_empty() || cols.len() != values.len() {
            return Ok(None);
        }
        // Rows this tx has already staged for delete are not obstructions.
        let tx_staged_deletes: std::collections::HashSet<RowId> =
            self.tx_mgr.with_write_set(tx, |ws| {
                ws.relational_deletes
                    .iter()
                    .filter(|(t, _, _)| t == table)
                    .map(|(_, row_id, _)| *row_id)
                    .collect()
            })?;
        let indexes = self.relational_store.indexes.read();
        // First index for this table whose leading columns equal `cols` in order.
        let storage_entry = indexes.iter().find(|((t, _), idx)| {
            t == table
                && idx.columns.len() >= cols.len()
                && idx
                    .columns
                    .iter()
                    .zip(cols.iter())
                    .all(|((c, _), w)| c == w)
        });
        let (_, storage) = match storage_entry {
            Some(e) => e,
            None => return Ok(None),
        };
        // Full-prefix walk: range from (val1,...,valN, -inf) to (val1,...,valN, +inf).
        // For simplicity, iterate and filter by prefix match.
        // NOTE(review): this visits every key in the index — O(index size) per
        // probe. A bounded range scan over the prefix would be faster; TODO
        // confirm whether `storage.tree` supports prefix range queries.
        for (key, entries) in storage.tree.iter() {
            if key.len() < cols.len() {
                continue;
            }
            // Unwrap the direction wrapper (Asc/Desc) before comparing each
            // prefix component against the candidate value.
            let prefix_match = values.iter().enumerate().all(|(i, v)| match &key[i] {
                DirectedValue::Asc(TotalOrdAsc(k)) => k == v,
                DirectedValue::Desc(contextdb_core::TotalOrdDesc(k)) => k == v,
            });
            if !prefix_match {
                continue;
            }
            for entry in entries {
                if let Some(sid) = skip_row_id
                    && entry.row_id == sid
                {
                    continue;
                }
                if tx_staged_deletes.contains(&entry.row_id) {
                    continue;
                }
                if entry.visible_at(snapshot) {
                    return Ok(Some(entry.row_id));
                }
            }
        }
        drop(indexes);
        // Tx-staged inserts.
        let overlap = self.tx_mgr.with_write_set(tx, |ws| {
            for (t, row) in &ws.relational_inserts {
                if t != table {
                    continue;
                }
                if let Some(sid) = skip_row_id
                    && row.row_id == sid
                {
                    continue;
                }
                let matches = cols
                    .iter()
                    .zip(values.iter())
                    .all(|(c, v)| row.values.get(c) == Some(v));
                if matches {
                    return Some(row.row_id);
                }
            }
            None
        })?;
        Ok(overlap)
    }
1552
1553    fn validate_foreign_keys_in_tx(&self, tx: TxId) -> Result<()> {
1554        // Constraint probe — FK existence checks must see the committed
1555        // watermark, not any read-side snapshot override.
1556        let snapshot = self.snapshot();
1557        let relational_inserts = self
1558            .tx_mgr
1559            .with_write_set(tx, |ws| ws.relational_inserts.clone())?;
1560
1561        for (table, row) in relational_inserts {
1562            let meta = self
1563                .table_meta(&table)
1564                .ok_or_else(|| Error::TableNotFound(table.clone()))?;
1565            for column in &meta.columns {
1566                let Some(reference) = &column.references else {
1567                    continue;
1568                };
1569                let Some(value) = row.values.get(&column.name) else {
1570                    continue;
1571                };
1572                if *value == Value::Null {
1573                    continue;
1574                }
1575                if self
1576                    .point_lookup_in_tx(tx, &reference.table, &reference.column, value, snapshot)?
1577                    .is_none()
1578                {
1579                    return Err(Error::ForeignKeyViolation {
1580                        table: table.clone(),
1581                        column: column.name.clone(),
1582                        ref_table: reference.table.clone(),
1583                    });
1584                }
1585            }
1586        }
1587
1588        Ok(())
1589    }
1590
1591    pub(crate) fn propagate_state_change_if_needed(
1592        &self,
1593        tx: TxId,
1594        table: &str,
1595        row_uuid: Option<uuid::Uuid>,
1596        new_state: Option<&str>,
1597    ) -> Result<()> {
1598        if let (Some(uuid), Some(state)) = (row_uuid, new_state) {
1599            let already_propagating = self
1600                .tx_mgr
1601                .with_write_set(tx, |ws| ws.propagation_in_progress)?;
1602            if !already_propagating {
1603                self.tx_mgr
1604                    .with_write_set(tx, |ws| ws.propagation_in_progress = true)?;
1605                let propagate_result = self.propagate(tx, table, uuid, state);
1606                self.tx_mgr
1607                    .with_write_set(tx, |ws| ws.propagation_in_progress = false)?;
1608                propagate_result?;
1609            }
1610        }
1611
1612        Ok(())
1613    }
1614
    /// Breadth-first state propagation driven by table metadata's propagation
    /// rules (ForeignKey / Edge / VectorExclusion).
    ///
    /// Starting from (`table`, `row_uuid`) entering `new_state`, seeds a work
    /// queue from FK and edge rules, applies vector exclusions for the root,
    /// then drains the queue: each dequeued row is upserted into its rule's
    /// target state and, when the transition actually happened, its own
    /// children are enqueued in turn. `visited` limits each (table, uuid)
    /// pair to one visit, so rule cycles terminate.
    ///
    /// Per-row failures (missing state machine, invalid transition) are
    /// logged and skipped; when the responsible rule set `abort_on_failure`,
    /// the FIRST such violation is remembered and returned as an error only
    /// after the whole queue has drained.
    fn propagate(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: uuid::Uuid,
        new_state: &str,
    ) -> Result<()> {
        let snapshot = self.snapshot_for_read();
        // Clone the metadata map so the lock is not held across the lookups
        // and upserts performed below.
        let metas = self.relational_store().table_meta.read().clone();
        let mut queue: VecDeque<PropagationQueueEntry> = VecDeque::new();
        let mut visited: HashSet<(String, uuid::Uuid)> = HashSet::new();
        let mut abort_violation: Option<Error> = None;
        let ctx = PropagationContext {
            tx,
            snapshot,
            metas: &metas,
        };
        let root = PropagationSource {
            table,
            uuid: row_uuid,
            state: new_state,
            depth: 0,
        };

        // Seed the queue from the root's rules; vector exclusions apply to
        // the root immediately (it already reached `new_state`).
        self.enqueue_fk_children(&ctx, &mut queue, root);
        self.enqueue_edge_children(&ctx, &mut queue, root)?;
        self.apply_vector_exclusions(&ctx, root)?;

        while let Some(entry) = queue.pop_front() {
            // Each (table, uuid) is processed at most once per propagation.
            if !visited.insert((entry.table.clone(), entry.uuid)) {
                continue;
            }

            let Some(meta) = metas.get(&entry.table) else {
                continue;
            };

            // A target table without a state machine cannot accept the
            // transition: warn, optionally record an abort violation, skip.
            let Some(state_machine) = &meta.state_machine else {
                let msg = format!(
                    "warning: propagation target table {} has no state machine",
                    entry.table
                );
                eprintln!("{msg}");
                if entry.abort_on_failure && abort_violation.is_none() {
                    abort_violation = Some(Error::PropagationAborted {
                        table: entry.table.clone(),
                        column: String::new(),
                        from: String::new(),
                        to: entry.target_state.clone(),
                    });
                }
                continue;
            };

            let state_column = state_machine.column.clone();
            // No visible row for this uuid: nothing to transition; skip.
            let Some(existing) = self.relational.point_lookup_with_tx(
                Some(tx),
                &entry.table,
                "id",
                &Value::Uuid(entry.uuid),
                snapshot,
            )?
            else {
                continue;
            };

            let from_state = existing
                .values
                .get(&state_column)
                .and_then(Value::as_text)
                .unwrap_or("")
                .to_string();

            let mut next_values = existing.values.clone();
            next_values.insert(
                state_column.clone(),
                Value::Text(entry.target_state.clone()),
            );

            let upsert_outcome =
                self.relational
                    .upsert(tx, &entry.table, "id", next_values, snapshot);

            // Only an actual Update means the row reached the target state;
            // NoOp / Inserted outcomes do not cascade further.
            let reached_state = match upsert_outcome {
                Ok(UpsertResult::Updated) => entry.target_state.as_str(),
                Ok(UpsertResult::NoOp) | Ok(UpsertResult::Inserted) => continue,
                Err(Error::InvalidStateTransition(_)) => {
                    eprintln!(
                        "warning: skipped invalid propagated transition {}.{} {} -> {}",
                        entry.table, state_column, from_state, entry.target_state
                    );
                    if entry.abort_on_failure && abort_violation.is_none() {
                        abort_violation = Some(Error::PropagationAborted {
                            table: entry.table.clone(),
                            column: state_column.clone(),
                            from: from_state,
                            to: entry.target_state.clone(),
                        });
                    }
                    continue;
                }
                // Any other error is fatal to the whole propagation.
                Err(err) => return Err(err),
            };

            // The row transitioned: fan out to its own children. The source
            // keeps `entry.depth`; the helpers enqueue children at depth + 1.
            self.enqueue_edge_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;
            self.apply_vector_exclusions(
                &ctx,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;

            self.enqueue_fk_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            );
        }

        // Deferred abort: only report the first recorded violation after the
        // queue fully drains, so sibling branches still propagate.
        if let Some(err) = abort_violation {
            return Err(err);
        }

        Ok(())
    }
1757
1758    fn enqueue_fk_children(
1759        &self,
1760        ctx: &PropagationContext<'_>,
1761        queue: &mut VecDeque<PropagationQueueEntry>,
1762        source: PropagationSource<'_>,
1763    ) {
1764        for (owner_table, owner_meta) in ctx.metas {
1765            for rule in &owner_meta.propagation_rules {
1766                let PropagationRule::ForeignKey {
1767                    fk_column,
1768                    referenced_table,
1769                    trigger_state,
1770                    target_state,
1771                    max_depth,
1772                    abort_on_failure,
1773                    ..
1774                } = rule
1775                else {
1776                    continue;
1777                };
1778
1779                if referenced_table != source.table || trigger_state != source.state {
1780                    continue;
1781                }
1782
1783                if source.depth >= *max_depth {
1784                    continue;
1785                }
1786
1787                let rows = match self.relational.scan_filter_with_tx(
1788                    Some(ctx.tx),
1789                    owner_table,
1790                    ctx.snapshot,
1791                    &|row| row.values.get(fk_column) == Some(&Value::Uuid(source.uuid)),
1792                ) {
1793                    Ok(rows) => rows,
1794                    Err(err) => {
1795                        eprintln!(
1796                            "warning: propagation scan failed for {owner_table}.{fk_column}: {err}"
1797                        );
1798                        continue;
1799                    }
1800                };
1801
1802                for row in rows {
1803                    if let Some(id) = row.values.get("id").and_then(Value::as_uuid).copied() {
1804                        queue.push_back(PropagationQueueEntry {
1805                            table: owner_table.clone(),
1806                            uuid: id,
1807                            target_state: target_state.clone(),
1808                            depth: source.depth + 1,
1809                            abort_on_failure: *abort_on_failure,
1810                        });
1811                    }
1812                }
1813            }
1814        }
1815    }
1816
    /// Queue rows connected to `source` through graph edges, per the source
    /// table's Edge propagation rules.
    ///
    /// For each Edge rule whose trigger state matches and whose depth budget
    /// allows it, runs a single-hop BFS over the rule's edge type and
    /// direction, then enqueues every neighbor that exists as a row in the
    /// SAME table; neighbors with no matching row are ignored.
    fn enqueue_edge_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        // Unlike FK rules (scanned across all tables), edge rules live on the
        // source row's own table metadata.
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } = rule
            else {
                continue;
            };

            if trigger_state != source.state || source.depth >= *max_depth {
                continue;
            }

            // One-hop traversal restricted to this rule's edge type.
            let bfs = self.query_bfs(
                source.uuid,
                Some(std::slice::from_ref(edge_type)),
                *direction,
                1,
                ctx.snapshot,
            )?;

            for node in bfs.nodes {
                // Only enqueue neighbors that are rows in `source.table`
                // visible to this tx at the propagation snapshot.
                if self
                    .relational
                    .point_lookup_with_tx(
                        Some(ctx.tx),
                        source.table,
                        "id",
                        &Value::Uuid(node.id),
                        ctx.snapshot,
                    )?
                    .is_some()
                {
                    queue.push_back(PropagationQueueEntry {
                        table: source.table.to_string(),
                        uuid: node.id,
                        target_state: target_state.clone(),
                        depth: source.depth + 1,
                        abort_on_failure: *abort_on_failure,
                    });
                }
            }
        }

        Ok(())
    }
1877
1878    fn apply_vector_exclusions(
1879        &self,
1880        ctx: &PropagationContext<'_>,
1881        source: PropagationSource<'_>,
1882    ) -> Result<()> {
1883        let Some(meta) = ctx.metas.get(source.table) else {
1884            return Ok(());
1885        };
1886
1887        for rule in &meta.propagation_rules {
1888            let PropagationRule::VectorExclusion { trigger_state } = rule else {
1889                continue;
1890            };
1891            if trigger_state != source.state {
1892                continue;
1893            }
1894            for row_id in self.logical_row_ids_for_uuid(ctx.tx, source.table, source.uuid) {
1895                self.delete_vector(ctx.tx, row_id)?;
1896            }
1897        }
1898
1899        Ok(())
1900    }
1901
1902    pub fn delete_row(&self, tx: TxId, table: &str, row_id: RowId) -> Result<()> {
1903        self.relational.delete(tx, table, row_id)
1904    }
1905
1906    pub fn scan(&self, table: &str, snapshot: SnapshotId) -> Result<Vec<VersionedRow>> {
1907        self.relational.scan(table, snapshot)
1908    }
1909
1910    /// Scan a table with visibility over the tx's in-flight write-set layered on
1911    /// top of the committed snapshot.
1912    pub(crate) fn scan_in_tx(
1913        &self,
1914        tx: TxId,
1915        table: &str,
1916        snapshot: SnapshotId,
1917    ) -> Result<Vec<VersionedRow>> {
1918        self.relational.scan_with_tx(Some(tx), table, snapshot)
1919    }
1920
1921    /// Compute the in-tx overlay (deleted row_ids + matching staged inserts)
1922    /// for an index-driven scan of `table` matching `shape` on `column`.
1923    /// Internal helper for the IndexScan executor arm.
1924    pub(crate) fn index_scan_tx_overlay(
1925        &self,
1926        tx: TxId,
1927        table: &str,
1928        column: &str,
1929        shape: &crate::executor::IndexPredicateShape,
1930    ) -> Result<IndexScanTxOverlay> {
1931        use crate::executor::{IndexPredicateShape, range_includes};
1932        let mut overlay = IndexScanTxOverlay::default();
1933        self.tx_mgr.with_write_set(tx, |ws| {
1934            for (t, _row_id, _) in &ws.relational_deletes {
1935                if t == table {
1936                    overlay.deleted_row_ids.insert(*_row_id);
1937                }
1938            }
1939            for (t, row) in &ws.relational_inserts {
1940                if t != table {
1941                    continue;
1942                }
1943                let v = row.values.get(column).cloned().unwrap_or(Value::Null);
1944                let include = match shape {
1945                    IndexPredicateShape::Equality(target) => v == *target,
1946                    IndexPredicateShape::NotEqual(target) => v != *target,
1947                    IndexPredicateShape::InList(list) => list.contains(&v),
1948                    IndexPredicateShape::Range { lower, upper } => range_includes(&v, lower, upper),
1949                    IndexPredicateShape::IsNull => v == Value::Null,
1950                    IndexPredicateShape::IsNotNull => v != Value::Null,
1951                };
1952                if include {
1953                    overlay.matching_inserts.push(row.clone());
1954                }
1955            }
1956        })?;
1957        Ok(overlay)
1958    }
1959
1960    pub fn scan_filter(
1961        &self,
1962        table: &str,
1963        snapshot: SnapshotId,
1964        predicate: &dyn Fn(&VersionedRow) -> bool,
1965    ) -> Result<Vec<VersionedRow>> {
1966        self.relational.scan_filter(table, snapshot, predicate)
1967    }
1968
1969    pub fn point_lookup(
1970        &self,
1971        table: &str,
1972        col: &str,
1973        value: &Value,
1974        snapshot: SnapshotId,
1975    ) -> Result<Option<VersionedRow>> {
1976        self.relational.point_lookup(table, col, value, snapshot)
1977    }
1978
1979    pub(crate) fn point_lookup_in_tx(
1980        &self,
1981        tx: TxId,
1982        table: &str,
1983        col: &str,
1984        value: &Value,
1985        snapshot: SnapshotId,
1986    ) -> Result<Option<VersionedRow>> {
1987        self.relational
1988            .point_lookup_with_tx(Some(tx), table, col, value, snapshot)
1989    }
1990
1991    pub(crate) fn logical_row_ids_for_uuid(
1992        &self,
1993        tx: TxId,
1994        table: &str,
1995        uuid: uuid::Uuid,
1996    ) -> Vec<RowId> {
1997        let mut row_ids = HashSet::new();
1998
1999        if let Some(rows) = self.relational_store.tables.read().get(table) {
2000            for row in rows {
2001                if row.values.get("id") == Some(&Value::Uuid(uuid)) {
2002                    row_ids.insert(row.row_id);
2003                }
2004            }
2005        }
2006
2007        let _ = self.tx_mgr.with_write_set(tx, |ws| {
2008            for (insert_table, row) in &ws.relational_inserts {
2009                if insert_table == table && row.values.get("id") == Some(&Value::Uuid(uuid)) {
2010                    row_ids.insert(row.row_id);
2011                }
2012            }
2013        });
2014
2015        row_ids.into_iter().collect()
2016    }
2017
2018    pub fn insert_edge(
2019        &self,
2020        tx: TxId,
2021        source: NodeId,
2022        target: NodeId,
2023        edge_type: EdgeType,
2024        properties: HashMap<String, Value>,
2025    ) -> Result<bool> {
2026        let bytes = estimate_edge_bytes(source, target, &edge_type, &properties);
2027        self.accountant.try_allocate_for(
2028            bytes,
2029            "graph_insert",
2030            "insert_edge",
2031            "Reduce edge fan-out or raise MEMORY_LIMIT before inserting more graph edges.",
2032        )?;
2033
2034        match self
2035            .graph
2036            .insert_edge(tx, source, target, edge_type, properties)
2037        {
2038            Ok(inserted) => {
2039                if !inserted {
2040                    self.accountant.release(bytes);
2041                }
2042                Ok(inserted)
2043            }
2044            Err(err) => {
2045                self.accountant.release(bytes);
2046                Err(err)
2047            }
2048        }
2049    }
2050
2051    pub fn delete_edge(
2052        &self,
2053        tx: TxId,
2054        source: NodeId,
2055        target: NodeId,
2056        edge_type: &str,
2057    ) -> Result<()> {
2058        self.graph.delete_edge(tx, source, target, edge_type)
2059    }
2060
    /// Breadth-first traversal from `start` over edges visible at `snapshot`,
    /// following `edge_types` in the given `direction` up to `max_depth`.
    ///
    /// NOTE(review): the hard-coded `1` is the fourth argument to
    /// `GraphStore::bfs`, between the edge filter and `max_depth` —
    /// presumably a minimum depth / starting level; confirm against
    /// the contextdb_graph crate.
    pub fn query_bfs(
        &self,
        start: NodeId,
        edge_types: Option<&[EdgeType]>,
        direction: Direction,
        max_depth: u32,
        snapshot: SnapshotId,
    ) -> Result<TraversalResult> {
        self.graph
            .bfs(start, edge_types, direction, 1, max_depth, snapshot)
    }
2072
2073    pub fn edge_count(
2074        &self,
2075        source: NodeId,
2076        edge_type: &str,
2077        snapshot: SnapshotId,
2078    ) -> Result<usize> {
2079        Ok(self.graph.edge_count(source, edge_type, snapshot))
2080    }
2081
2082    pub fn get_edge_properties(
2083        &self,
2084        source: NodeId,
2085        target: NodeId,
2086        edge_type: &str,
2087        snapshot: SnapshotId,
2088    ) -> Result<Option<HashMap<String, Value>>> {
2089        let props = self
2090            .graph_store
2091            .forward_adj
2092            .read()
2093            .get(&source)
2094            .and_then(|entries| {
2095                entries
2096                    .iter()
2097                    .rev()
2098                    .find(|entry| {
2099                        entry.target == target
2100                            && entry.edge_type == edge_type
2101                            && entry.visible_at(snapshot)
2102                    })
2103                    .map(|entry| entry.properties.clone())
2104            });
2105        Ok(props)
2106    }
2107
2108    pub fn insert_vector(&self, tx: TxId, row_id: RowId, vector: Vec<f32>) -> Result<()> {
2109        let bytes = estimate_vector_bytes(&vector);
2110        self.accountant.try_allocate_for(
2111            bytes,
2112            "insert",
2113            "vector_insert",
2114            "Reduce vector dimensionality, insert fewer rows, or raise MEMORY_LIMIT.",
2115        )?;
2116        self.vector
2117            .insert_vector(tx, row_id, vector)
2118            .inspect_err(|_| self.accountant.release(bytes))
2119    }
2120
2121    pub fn delete_vector(&self, tx: TxId, row_id: RowId) -> Result<()> {
2122        self.vector.delete_vector(tx, row_id)
2123    }
2124
2125    pub fn query_vector(
2126        &self,
2127        query: &[f32],
2128        k: usize,
2129        candidates: Option<&RoaringTreemap>,
2130        snapshot: SnapshotId,
2131    ) -> Result<Vec<(RowId, f32)>> {
2132        self.vector.search(query, k, candidates, snapshot)
2133    }
2134
2135    pub fn has_live_vector(&self, row_id: RowId, snapshot: SnapshotId) -> bool {
2136        self.vector_store
2137            .vectors
2138            .read()
2139            .iter()
2140            .any(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
2141    }
2142
2143    pub fn live_vector_entry(&self, row_id: RowId, snapshot: SnapshotId) -> Option<VectorEntry> {
2144        self.vector_store
2145            .vectors
2146            .read()
2147            .iter()
2148            .rev()
2149            .find(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
2150            .cloned()
2151    }
2152
    /// Remove vector and graph state derived from the rows of a table that is
    /// being dropped.
    ///
    /// Scans the table at the current read snapshot, collects its row ids and
    /// any (source_id, edge_type, target_id) triples its rows encode, then
    /// purges matching vector entries (clearing the HNSW index as well) and
    /// matching adjacency entries from both directions of the graph store.
    ///
    /// NOTE(review): a failed scan is treated as "no rows" via
    /// `unwrap_or_default`, so aux state may survive if the scan errors.
    pub(crate) fn drop_table_aux_state(&self, table: &str) {
        let snapshot = self.snapshot_for_read();
        let rows = self.scan(table, snapshot).unwrap_or_default();
        let row_ids: HashSet<RowId> = rows.iter().map(|row| row.row_id).collect();
        // Rows carrying source_id/target_id/edge_type columns describe graph
        // edges; collect their keys so the adjacency lists can be pruned.
        let edge_keys: HashSet<(NodeId, EdgeType, NodeId)> = rows
            .iter()
            .filter_map(|row| {
                match (
                    row.values.get("source_id").and_then(Value::as_uuid),
                    row.values.get("target_id").and_then(Value::as_uuid),
                    row.values.get("edge_type").and_then(Value::as_text),
                ) {
                    (Some(source), Some(target), Some(edge_type)) => {
                        Some((*source, edge_type.to_string(), *target))
                    }
                    _ => None,
                }
            })
            .collect();

        if !row_ids.is_empty() {
            let mut vectors = self.vector_store.vectors.write();
            vectors.retain(|entry| !row_ids.contains(&entry.row_id));
            // Clear the HNSW index too; it may still reference removed vectors.
            self.vector_store.clear_hnsw(self.accountant());
        }

        if !edge_keys.is_empty() {
            {
                let mut forward = self.graph_store.forward_adj.write();
                for entries in forward.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                // Drop adjacency buckets left empty by the retain above.
                forward.retain(|_, entries| !entries.is_empty());
            }
            {
                let mut reverse = self.graph_store.reverse_adj.write();
                for entries in reverse.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                reverse.retain(|_, entries| !entries.is_empty());
            }
        }
    }
2200
2201    pub fn table_names(&self) -> Vec<String> {
2202        self.relational_store.table_names()
2203    }
2204
2205    pub fn table_meta(&self, table: &str) -> Option<TableMeta> {
2206        self.relational_store.table_meta(table)
2207    }
2208
2209    /// Execute at a specific snapshot. Threads `snapshot` through via a
2210    /// thread-local override so scans / IndexScans filter visibility using
2211    /// the caller-provided snapshot, not the live committed watermark.
2212    #[doc(hidden)]
2213    pub fn execute_at_snapshot(
2214        &self,
2215        sql: &str,
2216        params: &HashMap<String, Value>,
2217        snapshot: SnapshotId,
2218    ) -> Result<QueryResult> {
2219        SNAPSHOT_OVERRIDE.with(|cell| {
2220            let prior = cell.replace(Some(snapshot));
2221            let r = self.execute(sql, params);
2222            cell.replace(prior);
2223            r
2224        })
2225    }
2226
2227    pub(crate) fn snapshot_for_read(&self) -> SnapshotId {
2228        SNAPSHOT_OVERRIDE.with(|cell| cell.borrow().unwrap_or_else(|| self.snapshot()))
2229    }
2230
    /// Return the row changes since `since`. Walks `change_log` for
    /// `RowInsert` / `RowDelete` entries whose LSN exceeds `since`, fetches
    /// the row values out of the live relational store, and emits a
    /// `RowChange` the receiver can replay. row_id order is preserved.
    ///
    /// Inserts whose row is no longer present in the store are skipped;
    /// deletes carry their recorded natural key and an empty value map.
    #[doc(hidden)]
    pub fn change_log_rows_since(&self, since: Lsn) -> Result<Vec<RowChange>> {
        let entries = self.change_log_since(since);
        let tables = self.relational_store.tables.read();
        let mut out = Vec::new();
        for e in entries {
            match e {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    let Some(rows) = tables.get(&table) else {
                        continue;
                    };
                    let Some(row) = rows.iter().find(|r| r.row_id == row_id) else {
                        continue;
                    };
                    // Prefer the row's `id` column as the natural key; fall
                    // back to a synthetic key built from the physical row_id.
                    let natural_key = row
                        .values
                        .get("id")
                        .cloned()
                        .map(|value| NaturalKey {
                            column: "id".to_string(),
                            value,
                        })
                        .unwrap_or_else(|| NaturalKey {
                            column: "id".to_string(),
                            value: Value::Int64(row_id.0 as i64),
                        });
                    out.push(RowChange {
                        table,
                        natural_key,
                        values: row.values.clone(),
                        // A tombstoned row replays as a delete carrying values.
                        deleted: row.deleted_tx.is_some(),
                        lsn,
                    });
                }
                ChangeLogEntry::RowDelete {
                    table,
                    row_id: _,
                    natural_key,
                    lsn,
                } => {
                    out.push(RowChange {
                        table,
                        natural_key,
                        values: HashMap::new(),
                        deleted: true,
                        lsn,
                    });
                }
                // Other change-log entry kinds are not row changes; skip them.
                _ => {}
            }
        }
        Ok(out)
    }
2288
2289    /// Count of base rows the executor touched during the most recent query.
2290    #[doc(hidden)]
2291    pub fn __rows_examined(&self) -> u64 {
2292        self.rows_examined.load(Ordering::SeqCst)
2293    }
2294
2295    #[doc(hidden)]
2296    pub fn __reset_rows_examined(&self) {
2297        self.rows_examined.store(0, Ordering::SeqCst);
2298    }
2299
2300    #[doc(hidden)]
2301    pub fn __bump_rows_examined(&self, delta: u64) {
2302        self.rows_examined.fetch_add(delta, Ordering::SeqCst);
2303    }
2304
2305    /// Count of batch-level `indexes.write()` lock acquisitions since startup.
2306    /// `apply_changes` bumps this once per batch; per-row commits do not.
2307    #[doc(hidden)]
2308    pub fn __index_write_lock_count(&self) -> u64 {
2309        self.relational_store.index_write_lock_count()
2310    }
2311
2312    /// Total entries across every registered index's BTreeMap.
2313    #[doc(hidden)]
2314    pub fn __introspect_indexes_total_entries(&self) -> u64 {
2315        self.relational_store.introspect_indexes_total_entries()
2316    }
2317
2318    /// Probe the constraint-check path for a specific table/column/value.
2319    /// Returns a QueryResult whose trace reflects whether the probe went
2320    /// through an index (IndexScan) or a full scan. Accepts either a
2321    /// single-column index or a composite leading-column match.
2322    #[doc(hidden)]
2323    pub fn __probe_constraint_check(
2324        &self,
2325        table: &str,
2326        column: &str,
2327        value: Value,
2328    ) -> Result<QueryResult> {
2329        let covered = self.index_covers_column(table, column)
2330            || self
2331                .relational_store
2332                .indexes
2333                .read()
2334                .iter()
2335                .any(|((t, _), idx)| {
2336                    t == table && idx.columns.first().is_some_and(|(c, _)| c == column)
2337                });
2338        let trace = if covered {
2339            QueryTrace {
2340                physical_plan: "IndexScan",
2341                index_used: None,
2342                predicates_pushed: Default::default(),
2343                indexes_considered: Default::default(),
2344                sort_elided: false,
2345            }
2346        } else {
2347            QueryTrace::scan()
2348        };
2349        let _ = value;
2350        Ok(QueryResult {
2351            columns: vec![],
2352            rows: vec![],
2353            rows_affected: 0,
2354            trace,
2355            cascade: None,
2356        })
2357    }
2358
2359    /// Run one pruning cycle. Called by the background loop or manually in tests.
2360    pub fn run_pruning_cycle(&self) -> u64 {
2361        let _guard = self.pruning_guard.lock();
2362        prune_expired_rows(
2363            &self.relational_store,
2364            &self.graph_store,
2365            &self.vector_store,
2366            self.accountant(),
2367            self.persistence.as_ref(),
2368            self.sync_watermark(),
2369        )
2370    }
2371
    /// Set the pruning loop interval. Test-only API.
    ///
    /// Stops any existing pruning thread, then spawns a fresh worker that
    /// runs `prune_expired_rows` every `interval` until its shutdown flag is
    /// set. The new thread's shutdown flag and join handle replace the
    /// previous ones in `pruning_runtime`.
    pub fn set_pruning_interval(&self, interval: Duration) {
        self.stop_pruning_thread();

        let shutdown = Arc::new(AtomicBool::new(false));
        // Hand the worker its own clones of the store handles so the thread
        // can outlive this borrow of `self`. NOTE(review): these must be
        // shared (Arc-like) clones for pruning to affect live data — confirm.
        let relational = self.relational_store.clone();
        let graph = self.graph_store.clone();
        let vector = self.vector_store.clone();
        let accountant = self.accountant.clone();
        let persistence = self.persistence.clone();
        let sync_watermark = self.sync_watermark.clone();
        let pruning_guard = self.pruning_guard.clone();
        let thread_shutdown = shutdown.clone();

        let handle = thread::spawn(move || {
            while !thread_shutdown.load(Ordering::SeqCst) {
                {
                    // Same guard as `run_pruning_cycle`: manual and background
                    // pruning never overlap. Errors are deliberately dropped.
                    let _guard = pruning_guard.lock();
                    let _ = prune_expired_rows(
                        &relational,
                        &graph,
                        &vector,
                        accountant.as_ref(),
                        persistence.as_ref(),
                        sync_watermark.load(Ordering::SeqCst),
                    );
                }
                // Sleep between cycles; `sleep_with_shutdown` presumably
                // wakes early when the shutdown flag flips — confirm.
                sleep_with_shutdown(&thread_shutdown, interval);
            }
        });

        let mut runtime = self.pruning_runtime.lock();
        runtime.shutdown = shutdown;
        runtime.handle = Some(handle);
    }
2407
2408    pub fn sync_watermark(&self) -> Lsn {
2409        self.sync_watermark.load(Ordering::SeqCst)
2410    }
2411
2412    pub fn set_sync_watermark(&self, watermark: Lsn) {
2413        self.sync_watermark.store(watermark, Ordering::SeqCst);
2414    }
2415
2416    pub fn instance_id(&self) -> uuid::Uuid {
2417        self.instance_id
2418    }
2419
2420    pub fn open_memory_with_plugin_and_accountant(
2421        plugin: Arc<dyn DatabasePlugin>,
2422        accountant: Arc<MemoryAccountant>,
2423    ) -> Result<Self> {
2424        Self::open_memory_internal(plugin, accountant)
2425    }
2426
2427    pub fn open_memory_with_plugin(plugin: Arc<dyn DatabasePlugin>) -> Result<Self> {
2428        let db = Self::open_memory_with_plugin_and_accountant(
2429            plugin,
2430            Arc::new(MemoryAccountant::no_limit()),
2431        )?;
2432        db.plugin.on_open()?;
2433        Ok(db)
2434    }
2435
    /// Shut the database down exactly once.
    ///
    /// Idempotent: the `closed` flag is swapped first, so a second call is a
    /// no-op. Teardown order: roll back any open session transaction, stop
    /// the background pruning thread, clear registered subscribers, close the
    /// persistence layer, and finally notify the plugin via `on_close`.
    pub fn close(&self) -> Result<()> {
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        // Take the session tx out of the mutex before rolling back, so the
        // lock is not held across the rollback call.
        let tx = self.session_tx.lock().take();
        if let Some(tx) = tx {
            self.rollback(tx)?;
        }
        self.stop_pruning_thread();
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
        self.plugin.on_close()
    }
2451
2452    /// File-backed database with custom plugin.
2453    pub fn open_with_plugin(
2454        path: impl AsRef<Path>,
2455        plugin: Arc<dyn DatabasePlugin>,
2456    ) -> Result<Self> {
2457        let db = Self::open_loaded(path, plugin, Arc::new(MemoryAccountant::no_limit()), None)?;
2458        db.plugin.on_open()?;
2459        Ok(db)
2460    }
2461
2462    /// Full constructor with budget.
2463    pub fn open_with_config(
2464        path: impl AsRef<Path>,
2465        plugin: Arc<dyn DatabasePlugin>,
2466        accountant: Arc<MemoryAccountant>,
2467    ) -> Result<Self> {
2468        Self::open_with_config_and_disk_limit(path, plugin, accountant, None)
2469    }
2470
2471    pub fn open_with_config_and_disk_limit(
2472        path: impl AsRef<Path>,
2473        plugin: Arc<dyn DatabasePlugin>,
2474        accountant: Arc<MemoryAccountant>,
2475        startup_disk_limit: Option<u64>,
2476    ) -> Result<Self> {
2477        let path = path.as_ref();
2478        if path.as_os_str() == ":memory:" {
2479            return Self::open_memory_with_plugin_and_accountant(plugin, accountant);
2480        }
2481        let accountant = if let Some(limit) = persisted_memory_limit(path)? {
2482            let usage = accountant.usage();
2483            if usage.limit.is_none() && usage.startup_ceiling.is_none() {
2484                Arc::new(MemoryAccountant::with_budget(limit))
2485            } else {
2486                accountant
2487            }
2488        } else {
2489            accountant
2490        };
2491        let db = Self::open_loaded(path, plugin, accountant, startup_disk_limit)?;
2492        db.plugin.on_open()?;
2493        Ok(db)
2494    }
2495
2496    /// In-memory database with budget.
2497    pub fn open_memory_with_accountant(accountant: Arc<MemoryAccountant>) -> Self {
2498        Self::open_memory_internal(Arc::new(CorePlugin), accountant)
2499            .expect("failed to open in-memory database with accountant")
2500    }
2501
2502    /// Access the memory accountant.
2503    pub fn accountant(&self) -> &MemoryAccountant {
2504        &self.accountant
2505    }
2506
2507    fn account_loaded_state(&self) -> Result<()> {
2508        let metadata_bytes = self
2509            .relational_store
2510            .table_meta
2511            .read()
2512            .values()
2513            .fold(0usize, |acc, meta| {
2514                acc.saturating_add(meta.estimated_bytes())
2515            });
2516        self.accountant.try_allocate_for(
2517            metadata_bytes,
2518            "open",
2519            "load_table_metadata",
2520            "Open the database with a larger MEMORY_LIMIT or reduce stored schema metadata.",
2521        )?;
2522
2523        let row_bytes =
2524            self.relational_store
2525                .tables
2526                .read()
2527                .iter()
2528                .fold(0usize, |acc, (table, rows)| {
2529                    let meta = self.table_meta(table);
2530                    acc.saturating_add(rows.iter().fold(0usize, |inner, row| {
2531                        inner.saturating_add(meta.as_ref().map_or_else(
2532                            || row.estimated_bytes(),
2533                            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
2534                        ))
2535                    }))
2536                });
2537        self.accountant.try_allocate_for(
2538            row_bytes,
2539            "open",
2540            "load_rows",
2541            "Open the database with a larger MEMORY_LIMIT or prune retained rows first.",
2542        )?;
2543
2544        let edge_bytes = self
2545            .graph_store
2546            .forward_adj
2547            .read()
2548            .values()
2549            .flatten()
2550            .filter(|edge| edge.deleted_tx.is_none())
2551            .fold(0usize, |acc, edge| {
2552                acc.saturating_add(edge.estimated_bytes())
2553            });
2554        self.accountant.try_allocate_for(
2555            edge_bytes,
2556            "open",
2557            "load_edges",
2558            "Open the database with a larger MEMORY_LIMIT or reduce graph edge volume.",
2559        )?;
2560
2561        let vector_bytes = self
2562            .vector_store
2563            .vectors
2564            .read()
2565            .iter()
2566            .filter(|entry| entry.deleted_tx.is_none())
2567            .fold(0usize, |acc, entry| {
2568                acc.saturating_add(entry.estimated_bytes())
2569            });
2570        self.accountant.try_allocate_for(
2571            vector_bytes,
2572            "open",
2573            "load_vectors",
2574            "Open the database with a larger MEMORY_LIMIT or reduce stored vector data.",
2575        )?;
2576
2577        Ok(())
2578    }
2579
2580    fn release_insert_allocations(&self, ws: &contextdb_tx::WriteSet) {
2581        for (table, row) in &ws.relational_inserts {
2582            let bytes = self
2583                .table_meta(table)
2584                .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
2585                .unwrap_or_else(|| row.estimated_bytes());
2586            self.accountant.release(bytes);
2587        }
2588
2589        for edge in &ws.adj_inserts {
2590            self.accountant.release(edge.estimated_bytes());
2591        }
2592
2593        for entry in &ws.vector_inserts {
2594            self.accountant.release(entry.estimated_bytes());
2595        }
2596    }
2597
2598    fn release_delete_allocations(&self, ws: &contextdb_tx::WriteSet) {
2599        for (table, row_id, _) in &ws.relational_deletes {
2600            if let Some(row) = self.find_row_by_id(table, *row_id) {
2601                let bytes = self
2602                    .table_meta(table)
2603                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
2604                    .unwrap_or_else(|| row.estimated_bytes());
2605                self.accountant.release(bytes);
2606            }
2607        }
2608
2609        for (source, edge_type, target, _) in &ws.adj_deletes {
2610            if let Some(edge) = self.find_edge(source, target, edge_type) {
2611                self.accountant.release(edge.estimated_bytes());
2612            }
2613        }
2614
2615        for (row_id, _) in &ws.vector_deletes {
2616            if let Some(vector) = self.find_vector_by_row_id(*row_id) {
2617                self.accountant.release(vector.estimated_bytes());
2618            }
2619        }
2620
2621        if !ws.vector_deletes.is_empty() {
2622            self.vector_store.clear_hnsw(self.accountant());
2623        }
2624    }
2625
2626    fn find_row_by_id(&self, table: &str, row_id: RowId) -> Option<VersionedRow> {
2627        self.relational_store
2628            .tables
2629            .read()
2630            .get(table)
2631            .and_then(|rows| rows.iter().find(|row| row.row_id == row_id))
2632            .cloned()
2633    }
2634
2635    fn find_vector_by_row_id(&self, row_id: RowId) -> Option<VectorEntry> {
2636        self.vector_store
2637            .vectors
2638            .read()
2639            .iter()
2640            .find(|entry| entry.row_id == row_id)
2641            .cloned()
2642    }
2643
2644    fn find_edge(&self, source: &NodeId, target: &NodeId, edge_type: &str) -> Option<AdjEntry> {
2645        self.graph_store
2646            .forward_adj
2647            .read()
2648            .get(source)
2649            .and_then(|entries| {
2650                entries
2651                    .iter()
2652                    .find(|entry| entry.target == *target && entry.edge_type == edge_type)
2653                    .cloned()
2654            })
2655    }
2656
    /// Captures the current lengths of a transaction's write-set buffers so a
    /// failed statement can later be rolled back to this point.
    ///
    /// Tuple order: (relational_inserts, relational_deletes, adj_inserts,
    /// vector_inserts, vector_deletes).
    ///
    /// NOTE(review): `adj_deletes` is not captured here (nor truncated in
    /// `restore_write_set_checkpoint`) — confirm that statements which can
    /// buffer edge deletes never need checkpoint rollback.
    pub(crate) fn write_set_checkpoint(
        &self,
        tx: TxId,
    ) -> Result<(usize, usize, usize, usize, usize)> {
        self.tx_mgr.with_write_set(tx, |ws| {
            (
                ws.relational_inserts.len(),
                ws.relational_deletes.len(),
                ws.adj_inserts.len(),
                ws.vector_inserts.len(),
                ws.vector_deletes.len(),
            )
        })
    }
2671
    /// Rolls a transaction's write-set buffers back to lengths previously
    /// captured by `write_set_checkpoint`, discarding anything appended since.
    ///
    /// NOTE(review): `adj_deletes` is not truncated, mirroring the capture
    /// side — verify edge deletes can never need statement-level rollback.
    pub(crate) fn restore_write_set_checkpoint(
        &self,
        tx: TxId,
        checkpoint: (usize, usize, usize, usize, usize),
    ) -> Result<()> {
        self.tx_mgr.with_write_set(tx, |ws| {
            ws.relational_inserts.truncate(checkpoint.0);
            ws.relational_deletes.truncate(checkpoint.1);
            ws.adj_inserts.truncate(checkpoint.2);
            ws.vector_inserts.truncate(checkpoint.3);
            ws.vector_deletes.truncate(checkpoint.4);
        })
    }
2685
2686    /// Get a clone of the current conflict policies.
2687    pub fn conflict_policies(&self) -> ConflictPolicies {
2688        self.conflict_policies.read().clone()
2689    }
2690
2691    /// Set the default conflict policy.
2692    pub fn set_default_conflict_policy(&self, policy: ConflictPolicy) {
2693        self.conflict_policies.write().default = policy;
2694    }
2695
2696    /// Set a per-table conflict policy.
2697    pub fn set_table_conflict_policy(&self, table: &str, policy: ConflictPolicy) {
2698        self.conflict_policies
2699            .write()
2700            .per_table
2701            .insert(table.to_string(), policy);
2702    }
2703
2704    /// Remove a per-table conflict policy override.
2705    pub fn drop_table_conflict_policy(&self, table: &str) {
2706        self.conflict_policies.write().per_table.remove(table);
2707    }
2708
2709    pub fn plugin(&self) -> &dyn DatabasePlugin {
2710        self.plugin.as_ref()
2711    }
2712
2713    pub fn plugin_health(&self) -> PluginHealth {
2714        self.plugin.health()
2715    }
2716
2717    pub fn plugin_describe(&self) -> serde_json::Value {
2718        self.plugin.describe()
2719    }
2720
    /// Crate-internal accessor for the in-memory graph executor.
    pub(crate) fn graph(&self) -> &MemGraphExecutor<DynStore> {
        &self.graph
    }
2724
    /// Crate-internal accessor for the shared relational store.
    pub(crate) fn relational_store(&self) -> &Arc<RelationalStore> {
        &self.relational_store
    }
2728
    /// Runs `f` with a DDL LSN freshly allocated by the transaction manager.
    pub(crate) fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
    where
        F: FnOnce(Lsn) -> R,
    {
        self.tx_mgr.allocate_ddl_lsn(f)
    }
2735
    /// Runs `f` while holding the transaction manager's commit lock, so no
    /// commit can interleave with the closure.
    pub(crate) fn with_commit_lock<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        self.tx_mgr.with_commit_lock(f)
    }
2742
2743    pub(crate) fn log_create_table_ddl(
2744        &self,
2745        name: &str,
2746        meta: &TableMeta,
2747        lsn: Lsn,
2748    ) -> Result<()> {
2749        let change = ddl_change_from_meta(name, meta);
2750        self.ddl_log.write().push((lsn, change.clone()));
2751        if let Some(persistence) = &self.persistence {
2752            persistence.append_ddl_log(lsn, &change)?;
2753        }
2754        Ok(())
2755    }
2756
2757    pub(crate) fn log_drop_table_ddl(&self, name: &str, lsn: Lsn) -> Result<()> {
2758        let change = DdlChange::DropTable {
2759            name: name.to_string(),
2760        };
2761        self.ddl_log.write().push((lsn, change.clone()));
2762        if let Some(persistence) = &self.persistence {
2763            persistence.append_ddl_log(lsn, &change)?;
2764        }
2765        Ok(())
2766    }
2767
2768    pub(crate) fn log_alter_table_ddl(&self, name: &str, meta: &TableMeta, lsn: Lsn) -> Result<()> {
2769        let change = DdlChange::AlterTable {
2770            name: name.to_string(),
2771            columns: meta
2772                .columns
2773                .iter()
2774                .map(|c| {
2775                    (
2776                        c.name.clone(),
2777                        sql_type_for_meta_column(c, &meta.propagation_rules),
2778                    )
2779                })
2780                .collect(),
2781            constraints: create_table_constraints_from_meta(meta),
2782        };
2783        self.ddl_log.write().push((lsn, change.clone()));
2784        if let Some(persistence) = &self.persistence {
2785            persistence.append_ddl_log(lsn, &change)?;
2786        }
2787        Ok(())
2788    }
2789
2790    pub(crate) fn log_create_index_ddl(
2791        &self,
2792        table: &str,
2793        name: &str,
2794        columns: &[(String, contextdb_core::SortDirection)],
2795        lsn: Lsn,
2796    ) -> Result<()> {
2797        let change = DdlChange::CreateIndex {
2798            table: table.to_string(),
2799            name: name.to_string(),
2800            columns: columns.to_vec(),
2801        };
2802        self.ddl_log.write().push((lsn, change.clone()));
2803        if let Some(persistence) = &self.persistence {
2804            persistence.append_ddl_log(lsn, &change)?;
2805        }
2806        Ok(())
2807    }
2808
2809    pub(crate) fn log_drop_index_ddl(&self, table: &str, name: &str, lsn: Lsn) -> Result<()> {
2810        let change = DdlChange::DropIndex {
2811            table: table.to_string(),
2812            name: name.to_string(),
2813        };
2814        self.ddl_log.write().push((lsn, change.clone()));
2815        if let Some(persistence) = &self.persistence {
2816            persistence.append_ddl_log(lsn, &change)?;
2817        }
2818        Ok(())
2819    }
2820
2821    pub(crate) fn persist_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
2822        if let Some(persistence) = &self.persistence {
2823            persistence.flush_table_meta(name, meta)?;
2824        }
2825        Ok(())
2826    }
2827
2828    pub(crate) fn persist_memory_limit(&self, limit: Option<usize>) -> Result<()> {
2829        if let Some(persistence) = &self.persistence {
2830            match limit {
2831                Some(limit) => persistence.flush_config_value("memory_limit", &limit)?,
2832                None => persistence.remove_config_value("memory_limit")?,
2833            }
2834        }
2835        Ok(())
2836    }
2837
2838    pub fn set_disk_limit(&self, limit: Option<u64>) -> Result<()> {
2839        if self.persistence.is_none() {
2840            self.disk_limit.store(0, Ordering::SeqCst);
2841            return Ok(());
2842        }
2843
2844        let ceiling = self.disk_limit_startup_ceiling();
2845        if let Some(ceiling) = ceiling {
2846            match limit {
2847                Some(bytes) if bytes > ceiling => {
2848                    return Err(Error::Other(format!(
2849                        "disk limit {bytes} exceeds startup ceiling {ceiling}"
2850                    )));
2851                }
2852                None => {
2853                    return Err(Error::Other(
2854                        "cannot remove disk limit when a startup ceiling is set".to_string(),
2855                    ));
2856                }
2857                _ => {}
2858            }
2859        }
2860
2861        self.disk_limit.store(limit.unwrap_or(0), Ordering::SeqCst);
2862        Ok(())
2863    }
2864
2865    pub fn disk_limit(&self) -> Option<u64> {
2866        match self.disk_limit.load(Ordering::SeqCst) {
2867            0 => None,
2868            bytes => Some(bytes),
2869        }
2870    }
2871
2872    pub fn disk_limit_startup_ceiling(&self) -> Option<u64> {
2873        match self.disk_limit_startup_ceiling.load(Ordering::SeqCst) {
2874            0 => None,
2875            bytes => Some(bytes),
2876        }
2877    }
2878
2879    pub fn disk_file_size(&self) -> Option<u64> {
2880        self.persistence
2881            .as_ref()
2882            .map(|persistence| std::fs::metadata(persistence.path()).map(|meta| meta.len()))
2883            .transpose()
2884            .ok()
2885            .flatten()
2886    }
2887
2888    pub(crate) fn persist_disk_limit(&self, limit: Option<u64>) -> Result<()> {
2889        if let Some(persistence) = &self.persistence {
2890            match limit {
2891                Some(limit) => persistence.flush_config_value("disk_limit", &limit)?,
2892                None => persistence.remove_config_value("disk_limit")?,
2893            }
2894        }
2895        Ok(())
2896    }
2897
2898    pub fn check_disk_budget(&self, operation: &str) -> Result<()> {
2899        let Some(limit) = self.disk_limit() else {
2900            return Ok(());
2901        };
2902        let Some(current_bytes) = self.disk_file_size() else {
2903            return Ok(());
2904        };
2905        if current_bytes < limit {
2906            return Ok(());
2907        }
2908        Err(Error::DiskBudgetExceeded {
2909            operation: operation.to_string(),
2910            current_bytes,
2911            budget_limit_bytes: limit,
2912            hint: "Reduce retained file-backed data or raise DISK_LIMIT before writing more data."
2913                .to_string(),
2914        })
2915    }
2916
2917    pub fn persisted_sync_watermarks(&self, tenant_id: &str) -> Result<(Lsn, Lsn)> {
2918        let Some(persistence) = &self.persistence else {
2919            return Ok((Lsn(0), Lsn(0)));
2920        };
2921        let push = persistence
2922            .load_config_value::<u64>(&format!("sync_push_watermark:{tenant_id}"))?
2923            .map(Lsn)
2924            .unwrap_or(Lsn(0));
2925        let pull = persistence
2926            .load_config_value::<u64>(&format!("sync_pull_watermark:{tenant_id}"))?
2927            .map(Lsn)
2928            .unwrap_or(Lsn(0));
2929        Ok((push, pull))
2930    }
2931
2932    pub fn persist_sync_push_watermark(&self, tenant_id: &str, watermark: Lsn) -> Result<()> {
2933        if let Some(persistence) = &self.persistence {
2934            persistence
2935                .flush_config_value(&format!("sync_push_watermark:{tenant_id}"), &watermark.0)?;
2936        }
2937        Ok(())
2938    }
2939
2940    pub fn persist_sync_pull_watermark(&self, tenant_id: &str, watermark: Lsn) -> Result<()> {
2941        if let Some(persistence) = &self.persistence {
2942            persistence
2943                .flush_config_value(&format!("sync_pull_watermark:{tenant_id}"), &watermark.0)?;
2944        }
2945        Ok(())
2946    }
2947
2948    pub(crate) fn persist_table_rows(&self, name: &str) -> Result<()> {
2949        if let Some(persistence) = &self.persistence {
2950            let tables = self.relational_store.tables.read();
2951            if let Some(rows) = tables.get(name) {
2952                persistence.rewrite_table_rows(name, rows)?;
2953            }
2954        }
2955        Ok(())
2956    }
2957
2958    pub(crate) fn remove_persisted_table(&self, name: &str) -> Result<()> {
2959        if let Some(persistence) = &self.persistence {
2960            persistence.remove_table_meta(name)?;
2961            persistence.remove_table_data(name)?;
2962        }
2963        Ok(())
2964    }
2965
2966    pub fn change_log_since(&self, since_lsn: Lsn) -> Vec<ChangeLogEntry> {
2967        let log = self.change_log.read();
2968        let start = log.partition_point(|e| e.lsn() <= since_lsn);
2969        log[start..].to_vec()
2970    }
2971
2972    pub fn ddl_log_since(&self, since_lsn: Lsn) -> Vec<DdlChange> {
2973        let ddl = self.ddl_log.read();
2974        let start = ddl.partition_point(|(lsn, _)| *lsn <= since_lsn);
2975        ddl[start..].iter().map(|(_, c)| c.clone()).collect()
2976    }
2977
    /// Builds a complete snapshot of all live data as a ChangeSet.
    /// Used as fallback when change_log/ddl_log cannot serve a watermark.
    ///
    /// Lock discipline: the table_meta and tables guards are dropped before
    /// the graph lock is taken, and the graph guard before the vector guard.
    ///
    /// NOTE(review): `live_row_ids` here is populated only after the row's
    /// natural-key value is found, whereas `persisted_state_since` records
    /// every live row id before its key lookup — a vector attached to a
    /// key-less row is filtered as an orphan here but not there. Confirm
    /// which behavior is intended.
    #[allow(dead_code)]
    fn full_state_snapshot(&self) -> ChangeSet {
        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let mut ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        // DDL: one create-table-equivalent change per known table.
        for (name, meta) in meta_guard.iter() {
            ddl.push(ddl_change_from_meta(name, meta));
        }

        // Rows (live only) — collect row_ids that have live rows for orphan vector filtering
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(m) => m,
                None => continue,
            };
            // Natural-key column precedence: explicit natural_key_column,
            // then a UUID column literally named "id", then any UUID primary
            // key. Tables with none of these are skipped (not syncable).
            let key_col = meta
                .natural_key_column
                .clone()
                .or_else(|| {
                    if meta
                        .columns
                        .iter()
                        .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                    {
                        Some("id".to_string())
                    } else {
                        None
                    }
                })
                .or_else(|| {
                    meta.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
                .unwrap_or_default();
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|r| r.deleted_tx.is_none()) {
                // Rows missing their key value cannot be addressed remotely.
                let key_val = match row.values.get(&key_col) {
                    Some(v) => v.clone(),
                    None => continue,
                };
                live_row_ids.insert(row.row_id);
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }

        drop(tables_guard);
        drop(meta_guard);

        // Edges (live only)
        let fwd = self.graph_store.forward_adj.read();
        for (_source, entries) in fwd.iter() {
            for entry in entries.iter().filter(|e| e.deleted_tx.is_none()) {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Vectors (live only, skip orphans, first entry per row_id)
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs.iter().filter(|v| v.deleted_tx.is_none()) {
            if !live_row_ids.contains(&entry.row_id) {
                continue; // skip orphan vectors
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue; // first live entry per row_id only
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
3088
    /// Derives the delta since `since_lsn` directly from store state, used
    /// when the in-memory logs cannot serve the watermark (e.g. after a
    /// restart). A zero watermark degenerates to `full_state_snapshot`
    /// (which also carries DDL).
    ///
    /// Only currently-live data with `lsn > since_lsn` is emitted: deletions
    /// and DDL cannot be reconstructed this way, so `ddl` stays empty here.
    fn persisted_state_since(&self, since_lsn: Lsn) -> ChangeSet {
        if since_lsn == Lsn(0) {
            return self.full_state_snapshot();
        }

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(meta) => meta,
                None => continue,
            };
            // Natural-key column precedence: explicit natural_key_column,
            // then a UUID column literally named "id", then any UUID primary
            // key. Tables with none of these are skipped (not syncable).
            let key_col = meta
                .natural_key_column
                .clone()
                .or_else(|| {
                    if meta
                        .columns
                        .iter()
                        .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                    {
                        Some("id".to_string())
                    } else {
                        None
                    }
                })
                .or_else(|| {
                    meta.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
                .unwrap_or_default();
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|row| row.deleted_tx.is_none()) {
                // Every live row counts for orphan-vector filtering, even
                // rows at/below the watermark that are not themselves emitted.
                live_row_ids.insert(row.row_id);
                if row.lsn <= since_lsn {
                    continue;
                }
                let key_val = match row.values.get(&key_col) {
                    Some(value) => value.clone(),
                    None => continue,
                };
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }
        drop(tables_guard);
        drop(meta_guard);

        // Edges: live entries newer than the watermark.
        let fwd = self.graph_store.forward_adj.read();
        for entries in fwd.values() {
            for entry in entries
                .iter()
                .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
            {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Vectors: live, newer than the watermark, not orphaned, and only the
        // first entry per row_id.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs
            .iter()
            .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
3200
3201    fn preflight_sync_apply_memory(
3202        &self,
3203        changes: &ChangeSet,
3204        policies: &ConflictPolicies,
3205    ) -> Result<()> {
3206        let usage = self.accountant.usage();
3207        let Some(limit) = usage.limit else {
3208            return Ok(());
3209        };
3210        let available = usage.available.unwrap_or(limit);
3211        let mut required = 0usize;
3212
3213        for row in &changes.rows {
3214            if row.deleted || row.values.is_empty() {
3215                continue;
3216            }
3217
3218            let policy = policies
3219                .per_table
3220                .get(&row.table)
3221                .copied()
3222                .unwrap_or(policies.default);
3223            let existing = self.point_lookup(
3224                &row.table,
3225                &row.natural_key.column,
3226                &row.natural_key.value,
3227                self.snapshot(),
3228            )?;
3229
3230            if existing.is_some()
3231                && matches!(
3232                    policy,
3233                    ConflictPolicy::InsertIfNotExists | ConflictPolicy::ServerWins
3234                )
3235            {
3236                continue;
3237            }
3238
3239            required = required.saturating_add(
3240                self.table_meta(&row.table)
3241                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
3242                    .unwrap_or_else(|| estimate_row_value_bytes(&row.values)),
3243            );
3244        }
3245
3246        for edge in &changes.edges {
3247            required = required.saturating_add(
3248                96 + edge.edge_type.len().saturating_mul(16)
3249                    + estimate_row_value_bytes(&edge.properties),
3250            );
3251        }
3252
3253        for vector in &changes.vectors {
3254            if vector.vector.is_empty() {
3255                continue;
3256            }
3257            required = required.saturating_add(
3258                24 + vector
3259                    .vector
3260                    .len()
3261                    .saturating_mul(std::mem::size_of::<f32>()),
3262            );
3263        }
3264
3265        if required > available {
3266            return Err(Error::MemoryBudgetExceeded {
3267                subsystem: "sync".to_string(),
3268                operation: "apply_changes".to_string(),
3269                requested_bytes: required,
3270                available_bytes: available,
3271                budget_limit_bytes: limit,
3272                hint:
3273                    "Reduce sync batch size, split the push, or raise MEMORY_LIMIT on the server."
3274                        .to_string(),
3275            });
3276        }
3277
3278        Ok(())
3279    }
3280
3281    /// Extracts changes from this database since the given LSN.
3282    pub fn changes_since(&self, since_lsn: Lsn) -> ChangeSet {
3283        // Future watermark guard
3284        if since_lsn > self.current_lsn() {
3285            return ChangeSet::default();
3286        }
3287
3288        // Check if the ephemeral logs can serve the requested watermark.
3289        // After restart, both logs are empty but stores may have data — fall back to snapshot.
3290        let log = self.change_log.read();
3291        let change_first_lsn = log.first().map(|e| e.lsn());
3292        let change_log_empty = log.is_empty();
3293        drop(log);
3294
3295        let ddl = self.ddl_log.read();
3296        let ddl_first_lsn = ddl.first().map(|(lsn, _)| *lsn);
3297        let ddl_log_empty = ddl.is_empty();
3298        drop(ddl);
3299
3300        let has_table_data = !self
3301            .relational_store
3302            .tables
3303            .read()
3304            .values()
3305            .all(|rows| rows.is_empty());
3306        let has_table_meta = !self.relational_store.table_meta.read().is_empty();
3307
3308        // If both logs are empty but stores have data → post-restart, derive deltas from
3309        // persisted row/edge/vector LSNs instead of replaying a full snapshot.
3310        if change_log_empty && ddl_log_empty && (has_table_data || has_table_meta) {
3311            return self.persisted_state_since(since_lsn);
3312        }
3313
3314        // If logs have entries, check the minimum first-LSN across both covers since_lsn
3315        let min_first_lsn = match (change_first_lsn, ddl_first_lsn) {
3316            (Some(c), Some(d)) => Some(c.min(d)),
3317            (Some(c), None) => Some(c),
3318            (None, Some(d)) => Some(d),
3319            (None, None) => None, // both empty, stores empty — nothing to serve
3320        };
3321
3322        if min_first_lsn.is_some_and(|min_lsn| min_lsn.0 > since_lsn.0 + 1) {
3323            // Log doesn't cover since_lsn — derive the delta from persisted state.
3324            return self.persisted_state_since(since_lsn);
3325        }
3326
3327        let (ddl, change_entries) = self.with_commit_lock(|| {
3328            let ddl = self.ddl_log_since(since_lsn);
3329            let changes = self.change_log_since(since_lsn);
3330            (ddl, changes)
3331        });
3332
3333        let mut rows = Vec::new();
3334        let mut edges = Vec::new();
3335        let mut vectors = Vec::new();
3336
3337        for entry in change_entries {
3338            match entry {
3339                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
3340                    if let Some((natural_key, values)) = self.row_change_values(&table, row_id) {
3341                        rows.push(RowChange {
3342                            table,
3343                            natural_key,
3344                            values,
3345                            deleted: false,
3346                            lsn,
3347                        });
3348                    }
3349                }
3350                ChangeLogEntry::RowDelete {
3351                    table,
3352                    natural_key,
3353                    lsn,
3354                    ..
3355                } => {
3356                    let mut values = HashMap::new();
3357                    values.insert("__deleted".to_string(), Value::Bool(true));
3358                    rows.push(RowChange {
3359                        table,
3360                        natural_key,
3361                        values,
3362                        deleted: true,
3363                        lsn,
3364                    });
3365                }
3366                ChangeLogEntry::EdgeInsert {
3367                    source,
3368                    target,
3369                    edge_type,
3370                    lsn,
3371                } => {
3372                    let properties = self
3373                        .edge_properties(source, target, &edge_type, lsn)
3374                        .unwrap_or_default();
3375                    edges.push(EdgeChange {
3376                        source,
3377                        target,
3378                        edge_type,
3379                        properties,
3380                        lsn,
3381                    });
3382                }
3383                ChangeLogEntry::EdgeDelete {
3384                    source,
3385                    target,
3386                    edge_type,
3387                    lsn,
3388                } => {
3389                    let mut properties = HashMap::new();
3390                    properties.insert("__deleted".to_string(), Value::Bool(true));
3391                    edges.push(EdgeChange {
3392                        source,
3393                        target,
3394                        edge_type,
3395                        properties,
3396                        lsn,
3397                    });
3398                }
3399                ChangeLogEntry::VectorInsert { row_id, lsn } => {
3400                    if let Some(vector) = self.vector_for_row_lsn(row_id, lsn) {
3401                        vectors.push(VectorChange {
3402                            row_id,
3403                            vector,
3404                            lsn,
3405                        });
3406                    }
3407                }
3408                ChangeLogEntry::VectorDelete { row_id, lsn } => vectors.push(VectorChange {
3409                    row_id,
3410                    vector: Vec::new(),
3411                    lsn,
3412                }),
3413            }
3414        }
3415
3416        // Deduplicate upserts: when a RowDelete is followed by a RowInsert for the same
3417        // (table, natural_key), the delete is part of an upsert — remove it.
3418        // Only remove a delete if there is a non-delete entry with a HIGHER LSN
3419        // (i.e., the insert came after the delete, indicating an upsert).
3420        // If the insert has a lower LSN, the delete is genuine and must be kept.
3421        let insert_max_lsn: HashMap<(String, String, String), Lsn> = {
3422            let mut map: HashMap<(String, String, String), Lsn> = HashMap::new();
3423            for r in rows.iter().filter(|r| !r.deleted) {
3424                let key = (
3425                    r.table.clone(),
3426                    r.natural_key.column.clone(),
3427                    format!("{:?}", r.natural_key.value),
3428                );
3429                let entry = map.entry(key).or_insert(Lsn(0));
3430                if r.lsn > *entry {
3431                    *entry = r.lsn;
3432                }
3433            }
3434            map
3435        };
3436        rows.retain(|r| {
3437            if r.deleted {
3438                let key = (
3439                    r.table.clone(),
3440                    r.natural_key.column.clone(),
3441                    format!("{:?}", r.natural_key.value),
3442                );
3443                // Keep the delete unless there is a subsequent insert (higher or equal LSN).
3444                // Equal LSN means the delete+insert are part of the same upsert transaction.
3445                match insert_max_lsn.get(&key) {
3446                    Some(&insert_lsn) => insert_lsn < r.lsn,
3447                    None => true,
3448                }
3449            } else {
3450                true
3451            }
3452        });
3453
3454        ChangeSet {
3455            rows,
3456            edges,
3457            vectors,
3458            ddl,
3459        }
3460    }
3461
3462    /// Returns the current LSN of this database.
3463    pub fn current_lsn(&self) -> Lsn {
3464        self.tx_mgr.current_lsn()
3465    }
3466
3467    /// Returns the highest-committed TxId on this database.
3468    pub fn committed_watermark(&self) -> TxId {
3469        self.tx_mgr.current_tx_max()
3470    }
3471
3472    /// Returns the next TxId the allocator will issue on this database.
3473    pub fn next_tx(&self) -> TxId {
3474        self.tx_mgr.peek_next_tx()
3475    }
3476
3477    /// Subscribe to commit events. Returns a receiver that yields a `CommitEvent`
3478    /// after each commit.
3479    pub fn subscribe(&self) -> Receiver<CommitEvent> {
3480        self.subscribe_with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY)
3481    }
3482
3483    /// Subscribe with a custom channel capacity.
3484    pub fn subscribe_with_capacity(&self, capacity: usize) -> Receiver<CommitEvent> {
3485        let (tx, rx) = mpsc::sync_channel(capacity.max(1));
3486        self.subscriptions.lock().subscribers.push(tx);
3487        rx
3488    }
3489
3490    /// Returns health metrics for the subscription system.
3491    pub fn subscription_health(&self) -> SubscriptionMetrics {
3492        let subscriptions = self.subscriptions.lock();
3493        SubscriptionMetrics {
3494            active_channels: subscriptions.subscribers.len(),
3495            events_sent: subscriptions.events_sent,
3496            events_dropped: subscriptions.events_dropped,
3497        }
3498    }
3499
3500    /// Applies a ChangeSet to this database with the given conflict policies.
3501    pub fn apply_changes(
3502        &self,
3503        mut changes: ChangeSet,
3504        policies: &ConflictPolicies,
3505    ) -> Result<ApplyResult> {
3506        // Per I14: the whole batch takes the index-maintenance lock once.
3507        // Per-row commits reuse the same guard via the per-row apply that
3508        // runs inside the tx manager's commit_mutex, so no second write
3509        // acquisition happens for the scope of this call.
3510        self.relational_store.bump_index_write_lock_count();
3511        self.plugin.on_sync_pull(&mut changes)?;
3512        self.check_disk_budget("sync_pull")?;
3513        self.preflight_sync_apply_memory(&changes, policies)?;
3514
3515        // Pre-scan for TxId overflow so the allocator is untouched on rejection.
3516        for row in &changes.rows {
3517            for v in row.values.values() {
3518                if let Value::TxId(incoming) = v
3519                    && incoming.0 == u64::MAX
3520                {
3521                    return Err(Error::TxIdOverflow {
3522                        table: row.table.clone(),
3523                        incoming: u64::MAX,
3524                    });
3525                }
3526            }
3527        }
3528
3529        let mut tx = self.begin();
3530        let mut result = ApplyResult {
3531            applied_rows: 0,
3532            skipped_rows: 0,
3533            conflicts: Vec::new(),
3534            new_lsn: self.current_lsn(),
3535        };
3536        let vector_row_ids = changes.vectors.iter().map(|v| v.row_id).collect::<Vec<_>>();
3537        let mut vector_row_map: HashMap<RowId, RowId> = HashMap::new();
3538        let mut vector_row_idx = 0usize;
3539        let mut failed_row_ids: HashSet<RowId> = HashSet::new();
3540        let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
3541        let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();
3542
3543        for ddl in changes.ddl.clone() {
3544            match ddl {
3545                DdlChange::CreateTable {
3546                    name,
3547                    columns,
3548                    constraints,
3549                } => {
3550                    if self.table_meta(&name).is_some() {
3551                        if let Some(local_meta) = self.table_meta(&name) {
3552                            let local_cols: Vec<(String, String)> = local_meta
3553                                .columns
3554                                .iter()
3555                                .map(|c| {
3556                                    let ty = match c.column_type {
3557                                        ColumnType::Integer => "INTEGER".to_string(),
3558                                        ColumnType::Real => "REAL".to_string(),
3559                                        ColumnType::Text => "TEXT".to_string(),
3560                                        ColumnType::Boolean => "BOOLEAN".to_string(),
3561                                        ColumnType::Json => "JSON".to_string(),
3562                                        ColumnType::Uuid => "UUID".to_string(),
3563                                        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
3564                                        ColumnType::Timestamp => "TIMESTAMP".to_string(),
3565                                        ColumnType::TxId => "TXID".to_string(),
3566                                    };
3567                                    (c.name.clone(), ty)
3568                                })
3569                                .collect();
3570                            let remote_cols: Vec<(String, String)> = columns
3571                                .iter()
3572                                .map(|(col_name, col_type)| {
3573                                    let base_type = col_type
3574                                        .split_whitespace()
3575                                        .next()
3576                                        .unwrap_or(col_type)
3577                                        .to_string();
3578                                    (col_name.clone(), base_type)
3579                                })
3580                                .collect();
3581                            let mut local_sorted = local_cols.clone();
3582                            local_sorted.sort();
3583                            let mut remote_sorted = remote_cols.clone();
3584                            remote_sorted.sort();
3585                            if local_sorted != remote_sorted {
3586                                result.conflicts.push(Conflict {
3587                                    natural_key: NaturalKey {
3588                                        column: "table".to_string(),
3589                                        value: Value::Text(name.clone()),
3590                                    },
3591                                    resolution: ConflictPolicy::ServerWins,
3592                                    reason: Some(format!(
3593                                        "schema mismatch: local columns {:?} differ from remote {:?}",
3594                                        local_cols, remote_cols
3595                                    )),
3596                                });
3597                            }
3598                        }
3599                        continue;
3600                    }
3601                    let mut sql = format!(
3602                        "CREATE TABLE {} ({})",
3603                        name,
3604                        columns
3605                            .iter()
3606                            .map(|(col, ty)| format!("{col} {ty}"))
3607                            .collect::<Vec<_>>()
3608                            .join(", ")
3609                    );
3610                    if !constraints.is_empty() {
3611                        sql.push(' ');
3612                        sql.push_str(&constraints.join(" "));
3613                    }
3614                    self.execute_in_tx(tx, &sql, &HashMap::new())?;
3615                    self.clear_statement_cache();
3616                    table_meta_cache.remove(&name);
3617                    visible_rows_cache.remove(&name);
3618                }
3619                DdlChange::DropTable { name } => {
3620                    if self.table_meta(&name).is_some() {
3621                        self.drop_table_aux_state(&name);
3622                        self.relational_store().drop_table(&name);
3623                        self.remove_persisted_table(&name)?;
3624                        self.clear_statement_cache();
3625                    }
3626                    table_meta_cache.remove(&name);
3627                    visible_rows_cache.remove(&name);
3628                }
3629                DdlChange::AlterTable {
3630                    name,
3631                    columns,
3632                    constraints,
3633                } => {
3634                    if self.table_meta(&name).is_none() {
3635                        continue;
3636                    }
3637                    self.relational_store().table_meta.write().remove(&name);
3638                    let mut sql = format!(
3639                        "CREATE TABLE {} ({})",
3640                        name,
3641                        columns
3642                            .iter()
3643                            .map(|(col, ty)| format!("{col} {ty}"))
3644                            .collect::<Vec<_>>()
3645                            .join(", ")
3646                    );
3647                    if !constraints.is_empty() {
3648                        sql.push(' ');
3649                        sql.push_str(&constraints.join(" "));
3650                    }
3651                    self.execute_in_tx(tx, &sql, &HashMap::new())?;
3652                    self.clear_statement_cache();
3653                    table_meta_cache.remove(&name);
3654                    visible_rows_cache.remove(&name);
3655                }
3656                DdlChange::CreateIndex {
3657                    table,
3658                    name,
3659                    columns,
3660                } => {
3661                    // Apply at the receiver: write IndexDecl into
3662                    // TableMeta.indexes, register storage, rebuild over
3663                    // locally-resident rows. Emit a matching DDL log entry.
3664                    // Silently skipping on missing table would hide sync
3665                    // divergence; surface it as TableNotFound so the caller
3666                    // can see which index couldn't land.
3667                    if self.table_meta(&table).is_none() {
3668                        return Err(Error::TableNotFound(table.clone()));
3669                    }
3670                    let already = self
3671                        .table_meta(&table)
3672                        .map(|m| m.indexes.iter().any(|i| i.name == name))
3673                        .unwrap_or(false);
3674                    if !already {
3675                        {
3676                            let store = self.relational_store();
3677                            let mut metas = store.table_meta.write();
3678                            if let Some(m) = metas.get_mut(&table) {
3679                                m.indexes.push(contextdb_core::IndexDecl {
3680                                    name: name.clone(),
3681                                    columns: columns.clone(),
3682                                    kind: contextdb_core::IndexKind::UserDeclared,
3683                                });
3684                            }
3685                        }
3686                        self.relational_store().create_index_storage(
3687                            &table,
3688                            &name,
3689                            columns.clone(),
3690                        );
3691                        self.relational_store().rebuild_index(&table, &name);
3692                        if let Some(table_meta) = self.table_meta(&table) {
3693                            self.persist_table_meta(&table, &table_meta)?;
3694                        }
3695                        self.allocate_ddl_lsn(|lsn| {
3696                            self.log_create_index_ddl(&table, &name, &columns, lsn)
3697                        })?;
3698                        self.clear_statement_cache();
3699                    }
3700                    table_meta_cache.remove(&table);
3701                }
3702                DdlChange::DropIndex { table, name } => {
3703                    if self.table_meta(&table).is_some() {
3704                        let exists = self
3705                            .table_meta(&table)
3706                            .map(|m| m.indexes.iter().any(|i| i.name == name))
3707                            .unwrap_or(false);
3708                        if exists {
3709                            {
3710                                let store = self.relational_store();
3711                                let mut metas = store.table_meta.write();
3712                                if let Some(m) = metas.get_mut(&table) {
3713                                    m.indexes.retain(|i| i.name != name);
3714                                }
3715                            }
3716                            self.relational_store().drop_index_storage(&table, &name);
3717                            if let Some(table_meta) = self.table_meta(&table) {
3718                                self.persist_table_meta(&table, &table_meta)?;
3719                            }
3720                            self.allocate_ddl_lsn(|lsn| {
3721                                self.log_drop_index_ddl(&table, &name, lsn)
3722                            })?;
3723                            self.clear_statement_cache();
3724                        }
3725                    }
3726                    table_meta_cache.remove(&table);
3727                }
3728            }
3729        }
3730
3731        self.preflight_sync_apply_memory(&changes, policies)?;
3732
3733        for row in changes.rows {
3734            if row.values.is_empty() {
3735                result.skipped_rows += 1;
3736                self.commit_with_source(tx, CommitSource::SyncPull)?;
3737                tx = self.begin();
3738                continue;
3739            }
3740
3741            let policy = policies
3742                .per_table
3743                .get(&row.table)
3744                .copied()
3745                .unwrap_or(policies.default);
3746
3747            let existing = cached_point_lookup(
3748                self,
3749                &mut visible_rows_cache,
3750                &row.table,
3751                &row.natural_key.column,
3752                &row.natural_key.value,
3753            )?;
3754            let is_delete = row.deleted;
3755            let row_has_vector = cached_table_meta(self, &mut table_meta_cache, &row.table)
3756                .is_some_and(|meta| {
3757                    meta.columns
3758                        .iter()
3759                        .any(|col| matches!(col.column_type, ColumnType::Vector(_)))
3760                });
3761
3762            if is_delete {
3763                if let Some(local) = existing {
3764                    if row_has_vector
3765                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3766                    {
3767                        vector_row_map.insert(*remote_row_id, local.row_id);
3768                        vector_row_idx += 1;
3769                    }
3770                    if let Err(err) = self.delete_row(tx, &row.table, local.row_id) {
3771                        result.conflicts.push(Conflict {
3772                            natural_key: row.natural_key.clone(),
3773                            resolution: policy,
3774                            reason: Some(format!("delete failed: {err}")),
3775                        });
3776                        result.skipped_rows += 1;
3777                    } else {
3778                        remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
3779                        result.applied_rows += 1;
3780                    }
3781                } else {
3782                    result.skipped_rows += 1;
3783                }
3784                self.commit_with_source(tx, CommitSource::SyncPull)?;
3785                tx = self.begin();
3786                continue;
3787            }
3788
3789            let mut values = row.values.clone();
3790            values.remove("__deleted");
3791
3792            match (existing, policy) {
3793                (None, _) => {
3794                    if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
3795                        let mut constraint_error: Option<String> = None;
3796
3797                        for col_def in &meta.columns {
3798                            if !col_def.nullable
3799                                && !col_def.primary_key
3800                                && col_def.default.is_none()
3801                            {
3802                                match values.get(&col_def.name) {
3803                                    None | Some(Value::Null) => {
3804                                        constraint_error = Some(format!(
3805                                            "NOT NULL constraint violated: {}.{}",
3806                                            row.table, col_def.name
3807                                        ));
3808                                        break;
3809                                    }
3810                                    _ => {}
3811                                }
3812                            }
3813                        }
3814
3815                        let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
3816                        if constraint_error.is_none() && has_unique {
3817                            for col_def in &meta.columns {
3818                                if col_def.unique
3819                                    && !col_def.primary_key
3820                                    && let Some(new_val) = values.get(&col_def.name)
3821                                    && *new_val != Value::Null
3822                                    && cached_visible_rows(
3823                                        self,
3824                                        &mut visible_rows_cache,
3825                                        &row.table,
3826                                    )?
3827                                    .iter()
3828                                    .any(|r| r.values.get(&col_def.name) == Some(new_val))
3829                                {
3830                                    constraint_error = Some(format!(
3831                                        "UNIQUE constraint violated: {}.{}",
3832                                        row.table, col_def.name
3833                                    ));
3834                                    break;
3835                                }
3836                            }
3837                        }
3838
3839                        if let Some(err_msg) = constraint_error {
3840                            result.skipped_rows += 1;
3841                            if row_has_vector
3842                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3843                            {
3844                                failed_row_ids.insert(*remote_row_id);
3845                                vector_row_idx += 1;
3846                            }
3847                            result.conflicts.push(Conflict {
3848                                natural_key: row.natural_key.clone(),
3849                                resolution: policy,
3850                                reason: Some(err_msg),
3851                            });
3852                            self.commit_with_source(tx, CommitSource::SyncPull)?;
3853                            tx = self.begin();
3854                            continue;
3855                        }
3856                    }
3857
3858                    // Sync-apply overflow guard + allocator/watermark advance for Value::TxId cells.
3859                    let mut overflow: Option<Error> = None;
3860                    for v in values.values() {
3861                        if let Value::TxId(incoming) = v
3862                            && let Err(err) = self.tx_mgr.advance_for_sync(&row.table, *incoming)
3863                        {
3864                            overflow = Some(err);
3865                            break;
3866                        }
3867                    }
3868                    if let Some(err) = overflow {
3869                        return Err(err);
3870                    }
3871
3872                    match self.insert_row_for_sync(tx, &row.table, values.clone()) {
3873                        Ok(new_row_id) => {
3874                            record_cached_insert(
3875                                &mut visible_rows_cache,
3876                                &row.table,
3877                                VersionedRow {
3878                                    row_id: new_row_id,
3879                                    values: values.clone(),
3880                                    created_tx: tx,
3881                                    deleted_tx: None,
3882                                    lsn: row.lsn,
3883                                    created_at: None,
3884                                },
3885                            );
3886                            result.applied_rows += 1;
3887                            if row_has_vector
3888                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3889                            {
3890                                vector_row_map.insert(*remote_row_id, new_row_id);
3891                                vector_row_idx += 1;
3892                            }
3893                        }
3894                        Err(err) => {
3895                            if is_fatal_sync_apply_error(&err) {
3896                                return Err(err);
3897                            }
3898                            result.skipped_rows += 1;
3899                            if row_has_vector
3900                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3901                            {
3902                                failed_row_ids.insert(*remote_row_id);
3903                                vector_row_idx += 1;
3904                            }
3905                            result.conflicts.push(Conflict {
3906                                natural_key: row.natural_key.clone(),
3907                                resolution: policy,
3908                                reason: Some(format!("{err}")),
3909                            });
3910                        }
3911                    }
3912                }
3913                (Some(local), ConflictPolicy::InsertIfNotExists) => {
3914                    if row_has_vector
3915                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3916                    {
3917                        vector_row_map.insert(*remote_row_id, local.row_id);
3918                        vector_row_idx += 1;
3919                    }
3920                    result.skipped_rows += 1;
3921                }
3922                (Some(_), ConflictPolicy::ServerWins) => {
3923                    result.skipped_rows += 1;
3924                    if row_has_vector
3925                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3926                    {
3927                        failed_row_ids.insert(*remote_row_id);
3928                        vector_row_idx += 1;
3929                    }
3930                    result.conflicts.push(Conflict {
3931                        natural_key: row.natural_key.clone(),
3932                        resolution: ConflictPolicy::ServerWins,
3933                        reason: Some("server_wins".to_string()),
3934                    });
3935                }
3936                (Some(local), ConflictPolicy::LatestWins) => {
3937                    // Deterministic tie-break when LSNs match: if both rows carry a
3938                    // `Value::TxId` cell under the same column name, the row with the
3939                    // strictly greater raw u64 wins. Otherwise fall back to the strict
3940                    // "incoming must exceed local" rule.
3941                    let incoming_wins = if row.lsn == local.lsn {
3942                        let mut winner = false;
3943                        for (col, incoming_val) in values.iter() {
3944                            if let (Value::TxId(incoming_tx), Some(Value::TxId(local_tx))) =
3945                                (incoming_val, local.values.get(col))
3946                            {
3947                                if incoming_tx.0 > local_tx.0 {
3948                                    winner = true;
3949                                    break;
3950                                } else if incoming_tx.0 < local_tx.0 {
3951                                    winner = false;
3952                                    break;
3953                                }
3954                            }
3955                        }
3956                        winner
3957                    } else {
3958                        row.lsn > local.lsn
3959                    };
3960
3961                    if !incoming_wins {
3962                        result.skipped_rows += 1;
3963                        if row_has_vector
3964                            && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3965                        {
3966                            failed_row_ids.insert(*remote_row_id);
3967                            vector_row_idx += 1;
3968                        }
3969                        result.conflicts.push(Conflict {
3970                            natural_key: row.natural_key.clone(),
3971                            resolution: ConflictPolicy::LatestWins,
3972                            reason: Some("local_lsn_newer_or_equal".to_string()),
3973                        });
3974                    } else {
3975                        // State machine conflict detection
3976                        if let Some(meta) = self.table_meta(&row.table)
3977                            && let Some(sm) = &meta.state_machine
3978                        {
3979                            let sm_col = sm.column.clone();
3980                            let transitions = sm.transitions.clone();
3981                            let incoming_state = values.get(&sm_col).and_then(|v| match v {
3982                                Value::Text(s) => Some(s.clone()),
3983                                _ => None,
3984                            });
3985                            let local_state = local.values.get(&sm_col).and_then(|v| match v {
3986                                Value::Text(s) => Some(s.clone()),
3987                                _ => None,
3988                            });
3989
3990                            if let (Some(incoming), Some(current)) = (incoming_state, local_state) {
3991                                // Check if the transition from current to incoming is valid
3992                                let valid = transitions
3993                                    .get(&current)
3994                                    .is_some_and(|targets| targets.contains(&incoming));
3995                                if !valid && incoming != current {
3996                                    result.skipped_rows += 1;
3997                                    if row_has_vector
3998                                        && let Some(remote_row_id) =
3999                                            vector_row_ids.get(vector_row_idx)
4000                                    {
4001                                        failed_row_ids.insert(*remote_row_id);
4002                                        vector_row_idx += 1;
4003                                    }
4004                                    result.conflicts.push(Conflict {
4005                                        natural_key: row.natural_key.clone(),
4006                                        resolution: ConflictPolicy::LatestWins,
4007                                        reason: Some(format!(
4008                                            "state_machine: invalid transition {} -> {} (current: {})",
4009                                            current, incoming, current
4010                                        )),
4011                                    });
4012                                    self.commit_with_source(tx, CommitSource::SyncPull)?;
4013                                    tx = self.begin();
4014                                    continue;
4015                                }
4016                            }
4017                        }
4018
4019                        // Sync-apply overflow guard + allocator/watermark advance.
4020                        let mut overflow: Option<Error> = None;
4021                        for v in values.values() {
4022                            if let Value::TxId(incoming) = v
4023                                && let Err(err) =
4024                                    self.tx_mgr.advance_for_sync(&row.table, *incoming)
4025                            {
4026                                overflow = Some(err);
4027                                break;
4028                            }
4029                        }
4030                        if let Some(err) = overflow {
4031                            return Err(err);
4032                        }
4033
4034                        match self.upsert_row_for_sync(
4035                            tx,
4036                            &row.table,
4037                            &row.natural_key.column,
4038                            values.clone(),
4039                        ) {
4040                            Ok(_) => {
4041                                visible_rows_cache.remove(&row.table);
4042                                result.applied_rows += 1;
4043                                if row_has_vector
4044                                    && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
4045                                {
4046                                    if let Ok(Some(found)) = self.point_lookup_in_tx(
4047                                        tx,
4048                                        &row.table,
4049                                        &row.natural_key.column,
4050                                        &row.natural_key.value,
4051                                        self.snapshot(),
4052                                    ) {
4053                                        vector_row_map.insert(*remote_row_id, found.row_id);
4054                                    }
4055                                    vector_row_idx += 1;
4056                                }
4057                            }
4058                            Err(err) => {
4059                                if is_fatal_sync_apply_error(&err) {
4060                                    return Err(err);
4061                                }
4062                                result.skipped_rows += 1;
4063                                if row_has_vector
4064                                    && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
4065                                {
4066                                    failed_row_ids.insert(*remote_row_id);
4067                                    vector_row_idx += 1;
4068                                }
4069                                result.conflicts.push(Conflict {
4070                                    natural_key: row.natural_key.clone(),
4071                                    resolution: ConflictPolicy::LatestWins,
4072                                    reason: Some(format!("state_machine_or_constraint: {err}")),
4073                                });
4074                            }
4075                        }
4076                    }
4077                }
4078                (Some(_), ConflictPolicy::EdgeWins) => {
4079                    result.conflicts.push(Conflict {
4080                        natural_key: row.natural_key.clone(),
4081                        resolution: ConflictPolicy::EdgeWins,
4082                        reason: Some("edge_wins".to_string()),
4083                    });
4084                    let mut overflow: Option<Error> = None;
4085                    for v in values.values() {
4086                        if let Value::TxId(incoming) = v
4087                            && let Err(err) = self.tx_mgr.advance_for_sync(&row.table, *incoming)
4088                        {
4089                            overflow = Some(err);
4090                            break;
4091                        }
4092                    }
4093                    if let Some(err) = overflow {
4094                        return Err(err);
4095                    }
4096
4097                    match self.upsert_row_for_sync(
4098                        tx,
4099                        &row.table,
4100                        &row.natural_key.column,
4101                        values.clone(),
4102                    ) {
4103                        Ok(_) => {
4104                            visible_rows_cache.remove(&row.table);
4105                            result.applied_rows += 1;
4106                            if row_has_vector
4107                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
4108                            {
4109                                if let Ok(Some(found)) = self.point_lookup_in_tx(
4110                                    tx,
4111                                    &row.table,
4112                                    &row.natural_key.column,
4113                                    &row.natural_key.value,
4114                                    self.snapshot(),
4115                                ) {
4116                                    vector_row_map.insert(*remote_row_id, found.row_id);
4117                                }
4118                                vector_row_idx += 1;
4119                            }
4120                        }
4121                        Err(err) => {
4122                            if is_fatal_sync_apply_error(&err) {
4123                                return Err(err);
4124                            }
4125                            result.skipped_rows += 1;
4126                            if row_has_vector
4127                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
4128                            {
4129                                failed_row_ids.insert(*remote_row_id);
4130                                vector_row_idx += 1;
4131                            }
4132                            if let Some(last) = result.conflicts.last_mut() {
4133                                last.reason = Some(format!("state_machine_or_constraint: {err}"));
4134                            }
4135                        }
4136                    }
4137                }
4138            }
4139
4140            self.commit_with_source(tx, CommitSource::SyncPull)?;
4141            tx = self.begin();
4142        }
4143
4144        for edge in changes.edges {
4145            let is_delete = matches!(edge.properties.get("__deleted"), Some(Value::Bool(true)));
4146            if is_delete {
4147                let _ = self.delete_edge(tx, edge.source, edge.target, &edge.edge_type);
4148            } else {
4149                let _ = self.insert_edge(
4150                    tx,
4151                    edge.source,
4152                    edge.target,
4153                    edge.edge_type,
4154                    edge.properties,
4155                );
4156            }
4157        }
4158
4159        for vector in changes.vectors {
4160            if failed_row_ids.contains(&vector.row_id) {
4161                continue; // skip vectors for rows that failed to insert
4162            }
4163            let local_row_id = vector_row_map
4164                .get(&vector.row_id)
4165                .copied()
4166                .unwrap_or(vector.row_id);
4167            if vector.vector.is_empty() {
4168                let _ = self.vector.delete_vector(tx, local_row_id);
4169            } else {
4170                if self.has_live_vector(local_row_id, self.snapshot()) {
4171                    let _ = self.delete_vector(tx, local_row_id);
4172                }
4173                if let Err(err) = self.insert_vector(tx, local_row_id, vector.vector)
4174                    && is_fatal_sync_apply_error(&err)
4175                {
4176                    return Err(err);
4177                }
4178            }
4179        }
4180
4181        self.commit_with_source(tx, CommitSource::SyncPull)?;
4182        result.new_lsn = self.current_lsn();
4183        Ok(result)
4184    }
4185
4186    fn row_change_values(
4187        &self,
4188        table: &str,
4189        row_id: RowId,
4190    ) -> Option<(NaturalKey, HashMap<String, Value>)> {
4191        let tables = self.relational_store.tables.read();
4192        let meta = self.relational_store.table_meta.read();
4193        let rows = tables.get(table)?;
4194        let row = rows.iter().find(|r| r.row_id == row_id)?;
4195        let key_col = meta
4196            .get(table)
4197            .and_then(|m| m.natural_key_column.clone())
4198            .or_else(|| {
4199                meta.get(table).and_then(|m| {
4200                    m.columns
4201                        .iter()
4202                        .find(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
4203                        .map(|_| "id".to_string())
4204                })
4205            })
4206            .or_else(|| {
4207                meta.get(table).and_then(|m| {
4208                    m.columns
4209                        .iter()
4210                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
4211                        .map(|c| c.name.clone())
4212                })
4213            })?;
4214
4215        let key_val = row.values.get(&key_col)?.clone();
4216        let values = row
4217            .values
4218            .iter()
4219            .map(|(k, v)| (k.clone(), v.clone()))
4220            .collect::<HashMap<_, _>>();
4221        Some((
4222            NaturalKey {
4223                column: key_col,
4224                value: key_val,
4225            },
4226            values,
4227        ))
4228    }
4229
4230    fn edge_properties(
4231        &self,
4232        source: NodeId,
4233        target: NodeId,
4234        edge_type: &str,
4235        lsn: Lsn,
4236    ) -> Option<HashMap<String, Value>> {
4237        self.graph_store
4238            .forward_adj
4239            .read()
4240            .get(&source)
4241            .and_then(|entries| {
4242                entries
4243                    .iter()
4244                    .find(|e| e.target == target && e.edge_type == edge_type && e.lsn == lsn)
4245                    .map(|e| e.properties.clone())
4246            })
4247    }
4248
4249    fn vector_for_row_lsn(&self, row_id: RowId, lsn: Lsn) -> Option<Vec<f32>> {
4250        self.vector_store
4251            .vectors
4252            .read()
4253            .iter()
4254            .find(|v| v.row_id == row_id && v.lsn == lsn)
4255            .map(|v| v.vector.clone())
4256    }
4257}
4258
4259fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
4260    if let Some(pos) = qr.columns.iter().position(|c| c == "row_id") {
4261        qr.columns.remove(pos);
4262        for row in &mut qr.rows {
4263            if pos < row.len() {
4264                row.remove(pos);
4265            }
4266        }
4267    }
4268    qr
4269}
4270
4271fn cached_table_meta(
4272    db: &Database,
4273    cache: &mut HashMap<String, Option<TableMeta>>,
4274    table: &str,
4275) -> Option<TableMeta> {
4276    cache
4277        .entry(table.to_string())
4278        .or_insert_with(|| db.table_meta(table))
4279        .clone()
4280}
4281
4282fn cached_visible_rows<'a>(
4283    db: &Database,
4284    cache: &'a mut HashMap<String, Vec<VersionedRow>>,
4285    table: &str,
4286) -> Result<&'a mut Vec<VersionedRow>> {
4287    if !cache.contains_key(table) {
4288        let rows = db.scan(table, db.snapshot())?;
4289        cache.insert(table.to_string(), rows);
4290    }
4291    Ok(cache.get_mut(table).expect("cached visible rows"))
4292}
4293
4294fn cached_point_lookup(
4295    db: &Database,
4296    cache: &mut HashMap<String, Vec<VersionedRow>>,
4297    table: &str,
4298    col: &str,
4299    value: &Value,
4300) -> Result<Option<VersionedRow>> {
4301    let rows = cached_visible_rows(db, cache, table)?;
4302    Ok(rows
4303        .iter()
4304        .find(|r| r.values.get(col) == Some(value))
4305        .cloned())
4306}
4307
4308fn record_cached_insert(
4309    cache: &mut HashMap<String, Vec<VersionedRow>>,
4310    table: &str,
4311    row: VersionedRow,
4312) {
4313    if let Some(rows) = cache.get_mut(table) {
4314        rows.push(row);
4315    }
4316}
4317
4318fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
4319    if let Some(rows) = cache.get_mut(table) {
4320        rows.retain(|row| row.row_id != row_id);
4321    }
4322}
4323
4324fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
4325    match result {
4326        Ok(query_result) => QueryOutcome::Success {
4327            row_count: if query_result.rows.is_empty() {
4328                query_result.rows_affected as usize
4329            } else {
4330                query_result.rows.len()
4331            },
4332        },
4333        Err(error) => QueryOutcome::Error {
4334            error: error.to_string(),
4335        },
4336    }
4337}
4338
4339fn maybe_prebuild_hnsw(vector_store: &VectorStore, accountant: &MemoryAccountant) {
4340    if vector_store.vector_count() >= 1000 {
4341        let entries = vector_store.all_entries();
4342        let dim = vector_store.dimension().unwrap_or(0);
4343        let estimated_bytes = estimate_hnsw_index_bytes(entries.len(), dim);
4344        if accountant
4345            .try_allocate_for(
4346                estimated_bytes,
4347                "vector_index",
4348                "prebuild_hnsw",
4349                "Open the database with a larger MEMORY_LIMIT to enable HNSW indexing.",
4350            )
4351            .is_err()
4352        {
4353            return;
4354        }
4355
4356        let hnsw_opt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4357            HnswIndex::new(&entries, dim)
4358        }))
4359        .ok();
4360        if hnsw_opt.is_some() {
4361            vector_store.set_hnsw_bytes(estimated_bytes);
4362        } else {
4363            accountant.release(estimated_bytes);
4364        }
4365        let _ = vector_store.hnsw.set(RwLock::new(hnsw_opt));
4366    }
4367}
4368
4369fn estimate_row_bytes_for_meta(
4370    values: &HashMap<ColName, Value>,
4371    meta: &TableMeta,
4372    include_vectors: bool,
4373) -> usize {
4374    let mut bytes = 96usize;
4375    for column in &meta.columns {
4376        let Some(value) = values.get(&column.name) else {
4377            continue;
4378        };
4379        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
4380            continue;
4381        }
4382        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
4383    }
4384    bytes
4385}
4386
/// Estimated heap footprint of one stored vector: 24 bytes of bookkeeping
/// plus the raw `f32` payload.
fn estimate_vector_bytes(vector: &[f32]) -> usize {
    const HEADER_BYTES: usize = 24;
    let payload_bytes = vector.len().saturating_mul(std::mem::size_of::<f32>());
    HEADER_BYTES + payload_bytes
}
4390
/// Estimated heap footprint of one graph edge.
///
/// Builds a throwaway `AdjEntry` so the shared `estimated_bytes()` accounting
/// logic stays the single source of truth for edge sizing.
/// NOTE(review): this clones `properties` and allocates `edge_type` just to
/// measure them — fine off the hot path, but worth knowing.
fn estimate_edge_bytes(
    source: NodeId,
    target: NodeId,
    edge_type: &str,
    properties: &HashMap<String, Value>,
) -> usize {
    AdjEntry {
        source,
        target,
        edge_type: edge_type.to_string(),
        properties: properties.clone(),
        // Placeholder versioning fields; presumably they only contribute their
        // fixed in-struct size — estimated_bytes() impl not visible here.
        created_tx: TxId(0),
        deleted_tx: None,
        lsn: Lsn(0),
    }
    .estimated_bytes()
}
4408
/// Estimated memory for an HNSW index over `entry_count` vectors of the given
/// dimension: the raw f32 payload times 3. The 3x multiplier is a safety
/// margin for index overhead — TODO(review): confirm against HnswIndex
/// internals.
fn estimate_hnsw_index_bytes(entry_count: usize, dimension: usize) -> usize {
    let raw_cells = entry_count.saturating_mul(dimension);
    raw_cells
        .saturating_mul(std::mem::size_of::<f32>())
        .saturating_mul(3)
}
4415
impl Drop for Database {
    fn drop(&mut self) {
        // Idempotence guard: `swap` ensures the teardown below runs at most
        // once even if an explicit close path also sets `closed`.
        if self.closed.swap(true, Ordering::SeqCst) {
            return;
        }
        // Stop the background pruning thread first: signal shutdown, then
        // join so it cannot touch the stores mid-teardown. Join failures
        // (panicked worker) are deliberately ignored during drop.
        let runtime = self.pruning_runtime.get_mut();
        runtime.shutdown.store(true, Ordering::SeqCst);
        if let Some(handle) = runtime.handle.take() {
            let _ = handle.join();
        }
        // Drop all subscriber handles so their channels close.
        self.subscriptions.lock().subscribers.clear();
        // Close the on-disk backend last, after all background activity has
        // stopped.
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
    }
}
4432
/// Sleeps for up to `interval`, waking early if `shutdown` is set.
///
/// The wait is chopped into slices of at most 50 ms so a shutdown request is
/// observed promptly even for long intervals.
fn sleep_with_shutdown(shutdown: &AtomicBool, interval: Duration) {
    let deadline = Instant::now() + interval;
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return;
        }
        let now = Instant::now();
        if now >= deadline {
            return;
        }
        let slice = deadline
            .saturating_duration_since(now)
            .min(Duration::from_millis(50));
        thread::sleep(slice);
    }
}
4444
/// Background TTL sweep: removes expired rows (per `row_is_prunable`) from
/// tables that declare a default TTL, drops their vectors and any edges
/// touching their node ids, rewrites the persisted copies, and releases the
/// freed bytes to the accountant. Returns the number of distinct pruned rows.
///
/// Locking: each store's lock is taken in its own short scope (relational
/// write, then vector write, then graph writes, then read locks for
/// persistence) — never nested across stores.
fn prune_expired_rows(
    relational_store: &Arc<RelationalStore>,
    graph_store: &Arc<GraphStore>,
    vector_store: &Arc<VectorStore>,
    accountant: &MemoryAccountant,
    persistence: Option<&Arc<RedbPersistence>>,
    sync_watermark: Lsn,
) -> u64 {
    let now = Wallclock::now();
    // Snapshot table metadata up front so the meta lock is not held while
    // mutating table data below.
    let metas = relational_store.table_meta.read().clone();
    let mut pruned_by_table: HashMap<String, Vec<RowId>> = HashMap::new();
    let mut pruned_node_ids = HashSet::new();
    let mut released_row_bytes = 0usize;

    {
        let mut tables = relational_store.tables.write();
        for (table_name, rows) in tables.iter_mut() {
            let Some(meta) = metas.get(table_name) else {
                continue;
            };
            // Only tables with a declared TTL participate in pruning.
            if meta.default_ttl_seconds.is_none() {
                continue;
            }

            rows.retain(|row| {
                if !row_is_prunable(row, meta, now, sync_watermark) {
                    return true;
                }

                // Record the pruned row for persistence rewrite + accounting.
                pruned_by_table
                    .entry(table_name.clone())
                    .or_default()
                    .push(row.row_id);
                released_row_bytes = released_row_bytes
                    .saturating_add(estimate_row_bytes_for_meta(&row.values, meta, false));
                // A UUID `id` value doubles as the graph node id; remember it
                // so the row's edges can be dropped as well.
                if let Some(Value::Uuid(id)) = row.values.get("id") {
                    pruned_node_ids.insert(*id);
                }
                false
            });
        }
    }

    let pruned_row_ids: HashSet<RowId> = pruned_by_table
        .values()
        .flat_map(|rows| rows.iter().copied())
        .collect();
    if pruned_row_ids.is_empty() {
        return 0;
    }

    // Drop vectors belonging to pruned rows, tallying their bytes.
    let mut released_vector_bytes = 0usize;
    {
        let mut vectors = vector_store.vectors.write();
        vectors.retain(|entry| {
            if pruned_row_ids.contains(&entry.row_id) {
                released_vector_bytes =
                    released_vector_bytes.saturating_add(entry.estimated_bytes());
                false
            } else {
                true
            }
        });
    }
    // Invalidate the HNSW index so searches fall back / rebuild from the
    // surviving vectors.
    if let Some(hnsw) = vector_store.hnsw.get() {
        *hnsw.write() = None;
    }

    // Drop edges incident to pruned nodes from both adjacency directions.
    // Bytes are only counted from the forward map — the reverse map
    // presumably mirrors the same edges (verify against GraphStore).
    let mut released_edge_bytes = 0usize;
    {
        let mut forward = graph_store.forward_adj.write();
        for entries in forward.values_mut() {
            entries.retain(|entry| {
                if pruned_node_ids.contains(&entry.source)
                    || pruned_node_ids.contains(&entry.target)
                {
                    released_edge_bytes =
                        released_edge_bytes.saturating_add(entry.estimated_bytes());
                    false
                } else {
                    true
                }
            });
        }
        forward.retain(|_, entries| !entries.is_empty());
    }
    {
        let mut reverse = graph_store.reverse_adj.write();
        for entries in reverse.values_mut() {
            entries.retain(|entry| {
                !pruned_node_ids.contains(&entry.source) && !pruned_node_ids.contains(&entry.target)
            });
        }
        reverse.retain(|_, entries| !entries.is_empty());
    }

    // Rewrite the on-disk copies to match the new in-memory state. Errors are
    // deliberately swallowed: pruning is best-effort.
    if let Some(persistence) = persistence {
        for table_name in pruned_by_table.keys() {
            let rows = relational_store
                .tables
                .read()
                .get(table_name)
                .cloned()
                .unwrap_or_default();
            let _ = persistence.rewrite_table_rows(table_name, &rows);
        }

        let vectors = vector_store.vectors.read().clone();
        let edges = graph_store
            .forward_adj
            .read()
            .values()
            .flat_map(|entries| entries.iter().cloned())
            .collect::<Vec<_>>();
        let _ = persistence.rewrite_vectors(&vectors);
        let _ = persistence.rewrite_graph_edges(&edges);
    }

    // Return all freed bytes to the accountant in one call.
    accountant.release(
        released_row_bytes
            .saturating_add(released_vector_bytes)
            .saturating_add(released_edge_bytes),
    );

    pruned_row_ids.len() as u64
}
4571
4572fn row_is_prunable(
4573    row: &VersionedRow,
4574    meta: &TableMeta,
4575    now: Wallclock,
4576    sync_watermark: Lsn,
4577) -> bool {
4578    if meta.sync_safe && row.lsn >= sync_watermark {
4579        return false;
4580    }
4581
4582    let Some(default_ttl_seconds) = meta.default_ttl_seconds else {
4583        return false;
4584    };
4585
4586    if let Some(expires_column) = &meta.expires_column {
4587        match row.values.get(expires_column) {
4588            Some(Value::Timestamp(millis)) if *millis == i64::MAX => return false,
4589            Some(Value::Timestamp(millis)) if *millis < 0 => return true,
4590            Some(Value::Timestamp(millis)) => return (*millis as u64) <= now.0,
4591            Some(Value::Null) | None => {}
4592            Some(_) => {}
4593        }
4594    }
4595
4596    let ttl_millis = default_ttl_seconds.saturating_mul(1000);
4597    row.created_at
4598        .map(|created_at| now.0.saturating_sub(created_at.0) > ttl_millis)
4599        .unwrap_or(false)
4600}
4601
4602fn max_tx_across_all(
4603    relational: &RelationalStore,
4604    graph: &GraphStore,
4605    vector: &VectorStore,
4606) -> TxId {
4607    let relational_max = relational
4608        .tables
4609        .read()
4610        .values()
4611        .flat_map(|rows| rows.iter())
4612        .flat_map(|row| std::iter::once(row.created_tx).chain(row.deleted_tx))
4613        .max()
4614        .unwrap_or(TxId(0));
4615    let graph_max = graph
4616        .forward_adj
4617        .read()
4618        .values()
4619        .flat_map(|entries| entries.iter())
4620        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
4621        .max()
4622        .unwrap_or(TxId(0));
4623    let vector_max = vector
4624        .vectors
4625        .read()
4626        .iter()
4627        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
4628        .max()
4629        .unwrap_or(TxId(0));
4630
4631    relational_max.max(graph_max).max(vector_max)
4632}
4633
4634fn max_lsn_across_all(
4635    relational: &RelationalStore,
4636    graph: &GraphStore,
4637    vector: &VectorStore,
4638) -> Lsn {
4639    let relational_max = relational
4640        .tables
4641        .read()
4642        .values()
4643        .flat_map(|rows| rows.iter().map(|row| row.lsn))
4644        .max()
4645        .unwrap_or(Lsn(0));
4646    let graph_max = graph
4647        .forward_adj
4648        .read()
4649        .values()
4650        .flat_map(|entries| entries.iter().map(|entry| entry.lsn))
4651        .max()
4652        .unwrap_or(Lsn(0));
4653    let vector_max = vector
4654        .vectors
4655        .read()
4656        .iter()
4657        .map(|entry| entry.lsn)
4658        .max()
4659        .unwrap_or(Lsn(0));
4660
4661    relational_max.max(graph_max).max(vector_max)
4662}
4663
4664fn persisted_memory_limit(path: &Path) -> Result<Option<usize>> {
4665    if !path.exists() {
4666        return Ok(None);
4667    }
4668    let persistence = RedbPersistence::open(path)?;
4669    let limit = persistence.load_config_value::<usize>("memory_limit")?;
4670    persistence.close();
4671    Ok(limit)
4672}
4673
/// True for errors that must abort the whole sync-apply batch instead of
/// being recorded as a per-row conflict: memory/disk budget exhaustion is
/// returned directly to the caller by every apply path that checks this.
fn is_fatal_sync_apply_error(err: &Error) -> bool {
    matches!(
        err,
        Error::MemoryBudgetExceeded { .. } | Error::DiskBudgetExceeded { .. }
    )
}
4680
4681fn ddl_change_from_create_table(ct: &CreateTable) -> DdlChange {
4682    DdlChange::CreateTable {
4683        name: ct.name.clone(),
4684        columns: ct
4685            .columns
4686            .iter()
4687            .map(|col| {
4688                (
4689                    col.name.clone(),
4690                    sql_type_for_ast_column(col, &ct.propagation_rules),
4691                )
4692            })
4693            .collect(),
4694        constraints: create_table_constraints_from_ast(ct),
4695    }
4696}
4697
4698fn ddl_change_from_meta(name: &str, meta: &TableMeta) -> DdlChange {
4699    DdlChange::CreateTable {
4700        name: name.to_string(),
4701        columns: meta
4702            .columns
4703            .iter()
4704            .map(|col| {
4705                (
4706                    col.name.clone(),
4707                    sql_type_for_meta_column(col, &meta.propagation_rules),
4708                )
4709            })
4710            .collect(),
4711        constraints: create_table_constraints_from_meta(meta),
4712    }
4713}
4714
4715fn sql_type_for_ast(data_type: &DataType) -> String {
4716    match data_type {
4717        DataType::Uuid => "UUID".to_string(),
4718        DataType::Text => "TEXT".to_string(),
4719        DataType::Integer => "INTEGER".to_string(),
4720        DataType::Real => "REAL".to_string(),
4721        DataType::Boolean => "BOOLEAN".to_string(),
4722        DataType::Timestamp => "TIMESTAMP".to_string(),
4723        DataType::Json => "JSON".to_string(),
4724        DataType::Vector(dim) => format!("VECTOR({dim})"),
4725        DataType::TxId => "TXID".to_string(),
4726    }
4727}
4728
4729fn sql_type_for_ast_column(
4730    col: &contextdb_parser::ast::ColumnDef,
4731    _rules: &[contextdb_parser::ast::AstPropagationRule],
4732) -> String {
4733    let mut ty = sql_type_for_ast(&col.data_type);
4734    if let Some(reference) = &col.references {
4735        ty.push_str(&format!(
4736            " REFERENCES {}({})",
4737            reference.table, reference.column
4738        ));
4739        for rule in &reference.propagation_rules {
4740            if let contextdb_parser::ast::AstPropagationRule::FkState {
4741                trigger_state,
4742                target_state,
4743                max_depth,
4744                abort_on_failure,
4745            } = rule
4746            {
4747                ty.push_str(&format!(
4748                    " ON STATE {} PROPAGATE SET {}",
4749                    trigger_state, target_state
4750                ));
4751                if max_depth.unwrap_or(10) != 10 {
4752                    ty.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
4753                }
4754                if *abort_on_failure {
4755                    ty.push_str(" ABORT ON FAILURE");
4756                }
4757            }
4758        }
4759    }
4760    if col.primary_key {
4761        ty.push_str(" PRIMARY KEY");
4762    }
4763    if !col.nullable && !col.primary_key {
4764        ty.push_str(" NOT NULL");
4765    }
4766    if col.unique {
4767        ty.push_str(" UNIQUE");
4768    }
4769    if col.immutable {
4770        ty.push_str(" IMMUTABLE");
4771    }
4772    ty
4773}
4774
/// Renders a stored (metadata-level) column definition as its full SQL type
/// string: base type, a `REFERENCES` clause (from the column itself or from a
/// matching table-level FK propagation rule), FK state-propagation clauses,
/// and finally the column flags.
fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[PropagationRule]) -> String {
    let mut ty = match col.column_type {
        ColumnType::Integer => "INTEGER".to_string(),
        ColumnType::Real => "REAL".to_string(),
        ColumnType::Text => "TEXT".to_string(),
        ColumnType::Boolean => "BOOLEAN".to_string(),
        ColumnType::Json => "JSON".to_string(),
        ColumnType::Uuid => "UUID".to_string(),
        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
        ColumnType::Timestamp => "TIMESTAMP".to_string(),
        ColumnType::TxId => "TXID".to_string(),
    };

    // Collect the table-level FK propagation rules that target this column.
    let fk_rules = rules
        .iter()
        .filter_map(|rule| match rule {
            PropagationRule::ForeignKey {
                fk_column,
                referenced_table,
                referenced_column,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } if fk_column == &col.name => Some((
                referenced_table,
                referenced_column,
                trigger_state,
                target_state,
                *max_depth,
                *abort_on_failure,
            )),
            _ => None,
        })
        .collect::<Vec<_>>();

    // The column's own reference takes precedence; otherwise the first
    // matching FK rule supplies the REFERENCES target.
    if let Some(reference) = &col.references {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            reference.table, reference.column
        ));
    } else if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            referenced_table, referenced_column
        ));
    }

    // NOTE(review): the `col.references.is_some()` half of this condition is
    // redundant — iterating an empty `fk_rules` is already a no-op — but it is
    // harmless and kept as-is.
    if col.references.is_some() || !fk_rules.is_empty() {
        for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
            ty.push_str(&format!(
                " ON STATE {} PROPAGATE SET {}",
                trigger_state, target_state
            ));
            // The depth clause is omitted when it equals the default of 10.
            if max_depth != 10 {
                ty.push_str(&format!(" MAX DEPTH {max_depth}"));
            }
            if abort_on_failure {
                ty.push_str(" ABORT ON FAILURE");
            }
        }
    }
    // Column flags in declaration order.
    if col.primary_key {
        ty.push_str(" PRIMARY KEY");
    }
    if !col.nullable && !col.primary_key {
        ty.push_str(" NOT NULL");
    }
    if col.unique {
        ty.push_str(" UNIQUE");
    }
    if col.expires {
        ty.push_str(" EXPIRES");
    }
    if col.immutable {
        ty.push_str(" IMMUTABLE");
    }

    ty
}
4855
4856fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
4857    let mut constraints = Vec::new();
4858
4859    if ct.immutable {
4860        constraints.push("IMMUTABLE".to_string());
4861    }
4862
4863    if let Some(sm) = &ct.state_machine {
4864        let transitions = sm
4865            .transitions
4866            .iter()
4867            .map(|(from, tos)| format!("{from} -> [{}]", tos.join(", ")))
4868            .collect::<Vec<_>>()
4869            .join(", ");
4870        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
4871    }
4872
4873    if !ct.dag_edge_types.is_empty() {
4874        let edge_types = ct
4875            .dag_edge_types
4876            .iter()
4877            .map(|edge_type| format!("'{edge_type}'"))
4878            .collect::<Vec<_>>()
4879            .join(", ");
4880        constraints.push(format!("DAG({edge_types})"));
4881    }
4882
4883    if let Some(retain) = &ct.retain {
4884        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(retain.duration_seconds));
4885        if retain.sync_safe {
4886            clause.push_str(" SYNC SAFE");
4887        }
4888        constraints.push(clause);
4889    }
4890
4891    for unique_constraint in &ct.unique_constraints {
4892        constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
4893    }
4894
4895    for rule in &ct.propagation_rules {
4896        match rule {
4897            contextdb_parser::ast::AstPropagationRule::EdgeState {
4898                edge_type,
4899                direction,
4900                trigger_state,
4901                target_state,
4902                max_depth,
4903                abort_on_failure,
4904            } => {
4905                let mut clause = format!(
4906                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
4907                    edge_type, direction, trigger_state, target_state
4908                );
4909                if max_depth.unwrap_or(10) != 10 {
4910                    clause.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
4911                }
4912                if *abort_on_failure {
4913                    clause.push_str(" ABORT ON FAILURE");
4914                }
4915                constraints.push(clause);
4916            }
4917            contextdb_parser::ast::AstPropagationRule::VectorExclusion { trigger_state } => {
4918                constraints.push(format!(
4919                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
4920                    trigger_state
4921                ));
4922            }
4923            contextdb_parser::ast::AstPropagationRule::FkState { .. } => {}
4924        }
4925    }
4926
4927    constraints
4928}
4929
4930fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
4931    let mut constraints = Vec::new();
4932
4933    if meta.immutable {
4934        constraints.push("IMMUTABLE".to_string());
4935    }
4936
4937    if let Some(sm) = &meta.state_machine {
4938        let states = sm
4939            .transitions
4940            .iter()
4941            .map(|(from, to)| format!("{from} -> [{}]", to.join(", ")))
4942            .collect::<Vec<_>>()
4943            .join(", ");
4944        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, states));
4945    }
4946
4947    if !meta.dag_edge_types.is_empty() {
4948        let edge_types = meta
4949            .dag_edge_types
4950            .iter()
4951            .map(|edge_type| format!("'{edge_type}'"))
4952            .collect::<Vec<_>>()
4953            .join(", ");
4954        constraints.push(format!("DAG({edge_types})"));
4955    }
4956
4957    if let Some(ttl_seconds) = meta.default_ttl_seconds {
4958        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(ttl_seconds));
4959        if meta.sync_safe {
4960            clause.push_str(" SYNC SAFE");
4961        }
4962        constraints.push(clause);
4963    }
4964
4965    for unique_constraint in &meta.unique_constraints {
4966        constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
4967    }
4968
4969    for rule in &meta.propagation_rules {
4970        match rule {
4971            PropagationRule::Edge {
4972                edge_type,
4973                direction,
4974                trigger_state,
4975                target_state,
4976                max_depth,
4977                abort_on_failure,
4978            } => {
4979                let dir = match direction {
4980                    Direction::Incoming => "INCOMING",
4981                    Direction::Outgoing => "OUTGOING",
4982                    Direction::Both => "BOTH",
4983                };
4984                let mut clause = format!(
4985                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
4986                    edge_type, dir, trigger_state, target_state
4987                );
4988                if *max_depth != 10 {
4989                    clause.push_str(&format!(" MAX DEPTH {max_depth}"));
4990                }
4991                if *abort_on_failure {
4992                    clause.push_str(" ABORT ON FAILURE");
4993                }
4994                constraints.push(clause);
4995            }
4996            PropagationRule::VectorExclusion { trigger_state } => {
4997                constraints.push(format!(
4998                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
4999                    trigger_state
5000                ));
5001            }
5002            PropagationRule::ForeignKey { .. } => {}
5003        }
5004    }
5005
5006    constraints
5007}
5008
/// Renders a TTL in seconds as a SQL duration literal, picking the largest
/// unit (DAYS, HOURS, MINUTES) that divides it evenly; otherwise SECONDS.
fn ttl_seconds_to_sql(seconds: u64) -> String {
    // Largest unit first so e.g. 86400 renders as "1 DAYS", not "24 HOURS".
    const UNITS: [(u64, &str); 3] = [
        (24 * 60 * 60, "DAYS"),
        (60 * 60, "HOURS"),
        (60, "MINUTES"),
    ];
    for (divisor, unit) in UNITS {
        if seconds % divisor == 0 {
            return format!("{} {}", seconds / divisor, unit);
        }
    }
    format!("{seconds} SECONDS")
}