// contextdb_engine/database.rs
1use crate::composite_store::{ChangeLogEntry, CompositeStore};
2use crate::executor::execute_plan;
3use crate::persistence::RedbPersistence;
4use crate::persistent_store::PersistentCompositeStore;
5use crate::plugin::{
6    CommitEvent, CommitSource, CorePlugin, DatabasePlugin, PluginHealth, QueryOutcome,
7    SubscriptionMetrics,
8};
9use crate::schema_enforcer::validate_dml;
10use crate::sync_types::{
11    ApplyResult, ChangeSet, Conflict, ConflictPolicies, ConflictPolicy, DdlChange, EdgeChange,
12    NaturalKey, RowChange, VectorChange,
13};
14use contextdb_core::*;
15use contextdb_graph::{GraphStore, MemGraphExecutor};
16use contextdb_parser::Statement;
17use contextdb_parser::ast::{AlterAction, CreateTable, DataType};
18use contextdb_planner::PhysicalPlan;
19use contextdb_relational::{MemRelationalExecutor, RelationalStore};
20use contextdb_tx::{TxManager, WriteSetApplicator};
21use contextdb_vector::{HnswIndex, MemVectorExecutor, VectorStore};
22use parking_lot::{Mutex, RwLock};
23use roaring::RoaringTreemap;
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::path::Path;
26use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
27use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
28use std::sync::{Arc, OnceLock};
29use std::thread::{self, JoinHandle};
30use std::time::{Duration, Instant};
31
/// Boxed trait object over the storage backend, letting the transaction
/// manager drive either the in-memory `CompositeStore` or the redb-backed
/// `PersistentCompositeStore` through the same `WriteSetApplicator` interface.
type DynStore = Box<dyn WriteSetApplicator>;
/// Bounded-channel depth for commit-event subscribers; presumably used where
/// subscriber channels are created (not visible here). The publish path drops
/// events with `try_send` once a subscriber's channel is full.
const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 64;
34
/// Result of executing a single SQL statement.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Output column names, in result order; empty for statements that
    /// produce no rows.
    pub columns: Vec<String>,
    /// Result rows; each inner vector is aligned with `columns`.
    pub rows: Vec<Vec<Value>>,
    /// Rows written by DML; 0 for statements that affect nothing.
    pub rows_affected: u64,
}
41
42impl QueryResult {
43    pub fn empty() -> Self {
44        Self {
45            columns: vec![],
46            rows: vec![],
47            rows_affected: 0,
48        }
49    }
50
51    pub fn empty_with_affected(rows_affected: u64) -> Self {
52        Self {
53            columns: vec![],
54            rows: vec![],
55            rows_affected,
56        }
57    }
58}
59
/// The top-level database handle combining relational, graph and vector
/// storage behind a single MVCC transaction manager.
pub struct Database {
    /// Transaction manager driving begin/commit/rollback over the store.
    tx_mgr: Arc<TxManager<DynStore>>,
    /// Shared relational table storage (also held by the `relational` executor).
    relational_store: Arc<RelationalStore>,
    /// Shared graph edge storage (also held by the `graph` executor).
    graph_store: Arc<GraphStore>,
    /// Shared vector storage (also held by the `vector` executor).
    vector_store: Arc<VectorStore>,
    /// Log of committed data changes; loaded from / written through persistence.
    change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    /// Log of committed DDL changes paired with a u64 — presumably the commit
    /// LSN; confirm against the sync layer.
    ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
    /// Redb-backed durability layer; `None` for purely in-memory databases.
    persistence: Option<Arc<RedbPersistence>>,
    /// Relational executor bound to `relational_store` and `tx_mgr`.
    relational: MemRelationalExecutor<DynStore>,
    /// Graph executor bound to `graph_store` and `tx_mgr`.
    graph: MemGraphExecutor<DynStore>,
    /// Vector executor bound to `vector_store`, the HNSW cell and `accountant`.
    vector: MemVectorExecutor<DynStore>,
    /// Transaction opened by SQL `BEGIN`; used by later statements until
    /// `COMMIT`/`ROLLBACK` clears it.
    session_tx: Mutex<Option<TxId>>,
    /// Random id for this instance (shown in `Debug` output).
    instance_id: uuid::Uuid,
    /// Hook surface consulted around queries, DDL and commits.
    plugin: Arc<dyn DatabasePlugin>,
    /// Memory usage tracker with an optional budget.
    accountant: Arc<MemoryAccountant>,
    /// Sync conflict-resolution policies; initialized to uniform LatestWins.
    conflict_policies: RwLock<ConflictPolicies>,
    /// Commit-event subscribers plus sent/dropped delivery counters.
    subscriptions: Mutex<SubscriptionState>,
    /// Background pruning thread handle and its shutdown flag.
    pruning_runtime: Mutex<PruningRuntime>,
    /// Guard shared with pruning — presumably serializes pruning passes;
    /// confirm against the pruning worker.
    pruning_guard: Arc<Mutex<()>>,
    /// Disk limit in bytes; 0 encodes "no limit configured".
    disk_limit: AtomicU64,
    /// Startup-supplied ceiling on the disk limit; 0 encodes "none".
    disk_limit_startup_ceiling: AtomicU64,
    /// Watermark counter for sync progress — semantics defined by the sync
    /// layer, not visible here.
    sync_watermark: Arc<AtomicU64>,
    /// Set once the database has been closed.
    closed: AtomicBool,
}
84
85impl std::fmt::Debug for Database {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("Database")
88            .field("instance_id", &self.instance_id)
89            .finish_non_exhaustive()
90    }
91}
92
/// Work item for state-machine propagation — consumed by propagation logic
/// not visible in this chunk; field meanings inferred from names, confirm
/// against the worker.
#[derive(Debug, Clone)]
struct PropagationQueueEntry {
    /// Table containing the row to transition.
    table: String,
    /// Row identity (its `id` uuid column).
    uuid: uuid::Uuid,
    /// State the row should be moved to.
    target_state: String,
    /// Propagation depth from the originating change.
    depth: u32,
    /// Whether a failed transition aborts the whole propagation.
    abort_on_failure: bool,
}
101
/// Borrowed description of the row whose state change is being propagated.
#[derive(Debug, Clone, Copy)]
struct PropagationSource<'a> {
    /// Table of the originating row.
    table: &'a str,
    /// Originating row's uuid.
    uuid: uuid::Uuid,
    /// The state the originating row entered.
    state: &'a str,
    /// Current propagation depth (0 at the origin, presumably).
    depth: u32,
}
109
/// Shared context threaded through a propagation pass: the transaction the
/// transitions run in, the snapshot reads use, and pre-fetched table metadata.
#[derive(Debug, Clone, Copy)]
struct PropagationContext<'a> {
    /// Transaction all propagated writes join.
    tx: TxId,
    /// Snapshot used for visibility during propagation reads.
    snapshot: SnapshotId,
    /// Table metadata keyed by table name.
    metas: &'a HashMap<String, TableMeta>,
}
116
/// Registry of commit-event subscribers plus delivery statistics.
#[derive(Debug)]
struct SubscriptionState {
    /// Live subscriber channels; disconnected senders are pruned on publish.
    subscribers: Vec<SyncSender<CommitEvent>>,
    /// Events successfully handed to a subscriber channel.
    events_sent: u64,
    /// Events discarded because a subscriber's channel was full.
    events_dropped: u64,
}
123
124impl SubscriptionState {
125    fn new() -> Self {
126        Self {
127            subscribers: Vec::new(),
128            events_sent: 0,
129            events_dropped: 0,
130        }
131    }
132}
133
/// Handle to the background pruning thread.
#[derive(Debug)]
struct PruningRuntime {
    /// Raised to ask the worker to exit; replaced with a fresh flag on stop
    /// so a later restart does not see a stale signal.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the running worker, if one has been started.
    handle: Option<JoinHandle<()>>,
}
139
140impl PruningRuntime {
141    fn new() -> Self {
142        Self {
143            shutdown: Arc::new(AtomicBool::new(false)),
144            handle: None,
145        }
146    }
147}
148
149impl Database {
150    #[allow(clippy::too_many_arguments)]
151    fn build_db(
152        tx_mgr: Arc<TxManager<DynStore>>,
153        relational: Arc<RelationalStore>,
154        graph: Arc<GraphStore>,
155        vector_store: Arc<VectorStore>,
156        hnsw: Arc<OnceLock<parking_lot::RwLock<Option<HnswIndex>>>>,
157        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
158        ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
159        persistence: Option<Arc<RedbPersistence>>,
160        plugin: Arc<dyn DatabasePlugin>,
161        accountant: Arc<MemoryAccountant>,
162        disk_limit: Option<u64>,
163        disk_limit_startup_ceiling: Option<u64>,
164    ) -> Self {
165        Self {
166            tx_mgr: tx_mgr.clone(),
167            relational_store: relational.clone(),
168            graph_store: graph.clone(),
169            vector_store: vector_store.clone(),
170            change_log,
171            ddl_log,
172            persistence,
173            relational: MemRelationalExecutor::new(relational, tx_mgr.clone()),
174            graph: MemGraphExecutor::new(graph, tx_mgr.clone()),
175            vector: MemVectorExecutor::new_with_accountant(
176                vector_store,
177                tx_mgr.clone(),
178                hnsw,
179                accountant.clone(),
180            ),
181            session_tx: Mutex::new(None),
182            instance_id: uuid::Uuid::new_v4(),
183            plugin,
184            accountant,
185            conflict_policies: RwLock::new(ConflictPolicies::uniform(ConflictPolicy::LatestWins)),
186            subscriptions: Mutex::new(SubscriptionState::new()),
187            pruning_runtime: Mutex::new(PruningRuntime::new()),
188            pruning_guard: Arc::new(Mutex::new(())),
189            disk_limit: AtomicU64::new(disk_limit.unwrap_or(0)),
190            disk_limit_startup_ceiling: AtomicU64::new(disk_limit_startup_ceiling.unwrap_or(0)),
191            sync_watermark: Arc::new(AtomicU64::new(0)),
192            closed: AtomicBool::new(false),
193        }
194    }
195
196    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
197        Self::open_with_config(
198            path,
199            Arc::new(CorePlugin),
200            Arc::new(MemoryAccountant::no_limit()),
201        )
202    }
203
204    pub fn open_memory() -> Self {
205        Self::open_memory_with_plugin_and_accountant(
206            Arc::new(CorePlugin),
207            Arc::new(MemoryAccountant::no_limit()),
208        )
209        .expect("failed to open in-memory database")
210    }
211
    /// Open a database at `path` and rehydrate all in-memory stores from the
    /// redb persistence layer.
    ///
    /// Ordering matters here: table meta first, then rows/edges/vectors, then
    /// id/tx/lsn counters derived from the loaded data, and only then the
    /// transaction manager wrapping the write-through store.
    fn open_loaded(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        mut accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        let persistence = if path.exists() {
            Arc::new(RedbPersistence::open(path)?)
        } else {
            Arc::new(RedbPersistence::create(path)?)
        };
        // A persisted memory limit only applies when the caller did not
        // already configure one on the supplied accountant.
        if accountant.usage().limit.is_none()
            && let Some(limit) = persistence.load_config_value::<usize>("memory_limit")?
        {
            accountant = Arc::new(MemoryAccountant::with_budget(limit));
        }
        // Effective disk limit = the stricter of the persisted value and the
        // startup-supplied ceiling (min when both are present).
        let persisted_disk_limit = persistence.load_config_value::<u64>("disk_limit")?;
        let startup_disk_ceiling = startup_disk_limit;
        let effective_disk_limit = match (persisted_disk_limit, startup_disk_limit) {
            (Some(persisted), Some(ceiling)) => Some(persisted.min(ceiling)),
            (Some(persisted), None) => Some(persisted),
            (None, Some(ceiling)) => Some(ceiling),
            (None, None) => None,
        };

        let all_meta = persistence.load_all_table_meta()?;

        // Rehydrate relational tables and their rows.
        let relational = Arc::new(RelationalStore::new());
        for (name, meta) in &all_meta {
            relational.create_table(name, meta.clone());
            for row in persistence.load_relational_table(name)? {
                relational.insert_loaded_row(name, row);
            }
        }

        // Rehydrate graph edges.
        let graph = Arc::new(GraphStore::new());
        for edge in persistence.load_forward_edges()? {
            graph.insert_loaded_edge(edge);
        }

        // Rehydrate vectors. The `break` only exits the inner loop, so each
        // table's first Vector column calls `set_dimension`; with several
        // vector-bearing tables the last table wins.
        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        for meta in all_meta.values() {
            for column in &meta.columns {
                if let ColumnType::Vector(dimension) = column.column_type {
                    vector.set_dimension(dimension);
                    break;
                }
            }
        }
        for entry in persistence.load_vectors()? {
            vector.insert_loaded_vector(entry);
        }

        // Resume row-id/tx/lsn counters just past the highest loaded values.
        let max_row_id = relational.max_row_id();
        let max_tx = max_tx_across_all(&relational, &graph, &vector);
        let max_lsn = max_lsn_across_all(&relational, &graph, &vector);
        relational.set_next_row_id(max_row_id.saturating_add(1));

        let change_log = Arc::new(RwLock::new(persistence.load_change_log()?));
        let ddl_log = Arc::new(RwLock::new(persistence.load_ddl_log()?));
        // Layer the persistent store over the in-memory composite so commits
        // are written through to redb.
        let composite = CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        );
        let persistent = PersistentCompositeStore::new(composite, persistence.clone());
        let store: DynStore = Box::new(persistent);
        let tx_mgr = Arc::new(TxManager::new_with_counters(
            store,
            max_tx.saturating_add(1),
            max_lsn.saturating_add(1),
            max_tx,
        ));

        let db = Self::build_db(
            tx_mgr,
            relational,
            graph,
            vector,
            hnsw,
            change_log,
            ddl_log,
            Some(persistence),
            plugin,
            accountant,
            effective_disk_limit,
            startup_disk_ceiling,
        );

        // Re-register DAG-constrained edge types recorded in persisted meta.
        for meta in all_meta.values() {
            if !meta.dag_edge_types.is_empty() {
                db.graph.register_dag_edge_types(&meta.dag_edge_types);
            }
        }

        db.account_loaded_state()?;
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());

        Ok(db)
    }
316
317    fn open_memory_internal(
318        plugin: Arc<dyn DatabasePlugin>,
319        accountant: Arc<MemoryAccountant>,
320    ) -> Result<Self> {
321        let relational = Arc::new(RelationalStore::new());
322        let graph = Arc::new(GraphStore::new());
323        let hnsw = Arc::new(OnceLock::new());
324        let vector = Arc::new(VectorStore::new(hnsw.clone()));
325        let change_log = Arc::new(RwLock::new(Vec::new()));
326        let ddl_log = Arc::new(RwLock::new(Vec::new()));
327        let store: DynStore = Box::new(CompositeStore::new(
328            relational.clone(),
329            graph.clone(),
330            vector.clone(),
331            change_log.clone(),
332            ddl_log.clone(),
333        ));
334        let tx_mgr = Arc::new(TxManager::new(store));
335
336        let db = Self::build_db(
337            tx_mgr, relational, graph, vector, hnsw, change_log, ddl_log, None, plugin, accountant,
338            None, None,
339        );
340        maybe_prebuild_hnsw(&db.vector_store, db.accountant());
341        Ok(db)
342    }
343
    /// Start a new transaction and return its id.
    pub fn begin(&self) -> TxId {
        self.tx_mgr.begin()
    }
347
    /// Commit `tx`, attributing the commit to an explicit user action.
    pub fn commit(&self, tx: TxId) -> Result<()> {
        self.commit_with_source(tx, CommitSource::User)
    }
351
    /// Roll back `tx`, first releasing whatever allocations its pending
    /// inserts reserved so they can be reused.
    pub fn rollback(&self, tx: TxId) -> Result<()> {
        // Snapshot the write set before the tx manager discards it.
        let ws = self.tx_mgr.cloned_write_set(tx)?;
        self.release_insert_allocations(&ws);
        self.tx_mgr.rollback(tx)
    }
357
    /// Take a read snapshot of the currently committed state.
    pub fn snapshot(&self) -> SnapshotId {
        self.tx_mgr.snapshot()
    }
361
    /// Parse and execute a single SQL statement with named parameters.
    ///
    /// `BEGIN`/`COMMIT`/`ROLLBACK` manage the session-level transaction; any
    /// other statement joins the open session transaction if one exists,
    /// otherwise it autocommits.
    pub fn execute(&self, sql: &str, params: &HashMap<String, Value>) -> Result<QueryResult> {
        let stmt = contextdb_parser::parse(sql)?;

        match &stmt {
            Statement::Begin => {
                let mut session = self.session_tx.lock();
                // Nested BEGIN is a no-op: the existing transaction stays open.
                if session.is_none() {
                    *session = Some(self.begin());
                }
                return Ok(QueryResult::empty());
            }
            Statement::Commit => {
                let mut session = self.session_tx.lock();
                // COMMIT without an open session transaction is a no-op.
                if let Some(tx) = session.take() {
                    self.commit_with_source(tx, CommitSource::User)?;
                }
                return Ok(QueryResult::empty());
            }
            Statement::Rollback => {
                let mut session = self.session_tx.lock();
                // ROLLBACK without an open session transaction is a no-op.
                if let Some(tx) = *session {
                    self.rollback(tx)?;
                    *session = None;
                }
                return Ok(QueryResult::empty());
            }
            _ => {}
        }

        // Copy the session tx out so the lock is released before execution.
        let active_tx = *self.session_tx.lock();
        self.execute_statement(&stmt, sql, params, active_tx)
    }
394
395    fn execute_autocommit(
396        &self,
397        plan: &PhysicalPlan,
398        params: &HashMap<String, Value>,
399    ) -> Result<QueryResult> {
400        match plan {
401            PhysicalPlan::Insert(_) | PhysicalPlan::Delete(_) | PhysicalPlan::Update(_) => {
402                let tx = self.begin();
403                let result = execute_plan(self, plan, params, Some(tx));
404                match result {
405                    Ok(qr) => {
406                        self.commit_with_source(tx, CommitSource::AutoCommit)?;
407                        Ok(qr)
408                    }
409                    Err(e) => {
410                        let _ = self.rollback(tx);
411                        Err(e)
412                    }
413                }
414            }
415            _ => execute_plan(self, plan, params, None),
416        }
417    }
418
419    pub fn explain(&self, sql: &str) -> Result<String> {
420        let stmt = contextdb_parser::parse(sql)?;
421        let plan = contextdb_planner::plan(&stmt)?;
422        if self.vector_store.vector_count() >= 1000 && !self.vector_store.has_hnsw_index() {
423            maybe_prebuild_hnsw(&self.vector_store, self.accountant());
424        }
425        let mut output = plan.explain();
426        if self.vector_store.has_hnsw_index() {
427            output = output.replace("VectorSearch(", "HNSWSearch(");
428            output = output.replace("VectorSearch {", "HNSWSearch {");
429        } else {
430            output = output.replace("VectorSearch(", "VectorSearch(strategy=BruteForce, ");
431            output = output.replace("VectorSearch {", "VectorSearch { strategy: BruteForce,");
432        }
433        Ok(output)
434    }
435
    /// Parse and execute `sql` inside the caller-managed transaction `tx`.
    /// The caller remains responsible for committing or rolling back.
    pub fn execute_in_tx(
        &self,
        tx: TxId,
        sql: &str,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        let stmt = contextdb_parser::parse(sql)?;
        self.execute_statement(&stmt, sql, params, Some(tx))
    }
445
    /// Commit `tx`, running validation and plugin hooks around the commit.
    ///
    /// Ordering is the contract: foreign keys are validated and the plugin's
    /// `pre_commit` consulted before the transaction manager commits — either
    /// failing rolls the transaction back. `post_commit` and event
    /// publication happen only after a successful commit. All write-set work
    /// is skipped for empty (read-only) transactions.
    fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
        let mut ws = self.tx_mgr.cloned_write_set(tx)?;

        if !ws.is_empty()
            && let Err(err) = self.validate_foreign_keys_in_tx(tx)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        if !ws.is_empty()
            && let Err(err) = self.plugin.pre_commit(&ws, source)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        let lsn = self.tx_mgr.commit_with_lsn(tx)?;

        if !ws.is_empty() {
            self.release_delete_allocations(&ws);
            // Stamp the assigned LSN into the cloned write set so the
            // post-commit hook and subscribers see the final value.
            ws.stamp_lsn(lsn);
            self.plugin.post_commit(&ws, source);
            self.publish_commit_event(Self::build_commit_event(&ws, source, lsn));
        }

        Ok(())
    }
474
475    fn build_commit_event(
476        ws: &contextdb_tx::WriteSet,
477        source: CommitSource,
478        lsn: u64,
479    ) -> CommitEvent {
480        let mut tables_changed: Vec<String> = ws
481            .relational_inserts
482            .iter()
483            .map(|(table, _)| table.clone())
484            .chain(
485                ws.relational_deletes
486                    .iter()
487                    .map(|(table, _, _)| table.clone()),
488            )
489            .collect::<HashSet<_>>()
490            .into_iter()
491            .collect();
492        tables_changed.sort();
493
494        CommitEvent {
495            source,
496            lsn,
497            tables_changed,
498            row_count: ws.relational_inserts.len()
499                + ws.relational_deletes.len()
500                + ws.adj_inserts.len()
501                + ws.adj_deletes.len()
502                + ws.vector_inserts.len()
503                + ws.vector_deletes.len(),
504        }
505    }
506
507    fn publish_commit_event(&self, event: CommitEvent) {
508        let mut subscriptions = self.subscriptions.lock();
509        let subscribers = std::mem::take(&mut subscriptions.subscribers);
510        let mut live_subscribers = Vec::with_capacity(subscribers.len());
511
512        for sender in subscribers {
513            match sender.try_send(event.clone()) {
514                Ok(()) => {
515                    subscriptions.events_sent += 1;
516                    live_subscribers.push(sender);
517                }
518                Err(TrySendError::Full(_)) => {
519                    subscriptions.events_dropped += 1;
520                    live_subscribers.push(sender);
521                }
522                Err(TrySendError::Disconnected(_)) => {}
523            }
524        }
525
526        subscriptions.subscribers = live_subscribers;
527    }
528
    /// Signal the background pruning thread to stop and wait for it to exit.
    fn stop_pruning_thread(&self) {
        // Take the join handle inside the lock scope, but join outside it so
        // the (possibly long) join does not happen while holding
        // `pruning_runtime`.
        let handle = {
            let mut runtime = self.pruning_runtime.lock();
            runtime.shutdown.store(true, Ordering::SeqCst);
            let handle = runtime.handle.take();
            // Install a fresh flag so a subsequently started worker does not
            // observe this already-raised shutdown signal.
            runtime.shutdown = Arc::new(AtomicBool::new(false));
            handle
        };

        if let Some(handle) = handle {
            let _ = handle.join();
        }
    }
542
    /// Core statement dispatch: plugin hooks, CTE pre-resolution, planning,
    /// DML validation, execution, and post-query reporting.
    ///
    /// With `tx = Some(..)` the statement joins that transaction; with `None`
    /// DML autocommits via `execute_autocommit`.
    fn execute_statement(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        // Give the plugin a chance to reject the query outright.
        self.plugin.on_query(sql)?;

        // DDL statements are additionally surfaced to the plugin as a
        // structured change before execution.
        if let Some(change) = self.ddl_change_for_statement(stmt).as_ref() {
            self.plugin.on_ddl(change)?;
        }

        // Handle INSERT INTO GRAPH / __edges as a virtual table routing to the graph store.
        if let Statement::Insert(ins) = stmt
            && (ins.table.eq_ignore_ascii_case("GRAPH")
                || ins.table.eq_ignore_ascii_case("__edges"))
        {
            return self.execute_graph_insert(ins, params, tx);
        }

        // The closure scopes the fallible pipeline so timing and the
        // post_query hook run on both success and failure.
        let started = Instant::now();
        let result = (|| {
            // Pre-resolve InSubquery expressions with CTE context before planning
            let stmt = self.pre_resolve_cte_subqueries(stmt, params, tx)?;
            let plan = contextdb_planner::plan(&stmt)?;
            validate_dml(&plan, self, params)?;
            let result = match tx {
                Some(tx) => execute_plan(self, &plan, params, Some(tx)),
                None => self.execute_autocommit(&plan, params),
            };
            // A successful CREATE TABLE may declare DAG-constrained edge
            // types; register them with the graph executor.
            if result.is_ok()
                && let Statement::CreateTable(ct) = &stmt
                && !ct.dag_edge_types.is_empty()
            {
                self.graph.register_dag_edge_types(&ct.dag_edge_types);
            }
            result
        })();
        let duration = started.elapsed();
        let outcome = query_outcome_from_result(&result);
        self.plugin.post_query(sql, duration, &outcome);
        result.map(strip_internal_row_id)
    }
587
588    /// Handle `INSERT INTO GRAPH (source_id, target_id, edge_type) VALUES (...)`.
589    fn execute_graph_insert(
590        &self,
591        ins: &contextdb_parser::ast::Insert,
592        params: &HashMap<String, Value>,
593        tx: Option<TxId>,
594    ) -> Result<QueryResult> {
595        use crate::executor::resolve_expr;
596
597        let col_index = |name: &str| {
598            ins.columns
599                .iter()
600                .position(|c| c.eq_ignore_ascii_case(name))
601        };
602        let source_idx = col_index("source_id")
603            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires source_id column".into()))?;
604        let target_idx = col_index("target_id")
605            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires target_id column".into()))?;
606        let edge_type_idx = col_index("edge_type")
607            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires edge_type column".into()))?;
608
609        let auto_commit = tx.is_none();
610        let tx = tx.unwrap_or_else(|| self.begin());
611        let mut count = 0u64;
612        for row_exprs in &ins.values {
613            let source = resolve_expr(&row_exprs[source_idx], params)?;
614            let target = resolve_expr(&row_exprs[target_idx], params)?;
615            let edge_type = resolve_expr(&row_exprs[edge_type_idx], params)?;
616
617            let source_uuid = match &source {
618                Value::Uuid(u) => *u,
619                Value::Text(t) => uuid::Uuid::parse_str(t)
620                    .map_err(|e| Error::PlanError(format!("invalid source_id uuid: {e}")))?,
621                _ => return Err(Error::PlanError("source_id must be UUID".into())),
622            };
623            let target_uuid = match &target {
624                Value::Uuid(u) => *u,
625                Value::Text(t) => uuid::Uuid::parse_str(t)
626                    .map_err(|e| Error::PlanError(format!("invalid target_id uuid: {e}")))?,
627                _ => return Err(Error::PlanError("target_id must be UUID".into())),
628            };
629            let edge_type_str = match &edge_type {
630                Value::Text(t) => t.clone(),
631                _ => return Err(Error::PlanError("edge_type must be TEXT".into())),
632            };
633
634            self.insert_edge(
635                tx,
636                source_uuid,
637                target_uuid,
638                edge_type_str,
639                Default::default(),
640            )?;
641            count += 1;
642        }
643
644        if auto_commit {
645            self.commit_with_source(tx, CommitSource::AutoCommit)?;
646        }
647
648        Ok(QueryResult::empty_with_affected(count))
649    }
650
    /// Map a parsed statement to the `DdlChange` it would produce, or `None`
    /// for non-DDL statements.
    ///
    /// For ALTER TABLE the current table meta is cloned and the alteration is
    /// simulated on the clone, so the emitted change describes the
    /// post-alteration schema; the actual meta update happens in the executor.
    fn ddl_change_for_statement(&self, stmt: &Statement) -> Option<DdlChange> {
        match stmt {
            Statement::CreateTable(ct) => Some(ddl_change_from_create_table(ct)),
            Statement::DropTable(dt) => Some(DdlChange::DropTable {
                name: dt.name.clone(),
            }),
            Statement::AlterTable(at) => {
                // Unknown table falls back to a default (empty) meta.
                let mut meta = self.table_meta(&at.table).unwrap_or_default();
                // Simulate the alter action on a cloned meta to get post-alteration columns
                match &at.action {
                    AlterAction::AddColumn(col) => {
                        meta.columns.push(contextdb_core::ColumnDef {
                            name: col.name.clone(),
                            column_type: crate::executor::map_column_type(&col.data_type),
                            nullable: col.nullable,
                            primary_key: col.primary_key,
                            unique: col.unique,
                            default: col
                                .default
                                .as_ref()
                                .map(crate::executor::stored_default_expr),
                            references: col.references.as_ref().map(|reference| {
                                contextdb_core::ForeignKeyReference {
                                    table: reference.table.clone(),
                                    column: reference.column.clone(),
                                }
                            }),
                            expires: col.expires,
                        });
                        // An EXPIRES column also becomes the table's TTL column.
                        if col.expires {
                            meta.expires_column = Some(col.name.clone());
                        }
                    }
                    AlterAction::DropColumn(name) => {
                        meta.columns.retain(|c| c.name != *name);
                        // Dropping the TTL column clears the table-level setting.
                        if meta.expires_column.as_deref() == Some(name.as_str()) {
                            meta.expires_column = None;
                        }
                    }
                    AlterAction::RenameColumn { from, to } => {
                        if let Some(c) = meta.columns.iter_mut().find(|c| c.name == *from) {
                            c.name = to.clone();
                        }
                        // Keep the TTL column reference in sync with the rename.
                        if meta.expires_column.as_deref() == Some(from.as_str()) {
                            meta.expires_column = Some(to.clone());
                        }
                    }
                    AlterAction::SetRetain {
                        duration_seconds,
                        sync_safe,
                    } => {
                        meta.default_ttl_seconds = Some(*duration_seconds);
                        meta.sync_safe = *sync_safe;
                    }
                    AlterAction::DropRetain => {
                        meta.default_ttl_seconds = None;
                        meta.sync_safe = false;
                    }
                    AlterAction::SetSyncConflictPolicy(_) | AlterAction::DropSyncConflictPolicy => { /* handled in executor */
                    }
                }
                Some(DdlChange::AlterTable {
                    name: at.table.clone(),
                    columns: meta
                        .columns
                        .iter()
                        .map(|c| {
                            (
                                c.name.clone(),
                                sql_type_for_meta_column(c, &meta.propagation_rules),
                            )
                        })
                        .collect(),
                    constraints: create_table_constraints_from_meta(&meta),
                })
            }
            _ => None,
        }
    }
730
    /// Pre-resolve InSubquery expressions within SELECT statements that have CTEs.
    /// This allows CTE-backed subqueries in WHERE clauses to be evaluated before planning.
    ///
    /// Statements without CTEs or without a WHERE clause are returned as a
    /// plain clone, untouched.
    fn pre_resolve_cte_subqueries(
        &self,
        stmt: &Statement,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<Statement> {
        if let Statement::Select(sel) = stmt
            && !sel.ctes.is_empty()
            && sel.body.where_clause.is_some()
        {
            use crate::executor::resolve_in_subqueries_with_ctes;
            // Rewrite only the WHERE clause; the rest of the body is reused.
            let resolved_where = sel
                .body
                .where_clause
                .as_ref()
                .map(|expr| resolve_in_subqueries_with_ctes(self, expr, params, tx, &sel.ctes))
                .transpose()?;
            let mut new_body = sel.body.clone();
            new_body.where_clause = resolved_where;
            Ok(Statement::Select(contextdb_parser::ast::SelectStatement {
                ctes: sel.ctes.clone(),
                body: new_body,
            }))
        } else {
            Ok(stmt.clone())
        }
    }
760
    /// Insert a row into `table` within transaction `tx`, enforcing unique
    /// constraints first. Returns the new row id.
    pub fn insert_row(
        &self,
        tx: TxId,
        table: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<RowId> {
        self.validate_row_constraints(tx, table, &values, None)?;
        self.relational.insert(tx, table, values)
    }
770
    /// Insert-or-update keyed on `conflict_col` within transaction `tx`.
    ///
    /// When the upsert updates an existing row and the new values change the
    /// table's state-machine column, the state change is propagated onward.
    pub fn upsert_row(
        &self,
        tx: TxId,
        table: &str,
        conflict_col: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<UpsertResult> {
        let snapshot = self.snapshot();
        // Find the row this upsert would collide with (if any) so uniqueness
        // validation can exclude it from the conflict scan.
        let existing_row_id = values
            .get(conflict_col)
            .map(|conflict_value| {
                self.point_lookup_in_tx(tx, table, conflict_col, conflict_value, snapshot)
                    .map(|row| row.map(|row| row.row_id))
            })
            .transpose()?
            .flatten();
        self.validate_row_constraints(tx, table, &values, existing_row_id)?;

        // Capture the row uuid and target state (if the table declares a
        // state machine) before `values` is moved into the upsert below.
        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
        let meta = self.table_meta(table);
        let new_state = meta
            .as_ref()
            .and_then(|m| m.state_machine.as_ref())
            .and_then(|sm| values.get(&sm.column))
            .and_then(Value::as_text)
            .map(std::borrow::ToOwned::to_owned);

        let result = self
            .relational
            .upsert(tx, table, conflict_col, values, snapshot)?;

        // Only an update of an existing row triggers state propagation;
        // a fresh insert does not.
        if let (Some(uuid), Some(state), Some(_meta)) =
            (row_uuid, new_state.as_deref(), meta.as_ref())
            && matches!(result, UpsertResult::Updated)
        {
            self.propagate_state_change_if_needed(tx, table, Some(uuid), Some(state))?;
        }

        Ok(result)
    }
811
    /// Enforce single-column and composite UNIQUE constraints for a pending
    /// insert/update of `values` into `table`.
    ///
    /// `skip_row_id` excludes the row an upsert would replace from the
    /// conflict scan. Following SQL semantics, NULL values never conflict:
    /// a NULL (or absent) value skips the check for that column/constraint.
    fn validate_row_constraints(
        &self,
        tx: TxId,
        table: &str,
        values: &HashMap<ColName, Value>,
        skip_row_id: Option<RowId>,
    ) -> Result<()> {
        let meta = self
            .table_meta(table)
            .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
        let snapshot = self.snapshot();

        // All rows visible to this transaction, minus the row being replaced.
        let visible_rows =
            self.relational
                .scan_filter_with_tx(Some(tx), table, snapshot, &|row| {
                    skip_row_id.is_none_or(|row_id| row.row_id != row_id)
                })?;

        // Single-column UNIQUE constraints. Primary keys are excluded here —
        // presumably enforced elsewhere (e.g. by the store); confirm.
        for column in meta
            .columns
            .iter()
            .filter(|column| column.unique && !column.primary_key)
        {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            if visible_rows
                .iter()
                .any(|existing| existing.values.get(&column.name) == Some(value))
            {
                return Err(Error::UniqueViolation {
                    table: table.to_string(),
                    column: column.name.clone(),
                });
            }
        }

        // Composite UNIQUE constraints: any NULL/absent component exempts the
        // whole tuple from the check.
        for unique_constraint in &meta.unique_constraints {
            let mut candidate_values = Vec::with_capacity(unique_constraint.len());
            let mut has_null = false;

            for column_name in unique_constraint {
                match values.get(column_name) {
                    Some(Value::Null) | None => {
                        has_null = true;
                        break;
                    }
                    Some(value) => candidate_values.push(value),
                }
            }

            if has_null {
                continue;
            }

            // Conflict only when every component matches an existing row.
            if visible_rows.iter().any(|existing| {
                unique_constraint
                    .iter()
                    .zip(candidate_values.iter())
                    .all(|(column_name, value)| existing.values.get(column_name) == Some(*value))
            }) {
                return Err(Error::UniqueViolation {
                    table: table.to_string(),
                    column: unique_constraint.join(","),
                });
            }
        }

        Ok(())
    }
885
886    fn validate_foreign_keys_in_tx(&self, tx: TxId) -> Result<()> {
887        let snapshot = self.snapshot();
888        let relational_inserts = self
889            .tx_mgr
890            .with_write_set(tx, |ws| ws.relational_inserts.clone())?;
891
892        for (table, row) in relational_inserts {
893            let meta = self
894                .table_meta(&table)
895                .ok_or_else(|| Error::TableNotFound(table.clone()))?;
896            for column in &meta.columns {
897                let Some(reference) = &column.references else {
898                    continue;
899                };
900                let Some(value) = row.values.get(&column.name) else {
901                    continue;
902                };
903                if *value == Value::Null {
904                    continue;
905                }
906                if self
907                    .point_lookup_in_tx(tx, &reference.table, &reference.column, value, snapshot)?
908                    .is_none()
909                {
910                    return Err(Error::ForeignKeyViolation {
911                        table: table.clone(),
912                        column: column.name.clone(),
913                        ref_table: reference.table.clone(),
914                    });
915                }
916            }
917        }
918
919        Ok(())
920    }
921
    /// Run state-machine propagation for a row whose state column may have
    /// changed; a no-op unless both `row_uuid` and `new_state` are present.
    ///
    /// A `propagation_in_progress` flag on the transaction's write set makes
    /// this re-entrancy safe: when propagation is already running for `tx`
    /// the call returns immediately (presumably to keep the upserts performed
    /// during propagation from triggering propagation recursively — confirm
    /// against the upsert path).
    pub(crate) fn propagate_state_change_if_needed(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: Option<uuid::Uuid>,
        new_state: Option<&str>,
    ) -> Result<()> {
        if let (Some(uuid), Some(state)) = (row_uuid, new_state) {
            let already_propagating = self
                .tx_mgr
                .with_write_set(tx, |ws| ws.propagation_in_progress)?;
            if !already_propagating {
                // Set the guard, propagate, then clear the guard BEFORE
                // surfacing any propagation error so the write set stays
                // usable after a failed propagation.
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = true)?;
                let propagate_result = self.propagate(tx, table, uuid, state);
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = false)?;
                propagate_result?;
            }
        }

        Ok(())
    }
945
    /// Breadth-first propagation of a state change through the schema's
    /// FK, edge, and vector-exclusion propagation rules, starting from
    /// (`table`, `row_uuid`, `new_state`).
    ///
    /// Each queue entry carries its own target state and depth; cycles are
    /// cut by the `visited` set. Individual target failures are logged and
    /// skipped, except when the triggering rule set `abort_on_failure` — then
    /// the FIRST such failure is remembered and returned after the whole
    /// traversal finishes (propagation is not short-circuited).
    fn propagate(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: uuid::Uuid,
        new_state: &str,
    ) -> Result<()> {
        let snapshot = self.snapshot();
        // Clone the schema map once so rule lookups below don't hold the lock.
        let metas = self.relational_store().table_meta.read().clone();
        let mut queue: VecDeque<PropagationQueueEntry> = VecDeque::new();
        // (table, uuid) pairs already processed — prevents revisiting in cycles.
        let mut visited: HashSet<(String, uuid::Uuid)> = HashSet::new();
        // First abort-worthy failure, surfaced only after the traversal.
        let mut abort_violation: Option<Error> = None;
        let ctx = PropagationContext {
            tx,
            snapshot,
            metas: &metas,
        };
        let root = PropagationSource {
            table,
            uuid: row_uuid,
            state: new_state,
            depth: 0,
        };

        // Seed the queue (and apply exclusions) from the root row itself.
        self.enqueue_fk_children(&ctx, &mut queue, root);
        self.enqueue_edge_children(&ctx, &mut queue, root)?;
        self.apply_vector_exclusions(&ctx, root)?;

        while let Some(entry) = queue.pop_front() {
            if !visited.insert((entry.table.clone(), entry.uuid)) {
                continue;
            }

            let Some(meta) = metas.get(&entry.table) else {
                continue;
            };

            // A target table without a state machine cannot transition: warn,
            // optionally record an abort, and keep processing the queue.
            let Some(state_machine) = &meta.state_machine else {
                let msg = format!(
                    "warning: propagation target table {} has no state machine",
                    entry.table
                );
                eprintln!("{msg}");
                if entry.abort_on_failure && abort_violation.is_none() {
                    abort_violation = Some(Error::PropagationAborted {
                        table: entry.table.clone(),
                        column: String::new(),
                        from: String::new(),
                        to: entry.target_state.clone(),
                    });
                }
                continue;
            };

            let state_column = state_machine.column.clone();
            // Row may have been deleted since it was queued — skip silently.
            let Some(existing) = self.relational.point_lookup_with_tx(
                Some(tx),
                &entry.table,
                "id",
                &Value::Uuid(entry.uuid),
                snapshot,
            )?
            else {
                continue;
            };

            let from_state = existing
                .values
                .get(&state_column)
                .and_then(Value::as_text)
                .unwrap_or("")
                .to_string();

            // Rewrite only the state column; everything else is carried over.
            let mut next_values = existing.values.clone();
            next_values.insert(
                state_column.clone(),
                Value::Text(entry.target_state.clone()),
            );

            let upsert_outcome =
                self.relational
                    .upsert(tx, &entry.table, "id", next_values, snapshot);

            let reached_state = match upsert_outcome {
                Ok(UpsertResult::Updated) => entry.target_state.as_str(),
                // No change (or an unexpected insert): nothing to cascade.
                Ok(UpsertResult::NoOp) | Ok(UpsertResult::Inserted) => continue,
                // Invalid transitions are tolerated unless the rule aborts.
                Err(Error::InvalidStateTransition(_)) => {
                    eprintln!(
                        "warning: skipped invalid propagated transition {}.{} {} -> {}",
                        entry.table, state_column, from_state, entry.target_state
                    );
                    if entry.abort_on_failure && abort_violation.is_none() {
                        abort_violation = Some(Error::PropagationAborted {
                            table: entry.table.clone(),
                            column: state_column.clone(),
                            from: from_state,
                            to: entry.target_state.clone(),
                        });
                    }
                    continue;
                }
                Err(err) => return Err(err),
            };

            // Cascade from the row we just transitioned; children are queued
            // at entry.depth (each enqueue helper adds 1 itself).
            self.enqueue_edge_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;
            self.apply_vector_exclusions(
                &ctx,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;

            self.enqueue_fk_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            );
        }

        if let Some(err) = abort_violation {
            return Err(err);
        }

        Ok(())
    }
1088
1089    fn enqueue_fk_children(
1090        &self,
1091        ctx: &PropagationContext<'_>,
1092        queue: &mut VecDeque<PropagationQueueEntry>,
1093        source: PropagationSource<'_>,
1094    ) {
1095        for (owner_table, owner_meta) in ctx.metas {
1096            for rule in &owner_meta.propagation_rules {
1097                let PropagationRule::ForeignKey {
1098                    fk_column,
1099                    referenced_table,
1100                    trigger_state,
1101                    target_state,
1102                    max_depth,
1103                    abort_on_failure,
1104                    ..
1105                } = rule
1106                else {
1107                    continue;
1108                };
1109
1110                if referenced_table != source.table || trigger_state != source.state {
1111                    continue;
1112                }
1113
1114                if source.depth >= *max_depth {
1115                    continue;
1116                }
1117
1118                let rows = match self.relational.scan_filter_with_tx(
1119                    Some(ctx.tx),
1120                    owner_table,
1121                    ctx.snapshot,
1122                    &|row| row.values.get(fk_column) == Some(&Value::Uuid(source.uuid)),
1123                ) {
1124                    Ok(rows) => rows,
1125                    Err(err) => {
1126                        eprintln!(
1127                            "warning: propagation scan failed for {owner_table}.{fk_column}: {err}"
1128                        );
1129                        continue;
1130                    }
1131                };
1132
1133                for row in rows {
1134                    if let Some(id) = row.values.get("id").and_then(Value::as_uuid).copied() {
1135                        queue.push_back(PropagationQueueEntry {
1136                            table: owner_table.clone(),
1137                            uuid: id,
1138                            target_state: target_state.clone(),
1139                            depth: source.depth + 1,
1140                            abort_on_failure: *abort_on_failure,
1141                        });
1142                    }
1143                }
1144            }
1145        }
1146    }
1147
1148    fn enqueue_edge_children(
1149        &self,
1150        ctx: &PropagationContext<'_>,
1151        queue: &mut VecDeque<PropagationQueueEntry>,
1152        source: PropagationSource<'_>,
1153    ) -> Result<()> {
1154        let Some(meta) = ctx.metas.get(source.table) else {
1155            return Ok(());
1156        };
1157
1158        for rule in &meta.propagation_rules {
1159            let PropagationRule::Edge {
1160                edge_type,
1161                direction,
1162                trigger_state,
1163                target_state,
1164                max_depth,
1165                abort_on_failure,
1166            } = rule
1167            else {
1168                continue;
1169            };
1170
1171            if trigger_state != source.state || source.depth >= *max_depth {
1172                continue;
1173            }
1174
1175            let bfs = self.query_bfs(
1176                source.uuid,
1177                Some(std::slice::from_ref(edge_type)),
1178                *direction,
1179                1,
1180                ctx.snapshot,
1181            )?;
1182
1183            for node in bfs.nodes {
1184                if self
1185                    .relational
1186                    .point_lookup_with_tx(
1187                        Some(ctx.tx),
1188                        source.table,
1189                        "id",
1190                        &Value::Uuid(node.id),
1191                        ctx.snapshot,
1192                    )?
1193                    .is_some()
1194                {
1195                    queue.push_back(PropagationQueueEntry {
1196                        table: source.table.to_string(),
1197                        uuid: node.id,
1198                        target_state: target_state.clone(),
1199                        depth: source.depth + 1,
1200                        abort_on_failure: *abort_on_failure,
1201                    });
1202                }
1203            }
1204        }
1205
1206        Ok(())
1207    }
1208
1209    fn apply_vector_exclusions(
1210        &self,
1211        ctx: &PropagationContext<'_>,
1212        source: PropagationSource<'_>,
1213    ) -> Result<()> {
1214        let Some(meta) = ctx.metas.get(source.table) else {
1215            return Ok(());
1216        };
1217
1218        for rule in &meta.propagation_rules {
1219            let PropagationRule::VectorExclusion { trigger_state } = rule else {
1220                continue;
1221            };
1222            if trigger_state != source.state {
1223                continue;
1224            }
1225            for row_id in self.logical_row_ids_for_uuid(ctx.tx, source.table, source.uuid) {
1226                self.delete_vector(ctx.tx, row_id)?;
1227            }
1228        }
1229
1230        Ok(())
1231    }
1232
    /// Delete the row version `row_id` of `table` within transaction `tx`,
    /// delegating to the relational executor.
    pub fn delete_row(&self, tx: TxId, table: &str, row_id: RowId) -> Result<()> {
        self.relational.delete(tx, table, row_id)
    }
1236
    /// All rows of `table` visible at `snapshot`.
    pub fn scan(&self, table: &str, snapshot: SnapshotId) -> Result<Vec<VersionedRow>> {
        self.relational.scan(table, snapshot)
    }
1240
    /// Rows of `table` visible at `snapshot` that satisfy `predicate`.
    pub fn scan_filter(
        &self,
        table: &str,
        snapshot: SnapshotId,
        predicate: &dyn Fn(&VersionedRow) -> bool,
    ) -> Result<Vec<VersionedRow>> {
        self.relational.scan_filter(table, snapshot, predicate)
    }
1249
    /// Look up a row of `table` whose column `col` equals `value`, as
    /// visible at `snapshot` (committed data only — no transaction overlay).
    pub fn point_lookup(
        &self,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational.point_lookup(table, col, value, snapshot)
    }
1259
    /// Like [`Self::point_lookup`], but also considers writes staged in `tx`
    /// (delegates to `point_lookup_with_tx` with `Some(tx)`).
    pub(crate) fn point_lookup_in_tx(
        &self,
        tx: TxId,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational
            .point_lookup_with_tx(Some(tx), table, col, value, snapshot)
    }
1271
1272    pub(crate) fn logical_row_ids_for_uuid(
1273        &self,
1274        tx: TxId,
1275        table: &str,
1276        uuid: uuid::Uuid,
1277    ) -> Vec<RowId> {
1278        let mut row_ids = HashSet::new();
1279
1280        if let Some(rows) = self.relational_store.tables.read().get(table) {
1281            for row in rows {
1282                if row.values.get("id") == Some(&Value::Uuid(uuid)) {
1283                    row_ids.insert(row.row_id);
1284                }
1285            }
1286        }
1287
1288        let _ = self.tx_mgr.with_write_set(tx, |ws| {
1289            for (insert_table, row) in &ws.relational_inserts {
1290                if insert_table == table && row.values.get("id") == Some(&Value::Uuid(uuid)) {
1291                    row_ids.insert(row.row_id);
1292                }
1293            }
1294        });
1295
1296        row_ids.into_iter().collect()
1297    }
1298
    /// Insert a graph edge after reserving memory budget for it.
    ///
    /// Returns `Ok(true)` when a new edge was stored and `Ok(false)` when the
    /// store reported nothing inserted; in the latter case (and on error) the
    /// reservation is released so the accountant only tracks edges kept.
    pub fn insert_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: EdgeType,
        properties: HashMap<String, Value>,
    ) -> Result<bool> {
        // Reserve BEFORE inserting so an over-budget insert never happens.
        let bytes = estimate_edge_bytes(source, target, &edge_type, &properties);
        self.accountant.try_allocate_for(
            bytes,
            "graph_insert",
            "insert_edge",
            "Reduce edge fan-out or raise MEMORY_LIMIT before inserting more graph edges.",
        )?;

        match self
            .graph
            .insert_edge(tx, source, target, edge_type, properties)
        {
            Ok(inserted) => {
                if !inserted {
                    // Nothing stored — give the reservation back.
                    self.accountant.release(bytes);
                }
                Ok(inserted)
            }
            Err(err) => {
                self.accountant.release(bytes);
                Err(err)
            }
        }
    }
1331
    /// Delete the edge `source -[edge_type]-> target` within transaction `tx`.
    pub fn delete_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: &str,
    ) -> Result<()> {
        self.graph.delete_edge(tx, source, target, edge_type)
    }
1341
    /// Breadth-first traversal from `start`, optionally restricted to
    /// `edge_types`, up to `max_depth` hops at `snapshot`.
    ///
    /// NOTE(review): the fourth argument to `GraphStore::bfs` is fixed at `1`
    /// here — presumably a minimum depth — confirm against its definition.
    pub fn query_bfs(
        &self,
        start: NodeId,
        edge_types: Option<&[EdgeType]>,
        direction: Direction,
        max_depth: u32,
        snapshot: SnapshotId,
    ) -> Result<TraversalResult> {
        self.graph
            .bfs(start, edge_types, direction, 1, max_depth, snapshot)
    }
1353
    /// Number of `edge_type` edges leaving `source` at `snapshot`.
    /// The underlying call is infallible; `Result` is kept for API symmetry.
    pub fn edge_count(
        &self,
        source: NodeId,
        edge_type: &str,
        snapshot: SnapshotId,
    ) -> Result<usize> {
        Ok(self.graph.edge_count(source, edge_type, snapshot))
    }
1362
1363    pub fn get_edge_properties(
1364        &self,
1365        source: NodeId,
1366        target: NodeId,
1367        edge_type: &str,
1368        snapshot: SnapshotId,
1369    ) -> Result<Option<HashMap<String, Value>>> {
1370        let props = self
1371            .graph_store
1372            .forward_adj
1373            .read()
1374            .get(&source)
1375            .and_then(|entries| {
1376                entries
1377                    .iter()
1378                    .rev()
1379                    .find(|entry| {
1380                        entry.target == target
1381                            && entry.edge_type == edge_type
1382                            && entry.visible_at(snapshot)
1383                    })
1384                    .map(|entry| entry.properties.clone())
1385            });
1386        Ok(props)
1387    }
1388
1389    pub fn insert_vector(&self, tx: TxId, row_id: RowId, vector: Vec<f32>) -> Result<()> {
1390        let bytes = estimate_vector_bytes(&vector);
1391        self.accountant.try_allocate_for(
1392            bytes,
1393            "insert",
1394            "vector_insert",
1395            "Reduce vector dimensionality, insert fewer rows, or raise MEMORY_LIMIT.",
1396        )?;
1397        self.vector
1398            .insert_vector(tx, row_id, vector)
1399            .inspect_err(|_| self.accountant.release(bytes))
1400    }
1401
    /// Delete the vector associated with `row_id` within transaction `tx`.
    pub fn delete_vector(&self, tx: TxId, row_id: RowId) -> Result<()> {
        self.vector.delete_vector(tx, row_id)
    }
1405
    /// k-nearest-neighbor search over stored vectors at `snapshot`.
    ///
    /// `candidates`, when present, restricts the search to the given row-id
    /// bitmap. Returns `(row_id, score)` pairs from the vector executor.
    pub fn query_vector(
        &self,
        query: &[f32],
        k: usize,
        candidates: Option<&RoaringTreemap>,
        snapshot: SnapshotId,
    ) -> Result<Vec<(RowId, f32)>> {
        self.vector.search(query, k, candidates, snapshot)
    }
1415
1416    pub fn has_live_vector(&self, row_id: RowId, snapshot: SnapshotId) -> bool {
1417        self.vector_store
1418            .vectors
1419            .read()
1420            .iter()
1421            .any(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
1422    }
1423
1424    pub fn live_vector_entry(&self, row_id: RowId, snapshot: SnapshotId) -> Option<VectorEntry> {
1425        self.vector_store
1426            .vectors
1427            .read()
1428            .iter()
1429            .rev()
1430            .find(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
1431            .cloned()
1432    }
1433
1434    pub(crate) fn drop_table_aux_state(&self, table: &str) {
1435        let snapshot = self.snapshot();
1436        let rows = self.scan(table, snapshot).unwrap_or_default();
1437        let row_ids: HashSet<RowId> = rows.iter().map(|row| row.row_id).collect();
1438        let edge_keys: HashSet<(NodeId, EdgeType, NodeId)> = rows
1439            .iter()
1440            .filter_map(|row| {
1441                match (
1442                    row.values.get("source_id").and_then(Value::as_uuid),
1443                    row.values.get("target_id").and_then(Value::as_uuid),
1444                    row.values.get("edge_type").and_then(Value::as_text),
1445                ) {
1446                    (Some(source), Some(target), Some(edge_type)) => {
1447                        Some((*source, edge_type.to_string(), *target))
1448                    }
1449                    _ => None,
1450                }
1451            })
1452            .collect();
1453
1454        if !row_ids.is_empty() {
1455            let mut vectors = self.vector_store.vectors.write();
1456            vectors.retain(|entry| !row_ids.contains(&entry.row_id));
1457            self.vector_store.clear_hnsw(self.accountant());
1458        }
1459
1460        if !edge_keys.is_empty() {
1461            {
1462                let mut forward = self.graph_store.forward_adj.write();
1463                for entries in forward.values_mut() {
1464                    entries.retain(|entry| {
1465                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
1466                    });
1467                }
1468                forward.retain(|_, entries| !entries.is_empty());
1469            }
1470            {
1471                let mut reverse = self.graph_store.reverse_adj.write();
1472                for entries in reverse.values_mut() {
1473                    entries.retain(|entry| {
1474                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
1475                    });
1476                }
1477                reverse.retain(|_, entries| !entries.is_empty());
1478            }
1479        }
1480    }
1481
    /// Names of all tables known to the relational store.
    pub fn table_names(&self) -> Vec<String> {
        self.relational_store.table_names()
    }
1485
    /// Schema metadata for `table`, or `None` if the table does not exist.
    pub fn table_meta(&self, table: &str) -> Option<TableMeta> {
        self.relational_store.table_meta(table)
    }
1489
    /// Run one pruning cycle. Called by the background loop or manually in tests.
    ///
    /// Serialized against the background pruning thread via `pruning_guard`.
    /// Returns whatever `prune_expired_rows` reports (presumably the number
    /// of pruned entries — confirm against its definition).
    pub fn run_pruning_cycle(&self) -> u64 {
        let _guard = self.pruning_guard.lock();
        prune_expired_rows(
            &self.relational_store,
            &self.graph_store,
            &self.vector_store,
            self.accountant(),
            self.persistence.as_ref(),
            self.sync_watermark(),
        )
    }
1502
    /// Set the pruning loop interval. Test-only API.
    ///
    /// Stops any existing pruning thread, then spawns a fresh one that runs
    /// `prune_expired_rows` every `interval` until its shutdown flag is set.
    /// The thread shares `pruning_guard` with [`Self::run_pruning_cycle`], so
    /// manual and background cycles never run concurrently.
    pub fn set_pruning_interval(&self, interval: Duration) {
        self.stop_pruning_thread();

        let shutdown = Arc::new(AtomicBool::new(false));
        // Clone Arc handles so the thread owns everything it touches.
        let relational = self.relational_store.clone();
        let graph = self.graph_store.clone();
        let vector = self.vector_store.clone();
        let accountant = self.accountant.clone();
        let persistence = self.persistence.clone();
        let sync_watermark = self.sync_watermark.clone();
        let pruning_guard = self.pruning_guard.clone();
        let thread_shutdown = shutdown.clone();

        let handle = thread::spawn(move || {
            while !thread_shutdown.load(Ordering::SeqCst) {
                {
                    let _guard = pruning_guard.lock();
                    // Best-effort: the prune count is ignored here.
                    let _ = prune_expired_rows(
                        &relational,
                        &graph,
                        &vector,
                        accountant.as_ref(),
                        persistence.as_ref(),
                        sync_watermark.load(Ordering::SeqCst),
                    );
                }
                // Sleep in a shutdown-aware way so close() is not delayed
                // by a full interval.
                sleep_with_shutdown(&thread_shutdown, interval);
            }
        });

        // Publish the new thread's handle/flag so stop_pruning_thread works.
        let mut runtime = self.pruning_runtime.lock();
        runtime.shutdown = shutdown;
        runtime.handle = Some(handle);
    }
1538
    /// Current sync watermark (atomic SeqCst load).
    pub fn sync_watermark(&self) -> u64 {
        self.sync_watermark.load(Ordering::SeqCst)
    }
1542
    /// Overwrite the sync watermark (atomic SeqCst store).
    pub fn set_sync_watermark(&self, watermark: u64) {
        self.sync_watermark.store(watermark, Ordering::SeqCst);
    }
1546
    /// Unique identifier of this database instance.
    pub fn instance_id(&self) -> uuid::Uuid {
        self.instance_id
    }
1550
    /// In-memory database with a custom plugin and memory accountant.
    ///
    /// Unlike [`Self::open_memory_with_plugin`], this does NOT fire the
    /// plugin's `on_open` hook — callers are expected to do so themselves.
    pub fn open_memory_with_plugin_and_accountant(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_memory_internal(plugin, accountant)
    }
1557
1558    pub fn open_memory_with_plugin(plugin: Arc<dyn DatabasePlugin>) -> Result<Self> {
1559        let db = Self::open_memory_with_plugin_and_accountant(
1560            plugin,
1561            Arc::new(MemoryAccountant::no_limit()),
1562        )?;
1563        db.plugin.on_open()?;
1564        Ok(db)
1565    }
1566
    /// Shut the database down; idempotent via the `closed` flag.
    ///
    /// Rolls back any open session transaction, stops the pruning thread,
    /// drops subscribers, closes persistence, and finally fires the plugin's
    /// `on_close` hook. NOTE(review): a rollback error returns early with the
    /// `closed` flag already set, leaving the pruning thread and persistence
    /// untouched — confirm this is the intended failure mode.
    pub fn close(&self) -> Result<()> {
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        let tx = self.session_tx.lock().take();
        if let Some(tx) = tx {
            self.rollback(tx)?;
        }
        self.stop_pruning_thread();
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
        self.plugin.on_close()
    }
1582
    /// File-backed database with custom plugin.
    ///
    /// Opens with no memory limit and no startup disk limit, then fires the
    /// plugin's `on_open` hook.
    pub fn open_with_plugin(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
    ) -> Result<Self> {
        let db = Self::open_loaded(path, plugin, Arc::new(MemoryAccountant::no_limit()), None)?;
        db.plugin.on_open()?;
        Ok(db)
    }
1592
    /// Full constructor with budget.
    ///
    /// Delegates to [`Self::open_with_config_and_disk_limit`] with no startup
    /// disk limit.
    pub fn open_with_config(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_with_config_and_disk_limit(path, plugin, accountant, None)
    }
1601
1602    pub fn open_with_config_and_disk_limit(
1603        path: impl AsRef<Path>,
1604        plugin: Arc<dyn DatabasePlugin>,
1605        accountant: Arc<MemoryAccountant>,
1606        startup_disk_limit: Option<u64>,
1607    ) -> Result<Self> {
1608        let path = path.as_ref();
1609        if path.as_os_str() == ":memory:" {
1610            return Self::open_memory_with_plugin_and_accountant(plugin, accountant);
1611        }
1612        let accountant = if let Some(limit) = persisted_memory_limit(path)? {
1613            let usage = accountant.usage();
1614            if usage.limit.is_none() && usage.startup_ceiling.is_none() {
1615                Arc::new(MemoryAccountant::with_budget(limit))
1616            } else {
1617                accountant
1618            }
1619        } else {
1620            accountant
1621        };
1622        let db = Self::open_loaded(path, plugin, accountant, startup_disk_limit)?;
1623        db.plugin.on_open()?;
1624        Ok(db)
1625    }
1626
    /// In-memory database with budget.
    ///
    /// Panics if in-memory construction fails, which is treated as a bug
    /// (no I/O is involved).
    pub fn open_memory_with_accountant(accountant: Arc<MemoryAccountant>) -> Self {
        Self::open_memory_internal(Arc::new(CorePlugin), accountant)
            .expect("failed to open in-memory database with accountant")
    }
1632
    /// Access the memory accountant that tracks this database's allocations.
    pub fn accountant(&self) -> &MemoryAccountant {
        &self.accountant
    }
1637
1638    fn account_loaded_state(&self) -> Result<()> {
1639        let metadata_bytes = self
1640            .relational_store
1641            .table_meta
1642            .read()
1643            .values()
1644            .fold(0usize, |acc, meta| {
1645                acc.saturating_add(meta.estimated_bytes())
1646            });
1647        self.accountant.try_allocate_for(
1648            metadata_bytes,
1649            "open",
1650            "load_table_metadata",
1651            "Open the database with a larger MEMORY_LIMIT or reduce stored schema metadata.",
1652        )?;
1653
1654        let row_bytes =
1655            self.relational_store
1656                .tables
1657                .read()
1658                .iter()
1659                .fold(0usize, |acc, (table, rows)| {
1660                    let meta = self.table_meta(table);
1661                    acc.saturating_add(rows.iter().fold(0usize, |inner, row| {
1662                        inner.saturating_add(meta.as_ref().map_or_else(
1663                            || row.estimated_bytes(),
1664                            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
1665                        ))
1666                    }))
1667                });
1668        self.accountant.try_allocate_for(
1669            row_bytes,
1670            "open",
1671            "load_rows",
1672            "Open the database with a larger MEMORY_LIMIT or prune retained rows first.",
1673        )?;
1674
1675        let edge_bytes = self
1676            .graph_store
1677            .forward_adj
1678            .read()
1679            .values()
1680            .flatten()
1681            .filter(|edge| edge.deleted_tx.is_none())
1682            .fold(0usize, |acc, edge| {
1683                acc.saturating_add(edge.estimated_bytes())
1684            });
1685        self.accountant.try_allocate_for(
1686            edge_bytes,
1687            "open",
1688            "load_edges",
1689            "Open the database with a larger MEMORY_LIMIT or reduce graph edge volume.",
1690        )?;
1691
1692        let vector_bytes = self
1693            .vector_store
1694            .vectors
1695            .read()
1696            .iter()
1697            .filter(|entry| entry.deleted_tx.is_none())
1698            .fold(0usize, |acc, entry| {
1699                acc.saturating_add(entry.estimated_bytes())
1700            });
1701        self.accountant.try_allocate_for(
1702            vector_bytes,
1703            "open",
1704            "load_vectors",
1705            "Open the database with a larger MEMORY_LIMIT or reduce stored vector data.",
1706        )?;
1707
1708        Ok(())
1709    }
1710
1711    fn release_insert_allocations(&self, ws: &contextdb_tx::WriteSet) {
1712        for (table, row) in &ws.relational_inserts {
1713            let bytes = self
1714                .table_meta(table)
1715                .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
1716                .unwrap_or_else(|| row.estimated_bytes());
1717            self.accountant.release(bytes);
1718        }
1719
1720        for edge in &ws.adj_inserts {
1721            self.accountant.release(edge.estimated_bytes());
1722        }
1723
1724        for entry in &ws.vector_inserts {
1725            self.accountant.release(entry.estimated_bytes());
1726        }
1727    }
1728
1729    fn release_delete_allocations(&self, ws: &contextdb_tx::WriteSet) {
1730        for (table, row_id, _) in &ws.relational_deletes {
1731            if let Some(row) = self.find_row_by_id(table, *row_id) {
1732                let bytes = self
1733                    .table_meta(table)
1734                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
1735                    .unwrap_or_else(|| row.estimated_bytes());
1736                self.accountant.release(bytes);
1737            }
1738        }
1739
1740        for (source, edge_type, target, _) in &ws.adj_deletes {
1741            if let Some(edge) = self.find_edge(source, target, edge_type) {
1742                self.accountant.release(edge.estimated_bytes());
1743            }
1744        }
1745
1746        for (row_id, _) in &ws.vector_deletes {
1747            if let Some(vector) = self.find_vector_by_row_id(*row_id) {
1748                self.accountant.release(vector.estimated_bytes());
1749            }
1750        }
1751
1752        if !ws.vector_deletes.is_empty() {
1753            self.vector_store.clear_hnsw(self.accountant());
1754        }
1755    }
1756
1757    fn find_row_by_id(&self, table: &str, row_id: RowId) -> Option<VersionedRow> {
1758        self.relational_store
1759            .tables
1760            .read()
1761            .get(table)
1762            .and_then(|rows| rows.iter().find(|row| row.row_id == row_id))
1763            .cloned()
1764    }
1765
1766    fn find_vector_by_row_id(&self, row_id: RowId) -> Option<VectorEntry> {
1767        self.vector_store
1768            .vectors
1769            .read()
1770            .iter()
1771            .find(|entry| entry.row_id == row_id)
1772            .cloned()
1773    }
1774
1775    fn find_edge(&self, source: &NodeId, target: &NodeId, edge_type: &str) -> Option<AdjEntry> {
1776        self.graph_store
1777            .forward_adj
1778            .read()
1779            .get(source)
1780            .and_then(|entries| {
1781                entries
1782                    .iter()
1783                    .find(|entry| entry.target == *target && entry.edge_type == edge_type)
1784                    .cloned()
1785            })
1786    }
1787
1788    pub(crate) fn write_set_checkpoint(
1789        &self,
1790        tx: TxId,
1791    ) -> Result<(usize, usize, usize, usize, usize)> {
1792        self.tx_mgr.with_write_set(tx, |ws| {
1793            (
1794                ws.relational_inserts.len(),
1795                ws.relational_deletes.len(),
1796                ws.adj_inserts.len(),
1797                ws.vector_inserts.len(),
1798                ws.vector_deletes.len(),
1799            )
1800        })
1801    }
1802
1803    pub(crate) fn restore_write_set_checkpoint(
1804        &self,
1805        tx: TxId,
1806        checkpoint: (usize, usize, usize, usize, usize),
1807    ) -> Result<()> {
1808        self.tx_mgr.with_write_set(tx, |ws| {
1809            ws.relational_inserts.truncate(checkpoint.0);
1810            ws.relational_deletes.truncate(checkpoint.1);
1811            ws.adj_inserts.truncate(checkpoint.2);
1812            ws.vector_inserts.truncate(checkpoint.3);
1813            ws.vector_deletes.truncate(checkpoint.4);
1814        })
1815    }
1816
1817    /// Get a clone of the current conflict policies.
1818    pub fn conflict_policies(&self) -> ConflictPolicies {
1819        self.conflict_policies.read().clone()
1820    }
1821
1822    /// Set the default conflict policy.
1823    pub fn set_default_conflict_policy(&self, policy: ConflictPolicy) {
1824        self.conflict_policies.write().default = policy;
1825    }
1826
1827    /// Set a per-table conflict policy.
1828    pub fn set_table_conflict_policy(&self, table: &str, policy: ConflictPolicy) {
1829        self.conflict_policies
1830            .write()
1831            .per_table
1832            .insert(table.to_string(), policy);
1833    }
1834
1835    /// Remove a per-table conflict policy override.
1836    pub fn drop_table_conflict_policy(&self, table: &str) {
1837        self.conflict_policies.write().per_table.remove(table);
1838    }
1839
1840    pub fn plugin(&self) -> &dyn DatabasePlugin {
1841        self.plugin.as_ref()
1842    }
1843
1844    pub fn plugin_health(&self) -> PluginHealth {
1845        self.plugin.health()
1846    }
1847
1848    pub fn plugin_describe(&self) -> serde_json::Value {
1849        self.plugin.describe()
1850    }
1851
1852    pub(crate) fn graph(&self) -> &MemGraphExecutor<DynStore> {
1853        &self.graph
1854    }
1855
1856    pub(crate) fn relational_store(&self) -> &Arc<RelationalStore> {
1857        &self.relational_store
1858    }
1859
1860    pub(crate) fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
1861    where
1862        F: FnOnce(u64) -> R,
1863    {
1864        self.tx_mgr.allocate_ddl_lsn(f)
1865    }
1866
1867    pub(crate) fn with_commit_lock<F, R>(&self, f: F) -> R
1868    where
1869        F: FnOnce() -> R,
1870    {
1871        self.tx_mgr.with_commit_lock(f)
1872    }
1873
1874    pub(crate) fn log_create_table_ddl(&self, name: &str, meta: &TableMeta, lsn: u64) {
1875        let change = ddl_change_from_meta(name, meta);
1876        self.ddl_log.write().push((lsn, change.clone()));
1877        if let Some(persistence) = &self.persistence {
1878            let _ = persistence.append_ddl_log(lsn, &change);
1879        }
1880    }
1881
1882    pub(crate) fn log_drop_table_ddl(&self, name: &str, lsn: u64) {
1883        let change = DdlChange::DropTable {
1884            name: name.to_string(),
1885        };
1886        self.ddl_log.write().push((lsn, change.clone()));
1887        if let Some(persistence) = &self.persistence {
1888            let _ = persistence.append_ddl_log(lsn, &change);
1889        }
1890    }
1891
1892    pub(crate) fn log_alter_table_ddl(&self, name: &str, meta: &TableMeta, lsn: u64) {
1893        let change = DdlChange::AlterTable {
1894            name: name.to_string(),
1895            columns: meta
1896                .columns
1897                .iter()
1898                .map(|c| {
1899                    (
1900                        c.name.clone(),
1901                        sql_type_for_meta_column(c, &meta.propagation_rules),
1902                    )
1903                })
1904                .collect(),
1905            constraints: create_table_constraints_from_meta(meta),
1906        };
1907        self.ddl_log.write().push((lsn, change.clone()));
1908        if let Some(persistence) = &self.persistence {
1909            let _ = persistence.append_ddl_log(lsn, &change);
1910        }
1911    }
1912
1913    pub(crate) fn persist_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
1914        if let Some(persistence) = &self.persistence {
1915            persistence.flush_table_meta(name, meta)?;
1916        }
1917        Ok(())
1918    }
1919
1920    pub(crate) fn persist_memory_limit(&self, limit: Option<usize>) -> Result<()> {
1921        if let Some(persistence) = &self.persistence {
1922            match limit {
1923                Some(limit) => persistence.flush_config_value("memory_limit", &limit)?,
1924                None => persistence.remove_config_value("memory_limit")?,
1925            }
1926        }
1927        Ok(())
1928    }
1929
1930    pub fn set_disk_limit(&self, limit: Option<u64>) -> Result<()> {
1931        if self.persistence.is_none() {
1932            self.disk_limit.store(0, Ordering::SeqCst);
1933            return Ok(());
1934        }
1935
1936        let ceiling = self.disk_limit_startup_ceiling();
1937        if let Some(ceiling) = ceiling {
1938            match limit {
1939                Some(bytes) if bytes > ceiling => {
1940                    return Err(Error::Other(format!(
1941                        "disk limit {bytes} exceeds startup ceiling {ceiling}"
1942                    )));
1943                }
1944                None => {
1945                    return Err(Error::Other(
1946                        "cannot remove disk limit when a startup ceiling is set".to_string(),
1947                    ));
1948                }
1949                _ => {}
1950            }
1951        }
1952
1953        self.disk_limit.store(limit.unwrap_or(0), Ordering::SeqCst);
1954        Ok(())
1955    }
1956
1957    pub fn disk_limit(&self) -> Option<u64> {
1958        match self.disk_limit.load(Ordering::SeqCst) {
1959            0 => None,
1960            bytes => Some(bytes),
1961        }
1962    }
1963
1964    pub fn disk_limit_startup_ceiling(&self) -> Option<u64> {
1965        match self.disk_limit_startup_ceiling.load(Ordering::SeqCst) {
1966            0 => None,
1967            bytes => Some(bytes),
1968        }
1969    }
1970
1971    pub fn disk_file_size(&self) -> Option<u64> {
1972        self.persistence
1973            .as_ref()
1974            .map(|persistence| std::fs::metadata(persistence.path()).map(|meta| meta.len()))
1975            .transpose()
1976            .ok()
1977            .flatten()
1978    }
1979
1980    pub(crate) fn persist_disk_limit(&self, limit: Option<u64>) -> Result<()> {
1981        if let Some(persistence) = &self.persistence {
1982            match limit {
1983                Some(limit) => persistence.flush_config_value("disk_limit", &limit)?,
1984                None => persistence.remove_config_value("disk_limit")?,
1985            }
1986        }
1987        Ok(())
1988    }
1989
1990    pub fn check_disk_budget(&self, operation: &str) -> Result<()> {
1991        let Some(limit) = self.disk_limit() else {
1992            return Ok(());
1993        };
1994        let Some(current_bytes) = self.disk_file_size() else {
1995            return Ok(());
1996        };
1997        if current_bytes < limit {
1998            return Ok(());
1999        }
2000        Err(Error::DiskBudgetExceeded {
2001            operation: operation.to_string(),
2002            current_bytes,
2003            budget_limit_bytes: limit,
2004            hint: "Reduce retained file-backed data or raise DISK_LIMIT before writing more data."
2005                .to_string(),
2006        })
2007    }
2008
2009    pub fn persisted_sync_watermarks(&self, tenant_id: &str) -> Result<(u64, u64)> {
2010        let Some(persistence) = &self.persistence else {
2011            return Ok((0, 0));
2012        };
2013        let push = persistence
2014            .load_config_value::<u64>(&format!("sync_push_watermark:{tenant_id}"))?
2015            .unwrap_or(0);
2016        let pull = persistence
2017            .load_config_value::<u64>(&format!("sync_pull_watermark:{tenant_id}"))?
2018            .unwrap_or(0);
2019        Ok((push, pull))
2020    }
2021
2022    pub fn persist_sync_push_watermark(&self, tenant_id: &str, watermark: u64) -> Result<()> {
2023        if let Some(persistence) = &self.persistence {
2024            persistence
2025                .flush_config_value(&format!("sync_push_watermark:{tenant_id}"), &watermark)?;
2026        }
2027        Ok(())
2028    }
2029
2030    pub fn persist_sync_pull_watermark(&self, tenant_id: &str, watermark: u64) -> Result<()> {
2031        if let Some(persistence) = &self.persistence {
2032            persistence
2033                .flush_config_value(&format!("sync_pull_watermark:{tenant_id}"), &watermark)?;
2034        }
2035        Ok(())
2036    }
2037
2038    pub(crate) fn persist_table_rows(&self, name: &str) -> Result<()> {
2039        if let Some(persistence) = &self.persistence {
2040            let tables = self.relational_store.tables.read();
2041            if let Some(rows) = tables.get(name) {
2042                persistence.rewrite_table_rows(name, rows)?;
2043            }
2044        }
2045        Ok(())
2046    }
2047
2048    pub(crate) fn remove_persisted_table(&self, name: &str) -> Result<()> {
2049        if let Some(persistence) = &self.persistence {
2050            persistence.remove_table_meta(name)?;
2051            persistence.remove_table_data(name)?;
2052        }
2053        Ok(())
2054    }
2055
2056    pub fn change_log_since(&self, since_lsn: u64) -> Vec<ChangeLogEntry> {
2057        let log = self.change_log.read();
2058        let start = log.partition_point(|e| e.lsn() <= since_lsn);
2059        log[start..].to_vec()
2060    }
2061
2062    pub fn ddl_log_since(&self, since_lsn: u64) -> Vec<DdlChange> {
2063        let ddl = self.ddl_log.read();
2064        let start = ddl.partition_point(|(lsn, _)| *lsn <= since_lsn);
2065        ddl[start..].iter().map(|(_, c)| c.clone()).collect()
2066    }
2067
    /// Builds a complete snapshot of all live data as a ChangeSet.
    /// Used as fallback when change_log/ddl_log cannot serve a watermark.
    ///
    /// Locking: `table_meta` and `tables` read guards are held together for
    /// the relational pass and dropped before the graph and vector stores are
    /// read, so the snapshot is consistent per store but not globally atomic.
    #[allow(dead_code)]
    fn full_state_snapshot(&self) -> ChangeSet {
        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let mut ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        // DDL: one CreateTable-style change per known table.
        for (name, meta) in meta_guard.iter() {
            ddl.push(ddl_change_from_meta(name, meta));
        }

        // Rows (live only) — collect row_ids that have live rows for orphan vector filtering
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(m) => m,
                None => continue,
            };
            // Sync identity: the declared natural key, or an `id` UUID column
            // as an implicit fallback. Tables with neither cannot be synced
            // and are skipped entirely.
            let key_col = meta.natural_key_column.clone().unwrap_or_else(|| {
                if meta
                    .columns
                    .iter()
                    .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                {
                    "id".to_string()
                } else {
                    String::new()
                }
            });
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|r| r.deleted_tx.is_none()) {
                // Rows missing their key value are skipped and do NOT anchor
                // vectors (they never reach live_row_ids).
                let key_val = match row.values.get(&key_col) {
                    Some(v) => v.clone(),
                    None => continue,
                };
                live_row_ids.insert(row.row_id);
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }

        drop(tables_guard);
        drop(meta_guard);

        // Edges (live only)
        let fwd = self.graph_store.forward_adj.read();
        for (_source, entries) in fwd.iter() {
            for entry in entries.iter().filter(|e| e.deleted_tx.is_none()) {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Vectors (live only, skip orphans, first entry per row_id)
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs.iter().filter(|v| v.deleted_tx.is_none()) {
            if !live_row_ids.contains(&entry.row_id) {
                continue; // skip orphan vectors
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue; // first live entry per row_id only
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2168
    /// Derives a delta ChangeSet directly from persisted store state (row,
    /// edge and vector LSNs) for watermarks the ephemeral logs cannot serve —
    /// typically after a restart. A zero watermark degenerates to a full
    /// snapshot.
    ///
    /// Note: the returned `ddl` list is always empty on the delta path —
    /// presumably because table metadata carries no per-change LSN to diff
    /// against; confirm against the sync caller's expectations.
    fn persisted_state_since(&self, since_lsn: u64) -> ChangeSet {
        if since_lsn == 0 {
            return self.full_state_snapshot();
        }

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(meta) => meta,
                None => continue,
            };
            // Sync identity: declared natural key, or an `id` UUID column as
            // fallback; keyless tables are skipped.
            let key_col = meta.natural_key_column.clone().unwrap_or_else(|| {
                if meta
                    .columns
                    .iter()
                    .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                {
                    "id".to_string()
                } else {
                    String::new()
                }
            });
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|row| row.deleted_tx.is_none()) {
                // Record live row_ids BEFORE the LSN filter: vectors newer than
                // the watermark may belong to rows older than it.
                live_row_ids.insert(row.row_id);
                if row.lsn <= since_lsn {
                    continue;
                }
                let key_val = match row.values.get(&key_col) {
                    Some(value) => value.clone(),
                    None => continue,
                };
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }
        drop(tables_guard);
        drop(meta_guard);

        // Edges: live entries newer than the watermark.
        let fwd = self.graph_store.forward_adj.read();
        for entries in fwd.values() {
            for entry in entries
                .iter()
                .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
            {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Vectors: live, newer than the watermark, attached to a live row,
        // first entry per row_id only.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs
            .iter()
            .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2270
    /// Pre-checks whether applying `changes` would exceed the memory budget,
    /// before any mutation happens, so a sync apply can fail fast instead of
    /// partially applying.
    ///
    /// Sizing is an estimate: rows use schema-aware estimates when table
    /// metadata exists, edges and vectors use fixed per-entry overheads plus
    /// payload size. Rows that the conflict policy would skip anyway
    /// (existing row + InsertIfNotExists/ServerWins) are excluded.
    fn preflight_sync_apply_memory(
        &self,
        changes: &ChangeSet,
        policies: &ConflictPolicies,
    ) -> Result<()> {
        let usage = self.accountant.usage();
        // No configured limit → nothing to enforce.
        let Some(limit) = usage.limit else {
            return Ok(());
        };
        let available = usage.available.unwrap_or(limit);
        let mut required = 0usize;

        for row in &changes.rows {
            // Deletes and empty payloads free memory rather than consume it.
            if row.deleted || row.values.is_empty() {
                continue;
            }

            let policy = policies
                .per_table
                .get(&row.table)
                .copied()
                .unwrap_or(policies.default);
            // Point lookup against the current snapshot to see whether this
            // incoming row collides with an existing one.
            let existing = self.point_lookup(
                &row.table,
                &row.natural_key.column,
                &row.natural_key.value,
                self.snapshot(),
            )?;

            // These policies keep the local row on conflict, so the incoming
            // row would not be materialized — don't charge for it.
            if existing.is_some()
                && matches!(
                    policy,
                    ConflictPolicy::InsertIfNotExists | ConflictPolicy::ServerWins
                )
            {
                continue;
            }

            required = required.saturating_add(
                self.table_meta(&row.table)
                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                    .unwrap_or_else(|| estimate_row_value_bytes(&row.values)),
            );
        }

        // Edge estimate: fixed struct overhead + edge-type string + properties.
        for edge in &changes.edges {
            required = required.saturating_add(
                96 + edge.edge_type.len().saturating_mul(16)
                    + estimate_row_value_bytes(&edge.properties),
            );
        }

        // Vector estimate: fixed entry overhead + f32 payload; empty vectors
        // are delete markers and cost nothing.
        for vector in &changes.vectors {
            if vector.vector.is_empty() {
                continue;
            }
            required = required.saturating_add(
                24 + vector
                    .vector
                    .len()
                    .saturating_mul(std::mem::size_of::<f32>()),
            );
        }

        if required > available {
            return Err(Error::MemoryBudgetExceeded {
                subsystem: "sync".to_string(),
                operation: "apply_changes".to_string(),
                requested_bytes: required,
                available_bytes: available,
                budget_limit_bytes: limit,
                hint:
                    "Reduce sync batch size, split the push, or raise MEMORY_LIMIT on the server."
                        .to_string(),
            });
        }

        Ok(())
    }
2350
    /// Extracts changes from this database since the given LSN.
    ///
    /// Serving strategy, in order:
    /// 1. A watermark from the future returns an empty set.
    /// 2. If both ephemeral logs are empty but the stores hold data (i.e.
    ///    after a restart), the delta is derived from persisted LSNs.
    /// 3. If the logs exist but start after `since_lsn + 1` (truncated), the
    ///    delta is likewise derived from persisted state.
    /// 4. Otherwise the logs are replayed under the commit lock and
    ///    upsert-internal deletes are pruned.
    pub fn changes_since(&self, since_lsn: u64) -> ChangeSet {
        // Future watermark guard
        if since_lsn > self.current_lsn() {
            return ChangeSet::default();
        }

        // Check if the ephemeral logs can serve the requested watermark.
        // After restart, both logs are empty but stores may have data — fall back to snapshot.
        let log = self.change_log.read();
        let change_first_lsn = log.first().map(|e| e.lsn());
        let change_log_empty = log.is_empty();
        drop(log);

        let ddl = self.ddl_log.read();
        let ddl_first_lsn = ddl.first().map(|(lsn, _)| *lsn);
        let ddl_log_empty = ddl.is_empty();
        drop(ddl);

        let has_table_data = !self
            .relational_store
            .tables
            .read()
            .values()
            .all(|rows| rows.is_empty());
        let has_table_meta = !self.relational_store.table_meta.read().is_empty();

        // If both logs are empty but stores have data → post-restart, derive deltas from
        // persisted row/edge/vector LSNs instead of replaying a full snapshot.
        if change_log_empty && ddl_log_empty && (has_table_data || has_table_meta) {
            return self.persisted_state_since(since_lsn);
        }

        // If logs have entries, check the minimum first-LSN across both covers since_lsn
        let min_first_lsn = match (change_first_lsn, ddl_first_lsn) {
            (Some(c), Some(d)) => Some(c.min(d)),
            (Some(c), None) => Some(c),
            (None, Some(d)) => Some(d),
            (None, None) => None, // both empty, stores empty — nothing to serve
        };

        if min_first_lsn.is_some_and(|min_lsn| min_lsn > since_lsn + 1) {
            // Log doesn't cover since_lsn — derive the delta from persisted state.
            return self.persisted_state_since(since_lsn);
        }

        // Read both logs under the commit lock so no commit lands between the
        // two reads and skews one log relative to the other.
        let (ddl, change_entries) = self.with_commit_lock(|| {
            let ddl = self.ddl_log_since(since_lsn);
            let changes = self.change_log_since(since_lsn);
            (ddl, changes)
        });

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();

        // Materialize each log entry: inserts are hydrated from current store
        // state (entries for rows/vectors that no longer resolve are dropped);
        // deletes become tombstone payloads marked with "__deleted".
        for entry in change_entries {
            match entry {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    if let Some((natural_key, values)) = self.row_change_values(&table, row_id) {
                        rows.push(RowChange {
                            table,
                            natural_key,
                            values,
                            deleted: false,
                            lsn,
                        });
                    }
                }
                ChangeLogEntry::RowDelete {
                    table,
                    natural_key,
                    lsn,
                    ..
                } => {
                    let mut values = HashMap::new();
                    values.insert("__deleted".to_string(), Value::Bool(true));
                    rows.push(RowChange {
                        table,
                        natural_key,
                        values,
                        deleted: true,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeInsert {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    let properties = self
                        .edge_properties(source, target, &edge_type, lsn)
                        .unwrap_or_default();
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeDelete {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    let mut properties = HashMap::new();
                    properties.insert("__deleted".to_string(), Value::Bool(true));
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::VectorInsert { row_id, lsn } => {
                    if let Some(vector) = self.vector_for_row_lsn(row_id, lsn) {
                        vectors.push(VectorChange {
                            row_id,
                            vector,
                            lsn,
                        });
                    }
                }
                // An empty vector payload is the delete marker.
                ChangeLogEntry::VectorDelete { row_id, lsn } => vectors.push(VectorChange {
                    row_id,
                    vector: Vec::new(),
                    lsn,
                }),
            }
        }

        // Deduplicate upserts: when a RowDelete is followed by a RowInsert for the same
        // (table, natural_key), the delete is part of an upsert — remove it.
        // Only remove a delete if there is a non-delete entry with a HIGHER LSN
        // (i.e., the insert came after the delete, indicating an upsert).
        // If the insert has a lower LSN, the delete is genuine and must be kept.
        // Map key uses Debug-formatted natural-key value since Value may not
        // implement Hash directly.
        let insert_max_lsn: HashMap<(String, String, String), u64> = {
            let mut map: HashMap<(String, String, String), u64> = HashMap::new();
            for r in rows.iter().filter(|r| !r.deleted) {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                let entry = map.entry(key).or_insert(0);
                if r.lsn > *entry {
                    *entry = r.lsn;
                }
            }
            map
        };
        rows.retain(|r| {
            if r.deleted {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                // Keep the delete unless there is a subsequent insert (higher or equal LSN).
                // Equal LSN means the delete+insert are part of the same upsert transaction.
                match insert_max_lsn.get(&key) {
                    Some(&insert_lsn) => insert_lsn < r.lsn,
                    None => true,
                }
            } else {
                true
            }
        });

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2531
2532    /// Returns the current LSN of this database.
2533    pub fn current_lsn(&self) -> u64 {
2534        self.tx_mgr.current_lsn()
2535    }
2536
2537    /// Subscribe to commit events. Returns a receiver that yields a `CommitEvent`
2538    /// after each commit.
2539    pub fn subscribe(&self) -> Receiver<CommitEvent> {
2540        self.subscribe_with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY)
2541    }
2542
2543    /// Subscribe with a custom channel capacity.
2544    pub fn subscribe_with_capacity(&self, capacity: usize) -> Receiver<CommitEvent> {
2545        let (tx, rx) = mpsc::sync_channel(capacity.max(1));
2546        self.subscriptions.lock().subscribers.push(tx);
2547        rx
2548    }
2549
2550    /// Returns health metrics for the subscription system.
2551    pub fn subscription_health(&self) -> SubscriptionMetrics {
2552        let subscriptions = self.subscriptions.lock();
2553        SubscriptionMetrics {
2554            active_channels: subscriptions.subscribers.len(),
2555            events_sent: subscriptions.events_sent,
2556            events_dropped: subscriptions.events_dropped,
2557        }
2558    }
2559
    /// Applies a ChangeSet to this database with the given conflict policies.
    ///
    /// Processing happens in phases, each committed via `CommitSource::SyncPull`:
    /// 1. plugin hook + disk/memory preflight,
    /// 2. DDL changes (create/drop/alter tables),
    /// 3. row changes — one commit per row so a single bad row cannot roll
    ///    back the rest of the batch,
    /// 4. edge changes (best-effort; failures are ignored),
    /// 5. vector changes, remapped from remote row ids to local row ids.
    ///
    /// Returns applied/skipped counts, any conflicts recorded along the way,
    /// and the post-apply LSN. Fatal errors (see `is_fatal_sync_apply_error`)
    /// abort immediately; everything else is recorded as a conflict/skip.
    pub fn apply_changes(
        &self,
        mut changes: ChangeSet,
        policies: &ConflictPolicies,
    ) -> Result<ApplyResult> {
        // Let the plugin rewrite/veto the incoming change set first.
        self.plugin.on_sync_pull(&mut changes)?;
        self.check_disk_budget("sync_pull")?;
        self.preflight_sync_apply_memory(&changes, policies)?;

        let mut tx = self.begin();
        let mut result = ApplyResult {
            applied_rows: 0,
            skipped_rows: 0,
            conflicts: Vec::new(),
            new_lsn: self.current_lsn(),
        };
        // `vector_row_idx` walks `changes.vectors` in lockstep with the
        // vector-bearing rows below; this assumes vectors arrive in the same
        // order as their owning rows — TODO(review): confirm the producer
        // guarantees that ordering.
        let vector_row_ids = changes.vectors.iter().map(|v| v.row_id).collect::<Vec<_>>();
        // remote row id -> local row id, used to retarget vector changes.
        let mut vector_row_map: HashMap<u64, u64> = HashMap::new();
        let mut vector_row_idx = 0usize;
        // Remote row ids whose row failed to apply; their vectors are skipped.
        let mut failed_row_ids: HashSet<u64> = HashSet::new();
        // Per-call caches; both are invalidated whenever DDL or an upsert may
        // have changed the table's shape or visible rows.
        let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
        let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();

        // Phase: DDL first, so subsequent row changes see up-to-date schemas.
        for ddl in changes.ddl.clone() {
            match ddl {
                DdlChange::CreateTable {
                    name,
                    columns,
                    constraints,
                } => {
                    // Table already exists locally: do not recreate; instead
                    // compare schemas and record a conflict on mismatch.
                    if self.table_meta(&name).is_some() {
                        if let Some(local_meta) = self.table_meta(&name) {
                            let local_cols: Vec<(String, String)> = local_meta
                                .columns
                                .iter()
                                .map(|c| {
                                    let ty = match c.column_type {
                                        ColumnType::Integer => "INTEGER".to_string(),
                                        ColumnType::Real => "REAL".to_string(),
                                        ColumnType::Text => "TEXT".to_string(),
                                        ColumnType::Boolean => "BOOLEAN".to_string(),
                                        ColumnType::Json => "JSON".to_string(),
                                        ColumnType::Uuid => "UUID".to_string(),
                                        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
                                        ColumnType::Timestamp => "TIMESTAMP".to_string(),
                                    };
                                    (c.name.clone(), ty)
                                })
                                .collect();
                            // Remote column types may carry trailing modifiers
                            // (e.g. "TEXT NOT NULL"); compare base types only.
                            let remote_cols: Vec<(String, String)> = columns
                                .iter()
                                .map(|(col_name, col_type)| {
                                    let base_type = col_type
                                        .split_whitespace()
                                        .next()
                                        .unwrap_or(col_type)
                                        .to_string();
                                    (col_name.clone(), base_type)
                                })
                                .collect();
                            // Order-insensitive comparison.
                            let mut local_sorted = local_cols.clone();
                            local_sorted.sort();
                            let mut remote_sorted = remote_cols.clone();
                            remote_sorted.sort();
                            if local_sorted != remote_sorted {
                                result.conflicts.push(Conflict {
                                    natural_key: NaturalKey {
                                        column: "table".to_string(),
                                        value: Value::Text(name.clone()),
                                    },
                                    resolution: ConflictPolicy::ServerWins,
                                    reason: Some(format!(
                                        "schema mismatch: local columns {:?} differ from remote {:?}",
                                        local_cols, remote_cols
                                    )),
                                });
                            }
                        }
                        continue;
                    }
                    let mut sql = format!(
                        "CREATE TABLE {} ({})",
                        name,
                        columns
                            .iter()
                            .map(|(col, ty)| format!("{col} {ty}"))
                            .collect::<Vec<_>>()
                            .join(", ")
                    );
                    if !constraints.is_empty() {
                        sql.push(' ');
                        sql.push_str(&constraints.join(" "));
                    }
                    self.execute_in_tx(tx, &sql, &HashMap::new())?;
                    table_meta_cache.remove(&name);
                    visible_rows_cache.remove(&name);
                }
                DdlChange::DropTable { name } => {
                    if self.table_meta(&name).is_some() {
                        self.drop_table_aux_state(&name);
                        self.relational_store().drop_table(&name);
                        self.remove_persisted_table(&name)?;
                    }
                    table_meta_cache.remove(&name);
                    visible_rows_cache.remove(&name);
                }
                DdlChange::AlterTable {
                    name,
                    columns,
                    constraints,
                } => {
                    if self.table_meta(&name).is_none() {
                        continue;
                    }
                    // ALTER is applied as drop-meta + re-CREATE with the full
                    // remote column list.
                    self.relational_store().table_meta.write().remove(&name);
                    let mut sql = format!(
                        "CREATE TABLE {} ({})",
                        name,
                        columns
                            .iter()
                            .map(|(col, ty)| format!("{col} {ty}"))
                            .collect::<Vec<_>>()
                            .join(", ")
                    );
                    if !constraints.is_empty() {
                        sql.push(' ');
                        sql.push_str(&constraints.join(" "));
                    }
                    self.execute_in_tx(tx, &sql, &HashMap::new())?;
                    table_meta_cache.remove(&name);
                    visible_rows_cache.remove(&name);
                }
            }
        }

        // NOTE(review): the memory preflight runs a second time here —
        // presumably because DDL above may have changed the table set and
        // therefore the estimate; confirm this is intentional.
        self.preflight_sync_apply_memory(&changes, policies)?;

        // Phase: row changes. Each row is committed individually and a fresh
        // transaction is begun, so conflicts/skips never roll back applied rows.
        for row in changes.rows {
            if row.values.is_empty() {
                result.skipped_rows += 1;
                self.commit_with_source(tx, CommitSource::SyncPull)?;
                tx = self.begin();
                continue;
            }

            // Per-table policy overrides the default.
            let policy = policies
                .per_table
                .get(&row.table)
                .copied()
                .unwrap_or(policies.default);

            let existing = cached_point_lookup(
                self,
                &mut visible_rows_cache,
                &row.table,
                &row.natural_key.column,
                &row.natural_key.value,
            )?;
            let is_delete = row.deleted;
            // Whether this table has a vector column; only such rows consume
            // an entry from the lockstep vector cursor.
            let row_has_vector = cached_table_meta(self, &mut table_meta_cache, &row.table)
                .is_some_and(|meta| {
                    meta.columns
                        .iter()
                        .any(|col| matches!(col.column_type, ColumnType::Vector(_)))
                });

            if is_delete {
                if let Some(local) = existing {
                    if row_has_vector
                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                    {
                        vector_row_map.insert(*remote_row_id, local.row_id);
                        vector_row_idx += 1;
                    }
                    if let Err(err) = self.delete_row(tx, &row.table, local.row_id) {
                        result.conflicts.push(Conflict {
                            natural_key: row.natural_key.clone(),
                            resolution: policy,
                            reason: Some(format!("delete failed: {err}")),
                        });
                        result.skipped_rows += 1;
                    } else {
                        remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
                        result.applied_rows += 1;
                    }
                } else {
                    // Nothing to delete locally.
                    result.skipped_rows += 1;
                }
                self.commit_with_source(tx, CommitSource::SyncPull)?;
                tx = self.begin();
                continue;
            }

            // Strip the tombstone marker before writing the row.
            let mut values = row.values.clone();
            values.remove("__deleted");

            match (existing, policy) {
                // No local row: insert regardless of policy, after checking
                // NOT NULL and UNIQUE constraints against the cached view.
                (None, _) => {
                    if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
                        let mut constraint_error: Option<String> = None;

                        for col_def in &meta.columns {
                            if !col_def.nullable
                                && !col_def.primary_key
                                && col_def.default.is_none()
                            {
                                match values.get(&col_def.name) {
                                    None | Some(Value::Null) => {
                                        constraint_error = Some(format!(
                                            "NOT NULL constraint violated: {}.{}",
                                            row.table, col_def.name
                                        ));
                                        break;
                                    }
                                    _ => {}
                                }
                            }
                        }

                        let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
                        if constraint_error.is_none() && has_unique {
                            for col_def in &meta.columns {
                                if col_def.unique
                                    && !col_def.primary_key
                                    && let Some(new_val) = values.get(&col_def.name)
                                    && *new_val != Value::Null
                                    && cached_visible_rows(
                                        self,
                                        &mut visible_rows_cache,
                                        &row.table,
                                    )?
                                    .iter()
                                    .any(|r| r.values.get(&col_def.name) == Some(new_val))
                                {
                                    constraint_error = Some(format!(
                                        "UNIQUE constraint violated: {}.{}",
                                        row.table, col_def.name
                                    ));
                                    break;
                                }
                            }
                        }

                        if let Some(err_msg) = constraint_error {
                            result.skipped_rows += 1;
                            // Consume the vector cursor so later rows stay aligned.
                            if row_has_vector
                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                            {
                                failed_row_ids.insert(*remote_row_id);
                                vector_row_idx += 1;
                            }
                            result.conflicts.push(Conflict {
                                natural_key: row.natural_key.clone(),
                                resolution: policy,
                                reason: Some(err_msg),
                            });
                            self.commit_with_source(tx, CommitSource::SyncPull)?;
                            tx = self.begin();
                            continue;
                        }
                    }

                    match self.insert_row(tx, &row.table, values.clone()) {
                        Ok(new_row_id) => {
                            record_cached_insert(
                                &mut visible_rows_cache,
                                &row.table,
                                VersionedRow {
                                    row_id: new_row_id,
                                    values: values.clone(),
                                    created_tx: tx,
                                    deleted_tx: None,
                                    lsn: row.lsn,
                                    created_at: None,
                                },
                            );
                            result.applied_rows += 1;
                            if row_has_vector
                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                            {
                                vector_row_map.insert(*remote_row_id, new_row_id);
                                vector_row_idx += 1;
                            }
                        }
                        Err(err) => {
                            if is_fatal_sync_apply_error(&err) {
                                return Err(err);
                            }
                            result.skipped_rows += 1;
                            if row_has_vector
                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                            {
                                failed_row_ids.insert(*remote_row_id);
                                vector_row_idx += 1;
                            }
                            result.conflicts.push(Conflict {
                                natural_key: row.natural_key.clone(),
                                resolution: policy,
                                reason: Some(format!("{err}")),
                            });
                        }
                    }
                }
                // Local row exists and policy says keep it; still map the
                // remote vector onto the existing local row.
                (Some(local), ConflictPolicy::InsertIfNotExists) => {
                    if row_has_vector
                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                    {
                        vector_row_map.insert(*remote_row_id, local.row_id);
                        vector_row_idx += 1;
                    }
                    result.skipped_rows += 1;
                }
                // Server (local) wins: skip the remote row and its vector.
                (Some(_), ConflictPolicy::ServerWins) => {
                    result.skipped_rows += 1;
                    if row_has_vector
                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                    {
                        failed_row_ids.insert(*remote_row_id);
                        vector_row_idx += 1;
                    }
                    result.conflicts.push(Conflict {
                        natural_key: row.natural_key.clone(),
                        resolution: ConflictPolicy::ServerWins,
                        reason: Some("server_wins".to_string()),
                    });
                }
                // Higher remote LSN wins; ties go to the local row.
                (Some(local), ConflictPolicy::LatestWins) => {
                    if row.lsn <= local.lsn {
                        result.skipped_rows += 1;
                        if row_has_vector
                            && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                        {
                            failed_row_ids.insert(*remote_row_id);
                            vector_row_idx += 1;
                        }
                        result.conflicts.push(Conflict {
                            natural_key: row.natural_key.clone(),
                            resolution: ConflictPolicy::LatestWins,
                            reason: Some("local_lsn_newer_or_equal".to_string()),
                        });
                    } else {
                        // State machine conflict detection
                        if let Some(meta) = self.table_meta(&row.table)
                            && let Some(sm) = &meta.state_machine
                        {
                            let sm_col = sm.column.clone();
                            let transitions = sm.transitions.clone();
                            let incoming_state = values.get(&sm_col).and_then(|v| match v {
                                Value::Text(s) => Some(s.clone()),
                                _ => None,
                            });
                            let local_state = local.values.get(&sm_col).and_then(|v| match v {
                                Value::Text(s) => Some(s.clone()),
                                _ => None,
                            });

                            if let (Some(incoming), Some(current)) = (incoming_state, local_state) {
                                // Check if the transition from current to incoming is valid
                                let valid = transitions
                                    .get(&current)
                                    .is_some_and(|targets| targets.contains(&incoming));
                                if !valid && incoming != current {
                                    result.skipped_rows += 1;
                                    if row_has_vector
                                        && let Some(remote_row_id) =
                                            vector_row_ids.get(vector_row_idx)
                                    {
                                        failed_row_ids.insert(*remote_row_id);
                                        vector_row_idx += 1;
                                    }
                                    result.conflicts.push(Conflict {
                                        natural_key: row.natural_key.clone(),
                                        resolution: ConflictPolicy::LatestWins,
                                        reason: Some(format!(
                                            "state_machine: invalid transition {} -> {} (current: {})",
                                            current, incoming, current
                                        )),
                                    });
                                    self.commit_with_source(tx, CommitSource::SyncPull)?;
                                    tx = self.begin();
                                    continue;
                                }
                            }
                        }

                        match self.upsert_row(
                            tx,
                            &row.table,
                            &row.natural_key.column,
                            values.clone(),
                        ) {
                            Ok(_) => {
                                // Upsert may have changed the row id; drop the
                                // cache and re-resolve for the vector mapping.
                                visible_rows_cache.remove(&row.table);
                                result.applied_rows += 1;
                                if row_has_vector
                                    && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                                {
                                    if let Ok(Some(found)) = self.point_lookup_in_tx(
                                        tx,
                                        &row.table,
                                        &row.natural_key.column,
                                        &row.natural_key.value,
                                        self.snapshot(),
                                    ) {
                                        vector_row_map.insert(*remote_row_id, found.row_id);
                                    }
                                    vector_row_idx += 1;
                                }
                            }
                            Err(err) => {
                                if is_fatal_sync_apply_error(&err) {
                                    return Err(err);
                                }
                                result.skipped_rows += 1;
                                if row_has_vector
                                    && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                                {
                                    failed_row_ids.insert(*remote_row_id);
                                    vector_row_idx += 1;
                                }
                                result.conflicts.push(Conflict {
                                    natural_key: row.natural_key.clone(),
                                    resolution: ConflictPolicy::LatestWins,
                                    reason: Some(format!("state_machine_or_constraint: {err}")),
                                });
                            }
                        }
                    }
                }
                // Edge (remote) wins unconditionally: the conflict is recorded
                // first, then the upsert is attempted; on failure the recorded
                // conflict's reason is overwritten with the error.
                (Some(_), ConflictPolicy::EdgeWins) => {
                    result.conflicts.push(Conflict {
                        natural_key: row.natural_key.clone(),
                        resolution: ConflictPolicy::EdgeWins,
                        reason: Some("edge_wins".to_string()),
                    });
                    match self.upsert_row(tx, &row.table, &row.natural_key.column, values.clone()) {
                        Ok(_) => {
                            visible_rows_cache.remove(&row.table);
                            result.applied_rows += 1;
                            if row_has_vector
                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                            {
                                if let Ok(Some(found)) = self.point_lookup_in_tx(
                                    tx,
                                    &row.table,
                                    &row.natural_key.column,
                                    &row.natural_key.value,
                                    self.snapshot(),
                                ) {
                                    vector_row_map.insert(*remote_row_id, found.row_id);
                                }
                                vector_row_idx += 1;
                            }
                        }
                        Err(err) => {
                            if is_fatal_sync_apply_error(&err) {
                                return Err(err);
                            }
                            result.skipped_rows += 1;
                            if row_has_vector
                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
                            {
                                failed_row_ids.insert(*remote_row_id);
                                vector_row_idx += 1;
                            }
                            if let Some(last) = result.conflicts.last_mut() {
                                last.reason = Some(format!("state_machine_or_constraint: {err}"));
                            }
                        }
                    }
                }
            }

            self.commit_with_source(tx, CommitSource::SyncPull)?;
            tx = self.begin();
        }

        // Phase: edge changes. Best-effort — individual edge failures are
        // deliberately swallowed and not counted as conflicts.
        for edge in changes.edges {
            let is_delete = matches!(edge.properties.get("__deleted"), Some(Value::Bool(true)));
            if is_delete {
                let _ = self.delete_edge(tx, edge.source, edge.target, &edge.edge_type);
            } else {
                let _ = self.insert_edge(
                    tx,
                    edge.source,
                    edge.target,
                    edge.edge_type,
                    edge.properties,
                );
            }
        }

        // Phase: vector changes, retargeted through `vector_row_map`.
        for vector in changes.vectors {
            if failed_row_ids.contains(&vector.row_id) {
                continue; // skip vectors for rows that failed to insert
            }
            // Unmapped remote ids fall back to the remote row id as-is.
            let local_row_id = vector_row_map
                .get(&vector.row_id)
                .copied()
                .unwrap_or(vector.row_id);
            if vector.vector.is_empty() {
                // Empty payload is the deletion marker.
                let _ = self.vector.delete_vector(tx, local_row_id);
            } else {
                // Replace-on-write: drop any live vector before inserting.
                if self.has_live_vector(local_row_id, self.snapshot()) {
                    let _ = self.delete_vector(tx, local_row_id);
                }
                if let Err(err) = self.insert_vector(tx, local_row_id, vector.vector)
                    && is_fatal_sync_apply_error(&err)
                {
                    return Err(err);
                }
            }
        }

        // Final commit covers the edge and vector phases.
        self.commit_with_source(tx, CommitSource::SyncPull)?;
        result.new_lsn = self.current_lsn();
        Ok(result)
    }
3079
3080    fn row_change_values(
3081        &self,
3082        table: &str,
3083        row_id: RowId,
3084    ) -> Option<(NaturalKey, HashMap<String, Value>)> {
3085        let tables = self.relational_store.tables.read();
3086        let meta = self.relational_store.table_meta.read();
3087        let rows = tables.get(table)?;
3088        let row = rows.iter().find(|r| r.row_id == row_id)?;
3089        let key_col = meta
3090            .get(table)
3091            .and_then(|m| m.natural_key_column.clone())
3092            .or_else(|| {
3093                meta.get(table).and_then(|m| {
3094                    m.columns
3095                        .iter()
3096                        .find(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
3097                        .map(|_| "id".to_string())
3098                })
3099            })?;
3100
3101        let key_val = row.values.get(&key_col)?.clone();
3102        let values = row
3103            .values
3104            .iter()
3105            .map(|(k, v)| (k.clone(), v.clone()))
3106            .collect::<HashMap<_, _>>();
3107        Some((
3108            NaturalKey {
3109                column: key_col,
3110                value: key_val,
3111            },
3112            values,
3113        ))
3114    }
3115
3116    fn edge_properties(
3117        &self,
3118        source: NodeId,
3119        target: NodeId,
3120        edge_type: &str,
3121        lsn: u64,
3122    ) -> Option<HashMap<String, Value>> {
3123        self.graph_store
3124            .forward_adj
3125            .read()
3126            .get(&source)
3127            .and_then(|entries| {
3128                entries
3129                    .iter()
3130                    .find(|e| e.target == target && e.edge_type == edge_type && e.lsn == lsn)
3131                    .map(|e| e.properties.clone())
3132            })
3133    }
3134
3135    fn vector_for_row_lsn(&self, row_id: RowId, lsn: u64) -> Option<Vec<f32>> {
3136        self.vector_store
3137            .vectors
3138            .read()
3139            .iter()
3140            .find(|v| v.row_id == row_id && v.lsn == lsn)
3141            .map(|v| v.vector.clone())
3142    }
3143}
3144
3145fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
3146    if let Some(pos) = qr.columns.iter().position(|c| c == "row_id") {
3147        qr.columns.remove(pos);
3148        for row in &mut qr.rows {
3149            if pos < row.len() {
3150                row.remove(pos);
3151            }
3152        }
3153    }
3154    qr
3155}
3156
3157fn cached_table_meta(
3158    db: &Database,
3159    cache: &mut HashMap<String, Option<TableMeta>>,
3160    table: &str,
3161) -> Option<TableMeta> {
3162    cache
3163        .entry(table.to_string())
3164        .or_insert_with(|| db.table_meta(table))
3165        .clone()
3166}
3167
3168fn cached_visible_rows<'a>(
3169    db: &Database,
3170    cache: &'a mut HashMap<String, Vec<VersionedRow>>,
3171    table: &str,
3172) -> Result<&'a mut Vec<VersionedRow>> {
3173    if !cache.contains_key(table) {
3174        let rows = db.scan(table, db.snapshot())?;
3175        cache.insert(table.to_string(), rows);
3176    }
3177    Ok(cache.get_mut(table).expect("cached visible rows"))
3178}
3179
3180fn cached_point_lookup(
3181    db: &Database,
3182    cache: &mut HashMap<String, Vec<VersionedRow>>,
3183    table: &str,
3184    col: &str,
3185    value: &Value,
3186) -> Result<Option<VersionedRow>> {
3187    let rows = cached_visible_rows(db, cache, table)?;
3188    Ok(rows
3189        .iter()
3190        .find(|r| r.values.get(col) == Some(value))
3191        .cloned())
3192}
3193
3194fn record_cached_insert(
3195    cache: &mut HashMap<String, Vec<VersionedRow>>,
3196    table: &str,
3197    row: VersionedRow,
3198) {
3199    if let Some(rows) = cache.get_mut(table) {
3200        rows.push(row);
3201    }
3202}
3203
3204fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
3205    if let Some(rows) = cache.get_mut(table) {
3206        rows.retain(|row| row.row_id != row_id);
3207    }
3208}
3209
3210fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
3211    match result {
3212        Ok(query_result) => QueryOutcome::Success {
3213            row_count: if query_result.rows.is_empty() {
3214                query_result.rows_affected as usize
3215            } else {
3216                query_result.rows.len()
3217            },
3218        },
3219        Err(error) => QueryOutcome::Error {
3220            error: error.to_string(),
3221        },
3222    }
3223}
3224
/// Eagerly builds an HNSW index when the vector store holds at least 1000
/// vectors, reserving the estimated index memory with the accountant first.
///
/// If the memory reservation fails, the build is skipped entirely. The index
/// construction is wrapped in `catch_unwind` so a panicking build degrades to
/// "no index" instead of crashing the caller; the reservation is released in
/// that case.
fn maybe_prebuild_hnsw(vector_store: &VectorStore, accountant: &MemoryAccountant) {
    if vector_store.vector_count() >= 1000 {
        let entries = vector_store.all_entries();
        let dim = vector_store.dimension().unwrap_or(0);
        let estimated_bytes = estimate_hnsw_index_bytes(entries.len(), dim);
        if accountant
            .try_allocate_for(
                estimated_bytes,
                "vector_index",
                "prebuild_hnsw",
                "Open the database with a larger MEMORY_LIMIT to enable HNSW indexing.",
            )
            .is_err()
        {
            // Over budget: silently skip indexing; queries fall back to
            // whatever non-indexed path the store provides.
            return;
        }

        // Treat a panic inside index construction as "no index built".
        let hnsw_opt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            HnswIndex::new(&entries, dim)
        }))
        .ok();
        if hnsw_opt.is_some() {
            vector_store.set_hnsw_bytes(estimated_bytes);
        } else {
            accountant.release(estimated_bytes);
        }
        // NOTE(review): on a panicked build this still stores `None` in the
        // OnceLock, so a later retry cannot re-`set` it — presumably
        // intentional (build once, never retry); confirm.
        let _ = vector_store.hnsw.set(RwLock::new(hnsw_opt));
    }
}
3254
3255fn estimate_row_bytes_for_meta(
3256    values: &HashMap<ColName, Value>,
3257    meta: &TableMeta,
3258    include_vectors: bool,
3259) -> usize {
3260    let mut bytes = 96usize;
3261    for column in &meta.columns {
3262        let Some(value) = values.get(&column.name) else {
3263            continue;
3264        };
3265        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
3266            continue;
3267        }
3268        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
3269    }
3270    bytes
3271}
3272
/// Estimated heap footprint of one stored vector: a 24-byte `Vec` header
/// plus the raw `f32` payload.
fn estimate_vector_bytes(vector: &[f32]) -> usize {
    let payload = vector.len().saturating_mul(std::mem::size_of::<f32>());
    24 + payload
}
3276
3277fn estimate_edge_bytes(
3278    source: NodeId,
3279    target: NodeId,
3280    edge_type: &str,
3281    properties: &HashMap<String, Value>,
3282) -> usize {
3283    AdjEntry {
3284        source,
3285        target,
3286        edge_type: edge_type.to_string(),
3287        properties: properties.clone(),
3288        created_tx: 0,
3289        deleted_tx: None,
3290        lsn: 0,
3291    }
3292    .estimated_bytes()
3293}
3294
/// Rough upper bound for an HNSW index over `entry_count` vectors of the
/// given dimension: the raw `f32` payload times a 3x structural-overhead
/// factor, with saturating arithmetic so huge inputs cannot overflow.
fn estimate_hnsw_index_bytes(entry_count: usize, dimension: usize) -> usize {
    let payload = entry_count
        .saturating_mul(dimension)
        .saturating_mul(std::mem::size_of::<f32>());
    payload.saturating_mul(3)
}
3301
impl Drop for Database {
    /// Idempotent teardown: the first drop (or prior explicit close) wins via
    /// the `closed` flag; any subsequent drop returns immediately.
    fn drop(&mut self) {
        if self.closed.swap(true, Ordering::SeqCst) {
            return;
        }
        // Signal and join the background pruning thread before touching
        // anything it might still be using.
        let runtime = self.pruning_runtime.get_mut();
        runtime.shutdown.store(true, Ordering::SeqCst);
        if let Some(handle) = runtime.handle.take() {
            // A join error (pruner panicked) is deliberately ignored in drop.
            let _ = handle.join();
        }
        // Discard all registered subscribers so their channels are dropped.
        self.subscriptions.lock().subscribers.clear();
        // Close the persistence layer last, after background work has stopped.
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
    }
}
3318
/// Sleeps for up to `interval`, re-checking `shutdown` at least every 50ms so
/// a shutdown request cuts the wait short instead of blocking the full span.
fn sleep_with_shutdown(shutdown: &AtomicBool, interval: Duration) {
    let wake_at = Instant::now() + interval;
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return;
        }
        let now = Instant::now();
        if now >= wake_at {
            return;
        }
        let nap = wake_at
            .saturating_duration_since(now)
            .min(Duration::from_millis(50));
        thread::sleep(nap);
    }
}
3330
/// Background TTL sweep: removes expired rows from the relational store and
/// cascades the removal into the vectors and graph edges that referenced
/// them. Returns the number of rows pruned.
///
/// `sync_watermark` protects sync-safe tables: rows whose LSN has not yet
/// passed the watermark are kept (see `row_is_prunable`).
fn prune_expired_rows(
    relational_store: &Arc<RelationalStore>,
    graph_store: &Arc<GraphStore>,
    vector_store: &Arc<VectorStore>,
    accountant: &MemoryAccountant,
    persistence: Option<&Arc<RedbPersistence>>,
    sync_watermark: u64,
) -> u64 {
    let now_millis = current_epoch_millis();
    // Snapshot the table metadata so the meta lock is not held during the sweep.
    let metas = relational_store.table_meta.read().clone();
    let mut pruned_by_table: HashMap<String, Vec<RowId>> = HashMap::new();
    let mut pruned_node_ids = HashSet::new();
    let mut released_row_bytes = 0usize;

    // Phase 1: drop expired rows under the tables write lock, recording the
    // pruned row ids, freed byte estimates, and the uuid "id" values needed
    // for the graph-edge cascade below.
    {
        let mut tables = relational_store.tables.write();
        for (table_name, rows) in tables.iter_mut() {
            let Some(meta) = metas.get(table_name) else {
                continue;
            };
            // Only tables with a RETAIN (TTL) clause participate in pruning.
            if meta.default_ttl_seconds.is_none() {
                continue;
            }

            rows.retain(|row| {
                if !row_is_prunable(row, meta, now_millis, sync_watermark) {
                    return true;
                }

                pruned_by_table
                    .entry(table_name.clone())
                    .or_default()
                    .push(row.row_id);
                // Vector payloads are excluded here; they are accounted for
                // separately in the vector-store sweep.
                released_row_bytes = released_row_bytes
                    .saturating_add(estimate_row_bytes_for_meta(&row.values, meta, false));
                if let Some(Value::Uuid(id)) = row.values.get("id") {
                    pruned_node_ids.insert(*id);
                }
                false
            });
        }
    }

    let pruned_row_ids: HashSet<RowId> = pruned_by_table
        .values()
        .flat_map(|rows| rows.iter().copied())
        .collect();
    if pruned_row_ids.is_empty() {
        return 0;
    }

    // Phase 2: remove vectors attached to pruned rows.
    let mut released_vector_bytes = 0usize;
    {
        let mut vectors = vector_store.vectors.write();
        vectors.retain(|entry| {
            if pruned_row_ids.contains(&entry.row_id) {
                released_vector_bytes =
                    released_vector_bytes.saturating_add(entry.estimated_bytes());
                false
            } else {
                true
            }
        });
    }
    // The HNSW index may still reference removed vectors; invalidate it
    // wholesale rather than patching it in place.
    if let Some(hnsw) = vector_store.hnsw.get() {
        *hnsw.write() = None;
    }

    // Phase 3: remove graph edges touching pruned nodes, in both adjacency
    // directions. Bytes are counted on the forward pass only, because the
    // reverse map mirrors the same edges.
    let mut released_edge_bytes = 0usize;
    {
        let mut forward = graph_store.forward_adj.write();
        for entries in forward.values_mut() {
            entries.retain(|entry| {
                if pruned_node_ids.contains(&entry.source)
                    || pruned_node_ids.contains(&entry.target)
                {
                    released_edge_bytes =
                        released_edge_bytes.saturating_add(entry.estimated_bytes());
                    false
                } else {
                    true
                }
            });
        }
        forward.retain(|_, entries| !entries.is_empty());
    }
    {
        let mut reverse = graph_store.reverse_adj.write();
        for entries in reverse.values_mut() {
            entries.retain(|entry| {
                !pruned_node_ids.contains(&entry.source) && !pruned_node_ids.contains(&entry.target)
            });
        }
        reverse.retain(|_, entries| !entries.is_empty());
    }

    // Phase 4: rewrite the persisted copies to match the in-memory state.
    // Write errors are deliberately ignored (best-effort); a later sweep or
    // full rewrite can repair any divergence.
    if let Some(persistence) = persistence {
        for table_name in pruned_by_table.keys() {
            let rows = relational_store
                .tables
                .read()
                .get(table_name)
                .cloned()
                .unwrap_or_default();
            let _ = persistence.rewrite_table_rows(table_name, &rows);
        }

        let vectors = vector_store.vectors.read().clone();
        let edges = graph_store
            .forward_adj
            .read()
            .values()
            .flat_map(|entries| entries.iter().cloned())
            .collect::<Vec<_>>();
        let _ = persistence.rewrite_vectors(&vectors);
        let _ = persistence.rewrite_graph_edges(&edges);
    }

    // Return all freed memory to the accountant in one combined release.
    // NOTE(review): the invalidated HNSW index's bytes are not released here;
    // confirm whether `set_hnsw_bytes` accounting is reconciled elsewhere.
    accountant.release(
        released_row_bytes
            .saturating_add(released_vector_bytes)
            .saturating_add(released_edge_bytes),
    );

    pruned_row_ids.len() as u64
}
3457
/// Decides whether a single row is eligible for TTL pruning.
///
/// The check order is load-bearing:
/// 1. Sync-safe tables keep rows whose LSN has not passed `sync_watermark`.
/// 2. Tables without a `default_ttl_seconds` never prune.
/// 3. A timestamp in the table's `expires_column`, when present, decides
///    directly: `i64::MAX` means "never expires", a negative value counts as
///    already expired, otherwise the row expires once the timestamp is at or
///    before the current wall clock.
/// 4. NULL/absent/non-timestamp expiry values fall through to the implicit
///    TTL measured from `created_at`; rows lacking `created_at` are kept.
fn row_is_prunable(
    row: &VersionedRow,
    meta: &TableMeta,
    now_millis: u64,
    sync_watermark: u64,
) -> bool {
    if meta.sync_safe && row.lsn >= sync_watermark {
        return false;
    }

    let Some(default_ttl_seconds) = meta.default_ttl_seconds else {
        return false;
    };

    if let Some(expires_column) = &meta.expires_column {
        match row.values.get(expires_column) {
            // Sentinel: i64::MAX pins the row forever.
            Some(Value::Timestamp(millis)) if *millis == i64::MAX => return false,
            // Negative timestamps can't be cast to u64 safely; treat as expired.
            Some(Value::Timestamp(millis)) if *millis < 0 => return true,
            Some(Value::Timestamp(millis)) => return (*millis as u64) <= now_millis,
            // No usable expiry value: fall through to the created-at TTL.
            Some(Value::Null) | None => {}
            Some(_) => {}
        }
    }

    let ttl_millis = default_ttl_seconds.saturating_mul(1000);
    row.created_at
        .map(|created_at| now_millis.saturating_sub(created_at) > ttl_millis)
        .unwrap_or(false)
}
3487
/// Milliseconds since the Unix epoch; a clock set before the epoch collapses
/// to 0 instead of panicking.
fn current_epoch_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    elapsed.as_millis() as u64
}
3494
3495fn max_tx_across_all(
3496    relational: &RelationalStore,
3497    graph: &GraphStore,
3498    vector: &VectorStore,
3499) -> TxId {
3500    let relational_max = relational
3501        .tables
3502        .read()
3503        .values()
3504        .flat_map(|rows| rows.iter())
3505        .flat_map(|row| std::iter::once(row.created_tx).chain(row.deleted_tx))
3506        .max()
3507        .unwrap_or(0);
3508    let graph_max = graph
3509        .forward_adj
3510        .read()
3511        .values()
3512        .flat_map(|entries| entries.iter())
3513        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
3514        .max()
3515        .unwrap_or(0);
3516    let vector_max = vector
3517        .vectors
3518        .read()
3519        .iter()
3520        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
3521        .max()
3522        .unwrap_or(0);
3523
3524    relational_max.max(graph_max).max(vector_max)
3525}
3526
3527fn max_lsn_across_all(
3528    relational: &RelationalStore,
3529    graph: &GraphStore,
3530    vector: &VectorStore,
3531) -> u64 {
3532    let relational_max = relational
3533        .tables
3534        .read()
3535        .values()
3536        .flat_map(|rows| rows.iter().map(|row| row.lsn))
3537        .max()
3538        .unwrap_or(0);
3539    let graph_max = graph
3540        .forward_adj
3541        .read()
3542        .values()
3543        .flat_map(|entries| entries.iter().map(|entry| entry.lsn))
3544        .max()
3545        .unwrap_or(0);
3546    let vector_max = vector
3547        .vectors
3548        .read()
3549        .iter()
3550        .map(|entry| entry.lsn)
3551        .max()
3552        .unwrap_or(0);
3553
3554    relational_max.max(graph_max).max(vector_max)
3555}
3556
3557fn persisted_memory_limit(path: &Path) -> Result<Option<usize>> {
3558    if !path.exists() {
3559        return Ok(None);
3560    }
3561    let persistence = RedbPersistence::open(path)?;
3562    let limit = persistence.load_config_value::<usize>("memory_limit")?;
3563    persistence.close();
3564    Ok(limit)
3565}
3566
3567fn is_fatal_sync_apply_error(err: &Error) -> bool {
3568    matches!(
3569        err,
3570        Error::MemoryBudgetExceeded { .. } | Error::DiskBudgetExceeded { .. }
3571    )
3572}
3573
3574fn ddl_change_from_create_table(ct: &CreateTable) -> DdlChange {
3575    DdlChange::CreateTable {
3576        name: ct.name.clone(),
3577        columns: ct
3578            .columns
3579            .iter()
3580            .map(|col| {
3581                (
3582                    col.name.clone(),
3583                    sql_type_for_ast_column(col, &ct.propagation_rules),
3584                )
3585            })
3586            .collect(),
3587        constraints: create_table_constraints_from_ast(ct),
3588    }
3589}
3590
3591fn ddl_change_from_meta(name: &str, meta: &TableMeta) -> DdlChange {
3592    DdlChange::CreateTable {
3593        name: name.to_string(),
3594        columns: meta
3595            .columns
3596            .iter()
3597            .map(|col| {
3598                (
3599                    col.name.clone(),
3600                    sql_type_for_meta_column(col, &meta.propagation_rules),
3601                )
3602            })
3603            .collect(),
3604        constraints: create_table_constraints_from_meta(meta),
3605    }
3606}
3607
3608fn sql_type_for_ast(data_type: &DataType) -> String {
3609    match data_type {
3610        DataType::Uuid => "UUID".to_string(),
3611        DataType::Text => "TEXT".to_string(),
3612        DataType::Integer => "INTEGER".to_string(),
3613        DataType::Real => "REAL".to_string(),
3614        DataType::Boolean => "BOOLEAN".to_string(),
3615        DataType::Timestamp => "TIMESTAMP".to_string(),
3616        DataType::Json => "JSON".to_string(),
3617        DataType::Vector(dim) => format!("VECTOR({dim})"),
3618    }
3619}
3620
3621fn sql_type_for_ast_column(
3622    col: &contextdb_parser::ast::ColumnDef,
3623    _rules: &[contextdb_parser::ast::AstPropagationRule],
3624) -> String {
3625    let mut ty = sql_type_for_ast(&col.data_type);
3626    if let Some(reference) = &col.references {
3627        ty.push_str(&format!(
3628            " REFERENCES {}({})",
3629            reference.table, reference.column
3630        ));
3631        for rule in &reference.propagation_rules {
3632            if let contextdb_parser::ast::AstPropagationRule::FkState {
3633                trigger_state,
3634                target_state,
3635                max_depth,
3636                abort_on_failure,
3637            } = rule
3638            {
3639                ty.push_str(&format!(
3640                    " ON STATE {} PROPAGATE SET {}",
3641                    trigger_state, target_state
3642                ));
3643                if max_depth.unwrap_or(10) != 10 {
3644                    ty.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
3645                }
3646                if *abort_on_failure {
3647                    ty.push_str(" ABORT ON FAILURE");
3648                }
3649            }
3650        }
3651    }
3652    if col.primary_key {
3653        ty.push_str(" PRIMARY KEY");
3654    }
3655    if !col.nullable && !col.primary_key {
3656        ty.push_str(" NOT NULL");
3657    }
3658    if col.unique {
3659        ty.push_str(" UNIQUE");
3660    }
3661    ty
3662}
3663
3664fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[PropagationRule]) -> String {
3665    let mut ty = match col.column_type {
3666        ColumnType::Integer => "INTEGER".to_string(),
3667        ColumnType::Real => "REAL".to_string(),
3668        ColumnType::Text => "TEXT".to_string(),
3669        ColumnType::Boolean => "BOOLEAN".to_string(),
3670        ColumnType::Json => "JSON".to_string(),
3671        ColumnType::Uuid => "UUID".to_string(),
3672        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
3673        ColumnType::Timestamp => "TIMESTAMP".to_string(),
3674    };
3675
3676    let fk_rules = rules
3677        .iter()
3678        .filter_map(|rule| match rule {
3679            PropagationRule::ForeignKey {
3680                fk_column,
3681                referenced_table,
3682                referenced_column,
3683                trigger_state,
3684                target_state,
3685                max_depth,
3686                abort_on_failure,
3687            } if fk_column == &col.name => Some((
3688                referenced_table,
3689                referenced_column,
3690                trigger_state,
3691                target_state,
3692                *max_depth,
3693                *abort_on_failure,
3694            )),
3695            _ => None,
3696        })
3697        .collect::<Vec<_>>();
3698
3699    if let Some(reference) = &col.references {
3700        ty.push_str(&format!(
3701            " REFERENCES {}({})",
3702            reference.table, reference.column
3703        ));
3704    } else if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
3705        ty.push_str(&format!(
3706            " REFERENCES {}({})",
3707            referenced_table, referenced_column
3708        ));
3709    }
3710
3711    if col.references.is_some() || !fk_rules.is_empty() {
3712        for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
3713            ty.push_str(&format!(
3714                " ON STATE {} PROPAGATE SET {}",
3715                trigger_state, target_state
3716            ));
3717            if max_depth != 10 {
3718                ty.push_str(&format!(" MAX DEPTH {max_depth}"));
3719            }
3720            if abort_on_failure {
3721                ty.push_str(" ABORT ON FAILURE");
3722            }
3723        }
3724    }
3725    if col.primary_key {
3726        ty.push_str(" PRIMARY KEY");
3727    }
3728    if !col.nullable && !col.primary_key {
3729        ty.push_str(" NOT NULL");
3730    }
3731    if col.unique {
3732        ty.push_str(" UNIQUE");
3733    }
3734    if col.expires {
3735        ty.push_str(" EXPIRES");
3736    }
3737
3738    ty
3739}
3740
3741fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
3742    let mut constraints = Vec::new();
3743
3744    if ct.immutable {
3745        constraints.push("IMMUTABLE".to_string());
3746    }
3747
3748    if let Some(sm) = &ct.state_machine {
3749        let transitions = sm
3750            .transitions
3751            .iter()
3752            .map(|(from, tos)| format!("{from} -> [{}]", tos.join(", ")))
3753            .collect::<Vec<_>>()
3754            .join(", ");
3755        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
3756    }
3757
3758    if !ct.dag_edge_types.is_empty() {
3759        let edge_types = ct
3760            .dag_edge_types
3761            .iter()
3762            .map(|edge_type| format!("'{edge_type}'"))
3763            .collect::<Vec<_>>()
3764            .join(", ");
3765        constraints.push(format!("DAG({edge_types})"));
3766    }
3767
3768    if let Some(retain) = &ct.retain {
3769        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(retain.duration_seconds));
3770        if retain.sync_safe {
3771            clause.push_str(" SYNC SAFE");
3772        }
3773        constraints.push(clause);
3774    }
3775
3776    for unique_constraint in &ct.unique_constraints {
3777        constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
3778    }
3779
3780    for rule in &ct.propagation_rules {
3781        match rule {
3782            contextdb_parser::ast::AstPropagationRule::EdgeState {
3783                edge_type,
3784                direction,
3785                trigger_state,
3786                target_state,
3787                max_depth,
3788                abort_on_failure,
3789            } => {
3790                let mut clause = format!(
3791                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
3792                    edge_type, direction, trigger_state, target_state
3793                );
3794                if max_depth.unwrap_or(10) != 10 {
3795                    clause.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
3796                }
3797                if *abort_on_failure {
3798                    clause.push_str(" ABORT ON FAILURE");
3799                }
3800                constraints.push(clause);
3801            }
3802            contextdb_parser::ast::AstPropagationRule::VectorExclusion { trigger_state } => {
3803                constraints.push(format!(
3804                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
3805                    trigger_state
3806                ));
3807            }
3808            contextdb_parser::ast::AstPropagationRule::FkState { .. } => {}
3809        }
3810    }
3811
3812    constraints
3813}
3814
/// Reconstructs table-level constraint clauses (IMMUTABLE, STATE MACHINE,
/// DAG, RETAIN, UNIQUE, PROPAGATE ...) from stored `TableMeta` — the
/// metadata-side mirror of `create_table_constraints_from_ast`.
///
/// `ForeignKey` rules are intentionally skipped: they are rendered inline on
/// the owning column by `sql_type_for_meta_column`.
fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
    let mut constraints = Vec::new();

    if meta.immutable {
        constraints.push("IMMUTABLE".to_string());
    }

    if let Some(sm) = &meta.state_machine {
        let states = sm
            .transitions
            .iter()
            .map(|(from, to)| format!("{from} -> [{}]", to.join(", ")))
            .collect::<Vec<_>>()
            .join(", ");
        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, states));
    }

    if !meta.dag_edge_types.is_empty() {
        let edge_types = meta
            .dag_edge_types
            .iter()
            .map(|edge_type| format!("'{edge_type}'"))
            .collect::<Vec<_>>()
            .join(", ");
        constraints.push(format!("DAG({edge_types})"));
    }

    // A configured TTL becomes a RETAIN clause, optionally marked SYNC SAFE.
    if let Some(ttl_seconds) = meta.default_ttl_seconds {
        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(ttl_seconds));
        if meta.sync_safe {
            clause.push_str(" SYNC SAFE");
        }
        constraints.push(clause);
    }

    for unique_constraint in &meta.unique_constraints {
        constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
    }

    for rule in &meta.propagation_rules {
        match rule {
            PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } => {
                let dir = match direction {
                    Direction::Incoming => "INCOMING",
                    Direction::Outgoing => "OUTGOING",
                    Direction::Both => "BOTH",
                };
                let mut clause = format!(
                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
                    edge_type, dir, trigger_state, target_state
                );
                // 10 is the implicit default depth; only rendered when it deviates.
                if *max_depth != 10 {
                    clause.push_str(&format!(" MAX DEPTH {max_depth}"));
                }
                if *abort_on_failure {
                    clause.push_str(" ABORT ON FAILURE");
                }
                constraints.push(clause);
            }
            PropagationRule::VectorExclusion { trigger_state } => {
                constraints.push(format!(
                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
                    trigger_state
                ));
            }
            PropagationRule::ForeignKey { .. } => {}
        }
    }

    constraints
}
3893
/// Renders a TTL in seconds as the largest whole SQL time unit that divides
/// it evenly (DAYS, then HOURS, then MINUTES, with SECONDS as the fallback).
fn ttl_seconds_to_sql(seconds: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    if seconds % DAY == 0 {
        format!("{} DAYS", seconds / DAY)
    } else if seconds % HOUR == 0 {
        format!("{} HOURS", seconds / HOUR)
    } else if seconds % MINUTE == 0 {
        format!("{} MINUTES", seconds / MINUTE)
    } else {
        format!("{seconds} SECONDS")
    }
}