//! contextdb_engine — database.rs
//!
//! Top-level `Database` facade combining the relational, graph, and vector
//! stores under one transaction manager.
1use crate::composite_store::{ChangeLogEntry, CompositeStore};
2use crate::executor::execute_plan;
3use crate::persistence::RedbPersistence;
4use crate::persistent_store::PersistentCompositeStore;
5use crate::plugin::{
6    CommitEvent, CommitSource, CorePlugin, DatabasePlugin, PluginHealth, QueryOutcome,
7    SubscriptionMetrics,
8};
9use crate::schema_enforcer::validate_dml;
10use crate::sync_types::{
11    ApplyResult, ChangeSet, Conflict, ConflictPolicies, ConflictPolicy, DdlChange, EdgeChange,
12    NaturalKey, RowChange, VectorChange,
13};
14use contextdb_core::*;
15use contextdb_graph::{GraphStore, MemGraphExecutor};
16use contextdb_parser::Statement;
17use contextdb_parser::ast::{AlterAction, CreateTable, DataType};
18use contextdb_planner::PhysicalPlan;
19use contextdb_relational::{MemRelationalExecutor, RelationalStore};
20use contextdb_tx::{TxManager, WriteSetApplicator};
21use contextdb_vector::{HnswIndex, MemVectorExecutor, VectorStore};
22use parking_lot::{Mutex, RwLock};
23use roaring::RoaringTreemap;
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::path::Path;
26use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
27use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
28use std::sync::{Arc, OnceLock};
29use std::thread::{self, JoinHandle};
30use std::time::{Duration, Instant};
31
/// Type-erased write-set applicator the transaction manager commits through.
type DynStore = Box<dyn WriteSetApplicator>;
/// Bounded channel capacity used for each commit-event subscriber.
const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 64;
34
/// Result of executing a statement: column names, row data, and the number
/// of rows affected (non-zero for DML).
#[derive(Debug, Clone)]
pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<Value>>,
    pub rows_affected: u64,
}
41
42impl QueryResult {
43    pub fn empty() -> Self {
44        Self {
45            columns: vec![],
46            rows: vec![],
47            rows_affected: 0,
48        }
49    }
50
51    pub fn empty_with_affected(rows_affected: u64) -> Self {
52        Self {
53            columns: vec![],
54            rows: vec![],
55            rows_affected,
56        }
57    }
58}
59
/// A multi-model database instance: relational, graph, and vector stores
/// sharing a single transaction manager and write path.
pub struct Database {
    // Transaction manager over the type-erased composite store.
    tx_mgr: Arc<TxManager<DynStore>>,
    relational_store: Arc<RelationalStore>,
    graph_store: Arc<GraphStore>,
    vector_store: Arc<VectorStore>,
    // Row-level change entries (see `sync_types`); persisted when a
    // persistence layer is present.
    change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    // DDL changes paired with a u64 sequence number — presumably the commit
    // LSN; confirm against the persistence layer.
    ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
    // `None` for purely in-memory databases.
    persistence: Option<Arc<RedbPersistence>>,
    // Per-model executors sharing the tx manager with the stores above.
    relational: MemRelationalExecutor<DynStore>,
    graph: MemGraphExecutor<DynStore>,
    vector: MemVectorExecutor<DynStore>,
    // Transaction opened by an explicit BEGIN; cleared on COMMIT/ROLLBACK.
    session_tx: Mutex<Option<TxId>>,
    instance_id: uuid::Uuid,
    plugin: Arc<dyn DatabasePlugin>,
    accountant: Arc<MemoryAccountant>,
    conflict_policies: RwLock<ConflictPolicies>,
    // Commit-event subscribers plus delivery counters.
    subscriptions: Mutex<SubscriptionState>,
    pruning_runtime: Mutex<PruningRuntime>,
    pruning_guard: Arc<Mutex<()>>,
    // 0 encodes "no limit" for both disk-limit fields (see `build_db`).
    disk_limit: AtomicU64,
    disk_limit_startup_ceiling: AtomicU64,
    sync_watermark: Arc<AtomicU64>,
    closed: AtomicBool,
}
84
85impl std::fmt::Debug for Database {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("Database")
88            .field("instance_id", &self.instance_id)
89            .finish_non_exhaustive()
90    }
91}
92
/// One pending propagation step: drive row `uuid` in `table` toward
/// `target_state`.
#[derive(Debug, Clone)]
struct PropagationQueueEntry {
    table: String,
    uuid: uuid::Uuid,
    // State the dependent row should be transitioned to.
    target_state: String,
    // Traversal depth from the originating row.
    depth: u32,
    // When true, a missing state machine or failed transition records a
    // PropagationAborted error (see `propagate`).
    abort_on_failure: bool,
}
101
/// The row a state change is being propagated from, with its new state and
/// current traversal depth.
#[derive(Debug, Clone, Copy)]
struct PropagationSource<'a> {
    table: &'a str,
    uuid: uuid::Uuid,
    state: &'a str,
    depth: u32,
}
109
/// Shared read-only context for a single propagation pass.
#[derive(Debug, Clone, Copy)]
struct PropagationContext<'a> {
    tx: TxId,
    snapshot: SnapshotId,
    // Table-metadata snapshot taken at the start of the pass.
    metas: &'a HashMap<String, TableMeta>,
}
116
/// Commit-event subscriber channels plus delivery counters.
#[derive(Debug)]
struct SubscriptionState {
    subscribers: Vec<SyncSender<CommitEvent>>,
    // Events successfully handed to a subscriber channel.
    events_sent: u64,
    // Events dropped because a subscriber's channel was full.
    events_dropped: u64,
}
123
124impl SubscriptionState {
125    fn new() -> Self {
126        Self {
127            subscribers: Vec::new(),
128            events_sent: 0,
129            events_dropped: 0,
130        }
131    }
132}
133
/// Join handle and shutdown flag for the background pruning thread.
#[derive(Debug)]
struct PruningRuntime {
    shutdown: Arc<AtomicBool>,
    // `None` when no pruning thread is running.
    handle: Option<JoinHandle<()>>,
}
139
140impl PruningRuntime {
141    fn new() -> Self {
142        Self {
143            shutdown: Arc::new(AtomicBool::new(false)),
144            handle: None,
145        }
146    }
147}
148
149impl Database {
150    #[allow(clippy::too_many_arguments)]
151    fn build_db(
152        tx_mgr: Arc<TxManager<DynStore>>,
153        relational: Arc<RelationalStore>,
154        graph: Arc<GraphStore>,
155        vector_store: Arc<VectorStore>,
156        hnsw: Arc<OnceLock<parking_lot::RwLock<Option<HnswIndex>>>>,
157        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
158        ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
159        persistence: Option<Arc<RedbPersistence>>,
160        plugin: Arc<dyn DatabasePlugin>,
161        accountant: Arc<MemoryAccountant>,
162        disk_limit: Option<u64>,
163        disk_limit_startup_ceiling: Option<u64>,
164    ) -> Self {
165        Self {
166            tx_mgr: tx_mgr.clone(),
167            relational_store: relational.clone(),
168            graph_store: graph.clone(),
169            vector_store: vector_store.clone(),
170            change_log,
171            ddl_log,
172            persistence,
173            relational: MemRelationalExecutor::new(relational, tx_mgr.clone()),
174            graph: MemGraphExecutor::new(graph, tx_mgr.clone()),
175            vector: MemVectorExecutor::new_with_accountant(
176                vector_store,
177                tx_mgr.clone(),
178                hnsw,
179                accountant.clone(),
180            ),
181            session_tx: Mutex::new(None),
182            instance_id: uuid::Uuid::new_v4(),
183            plugin,
184            accountant,
185            conflict_policies: RwLock::new(ConflictPolicies::uniform(ConflictPolicy::LatestWins)),
186            subscriptions: Mutex::new(SubscriptionState::new()),
187            pruning_runtime: Mutex::new(PruningRuntime::new()),
188            pruning_guard: Arc::new(Mutex::new(())),
189            disk_limit: AtomicU64::new(disk_limit.unwrap_or(0)),
190            disk_limit_startup_ceiling: AtomicU64::new(disk_limit_startup_ceiling.unwrap_or(0)),
191            sync_watermark: Arc::new(AtomicU64::new(0)),
192            closed: AtomicBool::new(false),
193        }
194    }
195
196    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
197        Self::open_with_config(
198            path,
199            Arc::new(CorePlugin),
200            Arc::new(MemoryAccountant::no_limit()),
201        )
202    }
203
204    pub fn open_memory() -> Self {
205        Self::open_memory_with_plugin_and_accountant(
206            Arc::new(CorePlugin),
207            Arc::new(MemoryAccountant::no_limit()),
208        )
209        .expect("failed to open in-memory database")
210    }
211
    /// Open a database backed by the redb file at `path`, rebuilding all
    /// in-memory stores from persisted state.
    ///
    /// `startup_disk_limit`, when set, acts as a ceiling over any persisted
    /// disk limit.
    fn open_loaded(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        mut accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        // Reuse an existing file, otherwise create a fresh one.
        let persistence = if path.exists() {
            Arc::new(RedbPersistence::open(path)?)
        } else {
            Arc::new(RedbPersistence::create(path)?)
        };
        // A persisted memory limit applies only when the caller set none.
        if accountant.usage().limit.is_none()
            && let Some(limit) = persistence.load_config_value::<usize>("memory_limit")?
        {
            accountant = Arc::new(MemoryAccountant::with_budget(limit));
        }
        let persisted_disk_limit = persistence.load_config_value::<u64>("disk_limit")?;
        let startup_disk_ceiling = startup_disk_limit;
        // Effective limit = min(persisted, startup ceiling) when both exist.
        let effective_disk_limit = match (persisted_disk_limit, startup_disk_limit) {
            (Some(persisted), Some(ceiling)) => Some(persisted.min(ceiling)),
            (Some(persisted), None) => Some(persisted),
            (None, Some(ceiling)) => Some(ceiling),
            (None, None) => None,
        };

        let all_meta = persistence.load_all_table_meta()?;

        // Rebuild the relational store: table metadata first, then rows.
        let relational = Arc::new(RelationalStore::new());
        for (name, meta) in &all_meta {
            relational.create_table(name, meta.clone());
            for row in persistence.load_relational_table(name)? {
                relational.insert_loaded_row(name, row);
            }
        }

        // Rebuild the graph store from persisted forward edges.
        let graph = Arc::new(GraphStore::new());
        for edge in persistence.load_forward_edges()? {
            graph.insert_loaded_edge(edge);
        }

        // Rebuild the vector store; the dimension is taken from the first
        // vector-typed column found in any table's metadata.
        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        for meta in all_meta.values() {
            for column in &meta.columns {
                if let ColumnType::Vector(dimension) = column.column_type {
                    vector.set_dimension(dimension);
                    break;
                }
            }
        }
        for entry in persistence.load_vectors()? {
            vector.insert_loaded_vector(entry);
        }

        // Resume row-id / tx / LSN counters past the highest persisted values.
        let max_row_id = relational.max_row_id();
        let max_tx = max_tx_across_all(&relational, &graph, &vector);
        let max_lsn = max_lsn_across_all(&relational, &graph, &vector);
        relational.set_next_row_id(max_row_id.saturating_add(1));

        let change_log = Arc::new(RwLock::new(persistence.load_change_log()?));
        let ddl_log = Arc::new(RwLock::new(persistence.load_ddl_log()?));
        let composite = CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        );
        // Wrap the composite so committed write sets reach disk.
        let persistent = PersistentCompositeStore::new(composite, persistence.clone());
        let store: DynStore = Box::new(persistent);
        let tx_mgr = Arc::new(TxManager::new_with_counters(
            store,
            max_tx.saturating_add(1),
            max_lsn.saturating_add(1),
            max_tx,
        ));

        let db = Self::build_db(
            tx_mgr,
            relational,
            graph,
            vector,
            hnsw,
            change_log,
            ddl_log,
            Some(persistence),
            plugin,
            accountant,
            effective_disk_limit,
            startup_disk_ceiling,
        );

        // Re-register DAG edge types declared by loaded tables.
        for meta in all_meta.values() {
            if !meta.dag_edge_types.is_empty() {
                db.graph.register_dag_edge_types(&meta.dag_edge_types);
            }
        }

        db.account_loaded_state()?;
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());

        Ok(db)
    }
316
317    fn open_memory_internal(
318        plugin: Arc<dyn DatabasePlugin>,
319        accountant: Arc<MemoryAccountant>,
320    ) -> Result<Self> {
321        let relational = Arc::new(RelationalStore::new());
322        let graph = Arc::new(GraphStore::new());
323        let hnsw = Arc::new(OnceLock::new());
324        let vector = Arc::new(VectorStore::new(hnsw.clone()));
325        let change_log = Arc::new(RwLock::new(Vec::new()));
326        let ddl_log = Arc::new(RwLock::new(Vec::new()));
327        let store: DynStore = Box::new(CompositeStore::new(
328            relational.clone(),
329            graph.clone(),
330            vector.clone(),
331            change_log.clone(),
332            ddl_log.clone(),
333        ));
334        let tx_mgr = Arc::new(TxManager::new(store));
335
336        let db = Self::build_db(
337            tx_mgr, relational, graph, vector, hnsw, change_log, ddl_log, None, plugin, accountant,
338            None, None,
339        );
340        maybe_prebuild_hnsw(&db.vector_store, db.accountant());
341        Ok(db)
342    }
343
    /// Start a new transaction and return its id.
    pub fn begin(&self) -> TxId {
        self.tx_mgr.begin()
    }
347
    /// Commit `tx` as a user-initiated commit (runs plugin hooks and
    /// publishes a commit event — see `commit_with_source`).
    pub fn commit(&self, tx: TxId) -> Result<()> {
        self.commit_with_source(tx, CommitSource::User)
    }
351
    /// Roll back `tx`, first releasing any allocations reserved by its
    /// pending inserts.
    pub fn rollback(&self, tx: TxId) -> Result<()> {
        let ws = self.tx_mgr.cloned_write_set(tx)?;
        self.release_insert_allocations(&ws);
        self.tx_mgr.rollback(tx)
    }
357
    /// Current read-snapshot id from the transaction manager.
    pub fn snapshot(&self) -> SnapshotId {
        self.tx_mgr.snapshot()
    }
361
362    pub fn execute(&self, sql: &str, params: &HashMap<String, Value>) -> Result<QueryResult> {
363        let stmt = contextdb_parser::parse(sql)?;
364
365        match &stmt {
366            Statement::Begin => {
367                let mut session = self.session_tx.lock();
368                if session.is_none() {
369                    *session = Some(self.begin());
370                }
371                return Ok(QueryResult::empty());
372            }
373            Statement::Commit => {
374                let mut session = self.session_tx.lock();
375                if let Some(tx) = session.take() {
376                    self.commit_with_source(tx, CommitSource::User)?;
377                }
378                return Ok(QueryResult::empty());
379            }
380            Statement::Rollback => {
381                let mut session = self.session_tx.lock();
382                if let Some(tx) = *session {
383                    self.rollback(tx)?;
384                    *session = None;
385                }
386                return Ok(QueryResult::empty());
387            }
388            _ => {}
389        }
390
391        let active_tx = *self.session_tx.lock();
392        self.execute_statement(&stmt, sql, params, active_tx)
393    }
394
395    fn execute_autocommit(
396        &self,
397        plan: &PhysicalPlan,
398        params: &HashMap<String, Value>,
399    ) -> Result<QueryResult> {
400        match plan {
401            PhysicalPlan::Insert(_) | PhysicalPlan::Delete(_) | PhysicalPlan::Update(_) => {
402                let tx = self.begin();
403                let result = execute_plan(self, plan, params, Some(tx));
404                match result {
405                    Ok(qr) => {
406                        self.commit_with_source(tx, CommitSource::AutoCommit)?;
407                        Ok(qr)
408                    }
409                    Err(e) => {
410                        let _ = self.rollback(tx);
411                        Err(e)
412                    }
413                }
414            }
415            _ => execute_plan(self, plan, params, None),
416        }
417    }
418
419    pub fn explain(&self, sql: &str) -> Result<String> {
420        let stmt = contextdb_parser::parse(sql)?;
421        let plan = contextdb_planner::plan(&stmt)?;
422        if self.vector_store.vector_count() >= 1000 && !self.vector_store.has_hnsw_index() {
423            maybe_prebuild_hnsw(&self.vector_store, self.accountant());
424        }
425        let mut output = plan.explain();
426        if self.vector_store.has_hnsw_index() {
427            output = output.replace("VectorSearch(", "HNSWSearch(");
428            output = output.replace("VectorSearch {", "HNSWSearch {");
429        } else {
430            output = output.replace("VectorSearch(", "VectorSearch(strategy=BruteForce, ");
431            output = output.replace("VectorSearch {", "VectorSearch { strategy: BruteForce,");
432        }
433        Ok(output)
434    }
435
    /// Parse and execute `sql` inside the caller-managed transaction `tx`
    /// (no implicit commit or rollback).
    pub fn execute_in_tx(
        &self,
        tx: TxId,
        sql: &str,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        let stmt = contextdb_parser::parse(sql)?;
        self.execute_statement(&stmt, sql, params, Some(tx))
    }
445
446    fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
447        let mut ws = self.tx_mgr.cloned_write_set(tx)?;
448
449        if !ws.is_empty()
450            && let Err(err) = self.plugin.pre_commit(&ws, source)
451        {
452            let _ = self.rollback(tx);
453            return Err(err);
454        }
455
456        let lsn = self.tx_mgr.commit_with_lsn(tx)?;
457
458        if !ws.is_empty() {
459            self.release_delete_allocations(&ws);
460            ws.stamp_lsn(lsn);
461            self.plugin.post_commit(&ws, source);
462            self.publish_commit_event(Self::build_commit_event(&ws, source, lsn));
463        }
464
465        Ok(())
466    }
467
468    fn build_commit_event(
469        ws: &contextdb_tx::WriteSet,
470        source: CommitSource,
471        lsn: u64,
472    ) -> CommitEvent {
473        let mut tables_changed: Vec<String> = ws
474            .relational_inserts
475            .iter()
476            .map(|(table, _)| table.clone())
477            .chain(
478                ws.relational_deletes
479                    .iter()
480                    .map(|(table, _, _)| table.clone()),
481            )
482            .collect::<HashSet<_>>()
483            .into_iter()
484            .collect();
485        tables_changed.sort();
486
487        CommitEvent {
488            source,
489            lsn,
490            tables_changed,
491            row_count: ws.relational_inserts.len()
492                + ws.relational_deletes.len()
493                + ws.adj_inserts.len()
494                + ws.adj_deletes.len()
495                + ws.vector_inserts.len()
496                + ws.vector_deletes.len(),
497        }
498    }
499
500    fn publish_commit_event(&self, event: CommitEvent) {
501        let mut subscriptions = self.subscriptions.lock();
502        let subscribers = std::mem::take(&mut subscriptions.subscribers);
503        let mut live_subscribers = Vec::with_capacity(subscribers.len());
504
505        for sender in subscribers {
506            match sender.try_send(event.clone()) {
507                Ok(()) => {
508                    subscriptions.events_sent += 1;
509                    live_subscribers.push(sender);
510                }
511                Err(TrySendError::Full(_)) => {
512                    subscriptions.events_dropped += 1;
513                    live_subscribers.push(sender);
514                }
515                Err(TrySendError::Disconnected(_)) => {}
516            }
517        }
518
519        subscriptions.subscribers = live_subscribers;
520    }
521
    /// Signal the background pruning thread to stop and wait for it to exit.
    fn stop_pruning_thread(&self) {
        // Take the handle while holding the lock, but join OUTSIDE it so the
        // pruning thread cannot deadlock against us if it takes this lock.
        let handle = {
            let mut runtime = self.pruning_runtime.lock();
            runtime.shutdown.store(true, Ordering::SeqCst);
            let handle = runtime.handle.take();
            // Install a fresh, un-signalled flag so a future pruning thread
            // does not see the old shutdown request.
            runtime.shutdown = Arc::new(AtomicBool::new(false));
            handle
        };

        if let Some(handle) = handle {
            let _ = handle.join();
        }
    }
535
    /// Plan and run a parsed statement, invoking plugin hooks around it.
    ///
    /// `tx == None` means autocommit; DML then runs in an implicit
    /// transaction (see `execute_autocommit`). `post_query` runs on both
    /// success and failure with the measured duration.
    fn execute_statement(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        // The plugin may reject the raw query text outright.
        self.plugin.on_query(sql)?;

        // DDL statements get a separate plugin veto before execution.
        if let Some(change) = self.ddl_change_for_statement(stmt).as_ref() {
            self.plugin.on_ddl(change)?;
        }

        // Handle INSERT INTO GRAPH / __edges as a virtual table routing to the graph store.
        if let Statement::Insert(ins) = stmt
            && (ins.table.eq_ignore_ascii_case("GRAPH")
                || ins.table.eq_ignore_ascii_case("__edges"))
        {
            return self.execute_graph_insert(ins, params, tx);
        }

        let started = Instant::now();
        // Closure so the timing/post_query bookkeeping below covers every
        // exit path of the planning-and-execution pipeline.
        let result = (|| {
            // Pre-resolve InSubquery expressions with CTE context before planning
            let stmt = self.pre_resolve_cte_subqueries(stmt, params, tx)?;
            let plan = contextdb_planner::plan(&stmt)?;
            validate_dml(&plan, self, params)?;
            let result = match tx {
                Some(tx) => execute_plan(self, &plan, params, Some(tx)),
                None => self.execute_autocommit(&plan, params),
            };
            // A successful CREATE TABLE registers its DAG edge types with
            // the graph executor.
            if result.is_ok()
                && let Statement::CreateTable(ct) = &stmt
                && !ct.dag_edge_types.is_empty()
            {
                self.graph.register_dag_edge_types(&ct.dag_edge_types);
            }
            result
        })();
        let duration = started.elapsed();
        let outcome = query_outcome_from_result(&result);
        self.plugin.post_query(sql, duration, &outcome);
        result.map(strip_internal_row_id)
    }
580
581    /// Handle `INSERT INTO GRAPH (source_id, target_id, edge_type) VALUES (...)`.
582    fn execute_graph_insert(
583        &self,
584        ins: &contextdb_parser::ast::Insert,
585        params: &HashMap<String, Value>,
586        tx: Option<TxId>,
587    ) -> Result<QueryResult> {
588        use crate::executor::resolve_expr;
589
590        let col_index = |name: &str| {
591            ins.columns
592                .iter()
593                .position(|c| c.eq_ignore_ascii_case(name))
594        };
595        let source_idx = col_index("source_id")
596            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires source_id column".into()))?;
597        let target_idx = col_index("target_id")
598            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires target_id column".into()))?;
599        let edge_type_idx = col_index("edge_type")
600            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires edge_type column".into()))?;
601
602        let auto_commit = tx.is_none();
603        let tx = tx.unwrap_or_else(|| self.begin());
604        let mut count = 0u64;
605        for row_exprs in &ins.values {
606            let source = resolve_expr(&row_exprs[source_idx], params)?;
607            let target = resolve_expr(&row_exprs[target_idx], params)?;
608            let edge_type = resolve_expr(&row_exprs[edge_type_idx], params)?;
609
610            let source_uuid = match &source {
611                Value::Uuid(u) => *u,
612                Value::Text(t) => uuid::Uuid::parse_str(t)
613                    .map_err(|e| Error::PlanError(format!("invalid source_id uuid: {e}")))?,
614                _ => return Err(Error::PlanError("source_id must be UUID".into())),
615            };
616            let target_uuid = match &target {
617                Value::Uuid(u) => *u,
618                Value::Text(t) => uuid::Uuid::parse_str(t)
619                    .map_err(|e| Error::PlanError(format!("invalid target_id uuid: {e}")))?,
620                _ => return Err(Error::PlanError("target_id must be UUID".into())),
621            };
622            let edge_type_str = match &edge_type {
623                Value::Text(t) => t.clone(),
624                _ => return Err(Error::PlanError("edge_type must be TEXT".into())),
625            };
626
627            self.insert_edge(
628                tx,
629                source_uuid,
630                target_uuid,
631                edge_type_str,
632                Default::default(),
633            )?;
634            count += 1;
635        }
636
637        if auto_commit {
638            self.commit_with_source(tx, CommitSource::AutoCommit)?;
639        }
640
641        Ok(QueryResult::empty_with_affected(count))
642    }
643
    /// Derive the `DdlChange` a statement would produce, or `None` for
    /// non-DDL statements.
    ///
    /// For ALTER TABLE, the current table meta is cloned and the action is
    /// simulated on the clone so the emitted change describes the
    /// post-alteration column set.
    fn ddl_change_for_statement(&self, stmt: &Statement) -> Option<DdlChange> {
        match stmt {
            Statement::CreateTable(ct) => Some(ddl_change_from_create_table(ct)),
            Statement::DropTable(dt) => Some(DdlChange::DropTable {
                name: dt.name.clone(),
            }),
            Statement::AlterTable(at) => {
                let mut meta = self.table_meta(&at.table).unwrap_or_default();
                // Simulate the alter action on a cloned meta to get post-alteration columns
                match &at.action {
                    AlterAction::AddColumn(col) => {
                        meta.columns.push(contextdb_core::ColumnDef {
                            name: col.name.clone(),
                            column_type: crate::executor::map_column_type(&col.data_type),
                            nullable: col.nullable,
                            primary_key: col.primary_key,
                            unique: col.unique,
                            default: col
                                .default
                                .as_ref()
                                .map(crate::executor::stored_default_expr),
                            expires: col.expires,
                        });
                        // An EXPIRES column also becomes the table's expiry column.
                        if col.expires {
                            meta.expires_column = Some(col.name.clone());
                        }
                    }
                    AlterAction::DropColumn(name) => {
                        meta.columns.retain(|c| c.name != *name);
                        // Dropping the expiry column clears the table-level setting.
                        if meta.expires_column.as_deref() == Some(name.as_str()) {
                            meta.expires_column = None;
                        }
                    }
                    AlterAction::RenameColumn { from, to } => {
                        if let Some(c) = meta.columns.iter_mut().find(|c| c.name == *from) {
                            c.name = to.clone();
                        }
                        // Keep the expiry-column reference in sync with the rename.
                        if meta.expires_column.as_deref() == Some(from.as_str()) {
                            meta.expires_column = Some(to.clone());
                        }
                    }
                    AlterAction::SetRetain {
                        duration_seconds,
                        sync_safe,
                    } => {
                        meta.default_ttl_seconds = Some(*duration_seconds);
                        meta.sync_safe = *sync_safe;
                    }
                    AlterAction::DropRetain => {
                        meta.default_ttl_seconds = None;
                        meta.sync_safe = false;
                    }
                    AlterAction::SetSyncConflictPolicy(_) | AlterAction::DropSyncConflictPolicy => { /* handled in executor */
                    }
                }
                Some(DdlChange::AlterTable {
                    name: at.table.clone(),
                    columns: meta
                        .columns
                        .iter()
                        .map(|c| {
                            (
                                c.name.clone(),
                                sql_type_for_meta_column(c, &meta.propagation_rules),
                            )
                        })
                        .collect(),
                    constraints: create_table_constraints_from_meta(&meta),
                })
            }
            _ => None,
        }
    }
717
718    /// Pre-resolve InSubquery expressions within SELECT statements that have CTEs.
719    /// This allows CTE-backed subqueries in WHERE clauses to be evaluated before planning.
720    fn pre_resolve_cte_subqueries(
721        &self,
722        stmt: &Statement,
723        params: &HashMap<String, Value>,
724        tx: Option<TxId>,
725    ) -> Result<Statement> {
726        if let Statement::Select(sel) = stmt
727            && !sel.ctes.is_empty()
728            && sel.body.where_clause.is_some()
729        {
730            use crate::executor::resolve_in_subqueries_with_ctes;
731            let resolved_where = sel
732                .body
733                .where_clause
734                .as_ref()
735                .map(|expr| resolve_in_subqueries_with_ctes(self, expr, params, tx, &sel.ctes))
736                .transpose()?;
737            let mut new_body = sel.body.clone();
738            new_body.where_clause = resolved_where;
739            Ok(Statement::Select(contextdb_parser::ast::SelectStatement {
740                ctes: sel.ctes.clone(),
741                body: new_body,
742            }))
743        } else {
744            Ok(stmt.clone())
745        }
746    }
747
    /// Insert a row into `table` within transaction `tx`, returning the new
    /// row id.
    pub fn insert_row(
        &self,
        tx: TxId,
        table: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<RowId> {
        self.relational.insert(tx, table, values)
    }
756
    /// Insert-or-update a row keyed on `conflict_col`. When the table has a
    /// state machine and the upsert UPDATED an existing row, the new state is
    /// propagated to dependent rows.
    pub fn upsert_row(
        &self,
        tx: TxId,
        table: &str,
        conflict_col: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<UpsertResult> {
        // Capture the row uuid and state-machine value up front, before
        // `values` is moved into the upsert call.
        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
        let meta = self.table_meta(table);
        let new_state = meta
            .as_ref()
            .and_then(|m| m.state_machine.as_ref())
            .and_then(|sm| values.get(&sm.column))
            .and_then(Value::as_text)
            .map(std::borrow::ToOwned::to_owned);

        let result = self
            .relational
            .upsert(tx, table, conflict_col, values, self.snapshot())?;

        // Only Updated triggers propagation; Inserted and NoOp do not.
        if let (Some(uuid), Some(state), Some(_meta)) =
            (row_uuid, new_state.as_deref(), meta.as_ref())
            && matches!(result, UpsertResult::Updated)
        {
            self.propagate_state_change_if_needed(tx, table, Some(uuid), Some(state))?;
        }

        Ok(result)
    }
786
    /// Run state propagation for `(table, row_uuid)` unless this transaction
    /// is already inside a propagation pass (re-entrancy guard via the
    /// write-set's `propagation_in_progress` flag).
    pub(crate) fn propagate_state_change_if_needed(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: Option<uuid::Uuid>,
        new_state: Option<&str>,
    ) -> Result<()> {
        if let (Some(uuid), Some(state)) = (row_uuid, new_state) {
            let already_propagating = self
                .tx_mgr
                .with_write_set(tx, |ws| ws.propagation_in_progress)?;
            if !already_propagating {
                // Set the flag for the duration of the pass so nested upserts
                // performed by propagation do not recurse into it.
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = true)?;
                let propagate_result = self.propagate(tx, table, uuid, state);
                // Clear the flag BEFORE surfacing any propagation error so
                // the transaction is left in a consistent state.
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = false)?;
                propagate_result?;
            }
        }

        Ok(())
    }
810
    /// Breadth-first propagation of a state change at `(table, row_uuid)`
    /// into dependent rows, driven by the tables' propagation rules.
    ///
    /// FK children, edge neighbours, and vector exclusions of the root are
    /// seeded first; each dequeued row is transitioned to its rule's target
    /// state and its own children are enqueued in turn. Invalid transitions
    /// and missing state machines only warn — unless the originating rule
    /// was marked `abort_on_failure`, in which case the FIRST such violation
    /// is remembered and returned as an error after the traversal drains.
    fn propagate(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: uuid::Uuid,
        new_state: &str,
    ) -> Result<()> {
        let snapshot = self.snapshot();
        // Clone table metadata so the rule set is stable for the traversal.
        let metas = self.relational_store().table_meta.read().clone();
        let mut queue: VecDeque<PropagationQueueEntry> = VecDeque::new();
        // Cycle guard: each (table, uuid) pair is processed at most once.
        let mut visited: HashSet<(String, uuid::Uuid)> = HashSet::new();
        // First abort-worthy violation; surfaced only after the BFS drains.
        let mut abort_violation: Option<Error> = None;
        let ctx = PropagationContext {
            tx,
            snapshot,
            metas: &metas,
        };
        let root = PropagationSource {
            table,
            uuid: row_uuid,
            state: new_state,
            depth: 0,
        };

        // Seed the queue (and apply exclusions) from the root change.
        self.enqueue_fk_children(&ctx, &mut queue, root);
        self.enqueue_edge_children(&ctx, &mut queue, root)?;
        self.apply_vector_exclusions(&ctx, root)?;

        while let Some(entry) = queue.pop_front() {
            if !visited.insert((entry.table.clone(), entry.uuid)) {
                continue;
            }

            let Some(meta) = metas.get(&entry.table) else {
                continue;
            };

            // A target table without a state machine cannot transition;
            // warn, and record an abort if the rule demanded success.
            let Some(state_machine) = &meta.state_machine else {
                let msg = format!(
                    "warning: propagation target table {} has no state machine",
                    entry.table
                );
                eprintln!("{msg}");
                if entry.abort_on_failure && abort_violation.is_none() {
                    abort_violation = Some(Error::PropagationAborted {
                        table: entry.table.clone(),
                        column: String::new(),
                        from: String::new(),
                        to: entry.target_state.clone(),
                    });
                }
                continue;
            };

            let state_column = state_machine.column.clone();
            // Rows not visible under this snapshot/tx are silently skipped.
            let Some(existing) = self.relational.point_lookup_with_tx(
                Some(tx),
                &entry.table,
                "id",
                &Value::Uuid(entry.uuid),
                snapshot,
            )?
            else {
                continue;
            };

            let from_state = existing
                .values
                .get(&state_column)
                .and_then(Value::as_text)
                .unwrap_or("")
                .to_string();

            // Rewrite the row with only the state column changed.
            let mut next_values = existing.values.clone();
            next_values.insert(
                state_column.clone(),
                Value::Text(entry.target_state.clone()),
            );

            let upsert_outcome =
                self.relational
                    .upsert(tx, &entry.table, "id", next_values, snapshot);

            let reached_state = match upsert_outcome {
                Ok(UpsertResult::Updated) => entry.target_state.as_str(),
                // No-op and insert outcomes don't cascade further.
                Ok(UpsertResult::NoOp) | Ok(UpsertResult::Inserted) => continue,
                Err(Error::InvalidStateTransition(_)) => {
                    eprintln!(
                        "warning: skipped invalid propagated transition {}.{} {} -> {}",
                        entry.table, state_column, from_state, entry.target_state
                    );
                    if entry.abort_on_failure && abort_violation.is_none() {
                        abort_violation = Some(Error::PropagationAborted {
                            table: entry.table.clone(),
                            column: state_column.clone(),
                            from: from_state,
                            to: entry.target_state.clone(),
                        });
                    }
                    continue;
                }
                // Any other error aborts the whole propagation immediately.
                Err(err) => return Err(err),
            };

            // Cascade from the row we just transitioned. `entry.depth`
            // already counts this hop; the enqueue helpers add one more.
            self.enqueue_edge_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;
            self.apply_vector_exclusions(
                &ctx,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;

            self.enqueue_fk_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            );
        }

        if let Some(err) = abort_violation {
            return Err(err);
        }

        Ok(())
    }
953
954    fn enqueue_fk_children(
955        &self,
956        ctx: &PropagationContext<'_>,
957        queue: &mut VecDeque<PropagationQueueEntry>,
958        source: PropagationSource<'_>,
959    ) {
960        for (owner_table, owner_meta) in ctx.metas {
961            for rule in &owner_meta.propagation_rules {
962                let PropagationRule::ForeignKey {
963                    fk_column,
964                    referenced_table,
965                    trigger_state,
966                    target_state,
967                    max_depth,
968                    abort_on_failure,
969                    ..
970                } = rule
971                else {
972                    continue;
973                };
974
975                if referenced_table != source.table || trigger_state != source.state {
976                    continue;
977                }
978
979                if source.depth >= *max_depth {
980                    continue;
981                }
982
983                let rows = match self.relational.scan_filter_with_tx(
984                    Some(ctx.tx),
985                    owner_table,
986                    ctx.snapshot,
987                    &|row| row.values.get(fk_column) == Some(&Value::Uuid(source.uuid)),
988                ) {
989                    Ok(rows) => rows,
990                    Err(err) => {
991                        eprintln!(
992                            "warning: propagation scan failed for {owner_table}.{fk_column}: {err}"
993                        );
994                        continue;
995                    }
996                };
997
998                for row in rows {
999                    if let Some(id) = row.values.get("id").and_then(Value::as_uuid).copied() {
1000                        queue.push_back(PropagationQueueEntry {
1001                            table: owner_table.clone(),
1002                            uuid: id,
1003                            target_state: target_state.clone(),
1004                            depth: source.depth + 1,
1005                            abort_on_failure: *abort_on_failure,
1006                        });
1007                    }
1008                }
1009            }
1010        }
1011    }
1012
1013    fn enqueue_edge_children(
1014        &self,
1015        ctx: &PropagationContext<'_>,
1016        queue: &mut VecDeque<PropagationQueueEntry>,
1017        source: PropagationSource<'_>,
1018    ) -> Result<()> {
1019        let Some(meta) = ctx.metas.get(source.table) else {
1020            return Ok(());
1021        };
1022
1023        for rule in &meta.propagation_rules {
1024            let PropagationRule::Edge {
1025                edge_type,
1026                direction,
1027                trigger_state,
1028                target_state,
1029                max_depth,
1030                abort_on_failure,
1031            } = rule
1032            else {
1033                continue;
1034            };
1035
1036            if trigger_state != source.state || source.depth >= *max_depth {
1037                continue;
1038            }
1039
1040            let bfs = self.query_bfs(
1041                source.uuid,
1042                Some(std::slice::from_ref(edge_type)),
1043                *direction,
1044                1,
1045                ctx.snapshot,
1046            )?;
1047
1048            for node in bfs.nodes {
1049                if self
1050                    .relational
1051                    .point_lookup_with_tx(
1052                        Some(ctx.tx),
1053                        source.table,
1054                        "id",
1055                        &Value::Uuid(node.id),
1056                        ctx.snapshot,
1057                    )?
1058                    .is_some()
1059                {
1060                    queue.push_back(PropagationQueueEntry {
1061                        table: source.table.to_string(),
1062                        uuid: node.id,
1063                        target_state: target_state.clone(),
1064                        depth: source.depth + 1,
1065                        abort_on_failure: *abort_on_failure,
1066                    });
1067                }
1068            }
1069        }
1070
1071        Ok(())
1072    }
1073
1074    fn apply_vector_exclusions(
1075        &self,
1076        ctx: &PropagationContext<'_>,
1077        source: PropagationSource<'_>,
1078    ) -> Result<()> {
1079        let Some(meta) = ctx.metas.get(source.table) else {
1080            return Ok(());
1081        };
1082
1083        for rule in &meta.propagation_rules {
1084            let PropagationRule::VectorExclusion { trigger_state } = rule else {
1085                continue;
1086            };
1087            if trigger_state != source.state {
1088                continue;
1089            }
1090            for row_id in self.logical_row_ids_for_uuid(ctx.tx, source.table, source.uuid) {
1091                self.delete_vector(ctx.tx, row_id)?;
1092            }
1093        }
1094
1095        Ok(())
1096    }
1097
    /// Delete the row with physical id `row_id` from `table` inside `tx`.
    pub fn delete_row(&self, tx: TxId, table: &str, row_id: RowId) -> Result<()> {
        self.relational.delete(tx, table, row_id)
    }
1101
    /// All rows of `table` visible at `snapshot` (committed state only).
    pub fn scan(&self, table: &str, snapshot: SnapshotId) -> Result<Vec<VersionedRow>> {
        self.relational.scan(table, snapshot)
    }
1105
    /// Rows of `table` visible at `snapshot` for which `predicate` is true.
    pub fn scan_filter(
        &self,
        table: &str,
        snapshot: SnapshotId,
        predicate: &dyn Fn(&VersionedRow) -> bool,
    ) -> Result<Vec<VersionedRow>> {
        self.relational.scan_filter(table, snapshot, predicate)
    }
1114
    /// Committed-only point lookup: the row of `table` where `col == value`
    /// at `snapshot`, if any.
    pub fn point_lookup(
        &self,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational.point_lookup(table, col, value, snapshot)
    }
1124
    /// Point lookup that also sees `tx`'s own uncommitted writes.
    pub(crate) fn point_lookup_in_tx(
        &self,
        tx: TxId,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational
            .point_lookup_with_tx(Some(tx), table, col, value, snapshot)
    }
1136
1137    pub(crate) fn logical_row_ids_for_uuid(
1138        &self,
1139        tx: TxId,
1140        table: &str,
1141        uuid: uuid::Uuid,
1142    ) -> Vec<RowId> {
1143        let mut row_ids = HashSet::new();
1144
1145        if let Some(rows) = self.relational_store.tables.read().get(table) {
1146            for row in rows {
1147                if row.values.get("id") == Some(&Value::Uuid(uuid)) {
1148                    row_ids.insert(row.row_id);
1149                }
1150            }
1151        }
1152
1153        let _ = self.tx_mgr.with_write_set(tx, |ws| {
1154            for (insert_table, row) in &ws.relational_inserts {
1155                if insert_table == table && row.values.get("id") == Some(&Value::Uuid(uuid)) {
1156                    row_ids.insert(row.row_id);
1157                }
1158            }
1159        });
1160
1161        row_ids.into_iter().collect()
1162    }
1163
1164    pub fn insert_edge(
1165        &self,
1166        tx: TxId,
1167        source: NodeId,
1168        target: NodeId,
1169        edge_type: EdgeType,
1170        properties: HashMap<String, Value>,
1171    ) -> Result<bool> {
1172        let bytes = estimate_edge_bytes(source, target, &edge_type, &properties);
1173        self.accountant.try_allocate_for(
1174            bytes,
1175            "graph_insert",
1176            "insert_edge",
1177            "Reduce edge fan-out or raise MEMORY_LIMIT before inserting more graph edges.",
1178        )?;
1179
1180        match self
1181            .graph
1182            .insert_edge(tx, source, target, edge_type, properties)
1183        {
1184            Ok(inserted) => {
1185                if !inserted {
1186                    self.accountant.release(bytes);
1187                }
1188                Ok(inserted)
1189            }
1190            Err(err) => {
1191                self.accountant.release(bytes);
1192                Err(err)
1193            }
1194        }
1195    }
1196
    /// Delete the edge `source -> target` of `edge_type` inside `tx`.
    pub fn delete_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: &str,
    ) -> Result<()> {
        self.graph.delete_edge(tx, source, target, edge_type)
    }
1206
    /// BFS from `start` over the given edge types and direction, up to
    /// `max_depth` hops. The minimum depth is pinned to 1 here (presumably
    /// excluding the start node itself — confirm against `GraphStore::bfs`).
    pub fn query_bfs(
        &self,
        start: NodeId,
        edge_types: Option<&[EdgeType]>,
        direction: Direction,
        max_depth: u32,
        snapshot: SnapshotId,
    ) -> Result<TraversalResult> {
        self.graph
            .bfs(start, edge_types, direction, 1, max_depth, snapshot)
    }
1218
    /// Number of `edge_type` edges leaving `source` at `snapshot`.
    /// Infallible, but wrapped in `Result` for API symmetry with the other
    /// graph queries.
    pub fn edge_count(
        &self,
        source: NodeId,
        edge_type: &str,
        snapshot: SnapshotId,
    ) -> Result<usize> {
        Ok(self.graph.edge_count(source, edge_type, snapshot))
    }
1227
1228    pub fn get_edge_properties(
1229        &self,
1230        source: NodeId,
1231        target: NodeId,
1232        edge_type: &str,
1233        snapshot: SnapshotId,
1234    ) -> Result<Option<HashMap<String, Value>>> {
1235        let props = self
1236            .graph_store
1237            .forward_adj
1238            .read()
1239            .get(&source)
1240            .and_then(|entries| {
1241                entries
1242                    .iter()
1243                    .rev()
1244                    .find(|entry| {
1245                        entry.target == target
1246                            && entry.edge_type == edge_type
1247                            && entry.visible_at(snapshot)
1248                    })
1249                    .map(|entry| entry.properties.clone())
1250            });
1251        Ok(props)
1252    }
1253
1254    pub fn insert_vector(&self, tx: TxId, row_id: RowId, vector: Vec<f32>) -> Result<()> {
1255        let bytes = estimate_vector_bytes(&vector);
1256        self.accountant.try_allocate_for(
1257            bytes,
1258            "insert",
1259            "vector_insert",
1260            "Reduce vector dimensionality, insert fewer rows, or raise MEMORY_LIMIT.",
1261        )?;
1262        self.vector
1263            .insert_vector(tx, row_id, vector)
1264            .inspect_err(|_| self.accountant.release(bytes))
1265    }
1266
    /// Delete the vector associated with `row_id` inside `tx`.
    pub fn delete_vector(&self, tx: TxId, row_id: RowId) -> Result<()> {
        self.vector.delete_vector(tx, row_id)
    }
1270
    /// k-nearest-neighbour search for `query`, optionally restricted to the
    /// `candidates` row-id bitmap, at `snapshot`. Returns (row id, score)
    /// pairs as produced by the vector executor.
    pub fn query_vector(
        &self,
        query: &[f32],
        k: usize,
        candidates: Option<&RoaringTreemap>,
        snapshot: SnapshotId,
    ) -> Result<Vec<(RowId, f32)>> {
        self.vector.search(query, k, candidates, snapshot)
    }
1280
1281    pub fn has_live_vector(&self, row_id: RowId, snapshot: SnapshotId) -> bool {
1282        self.vector_store
1283            .vectors
1284            .read()
1285            .iter()
1286            .any(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
1287    }
1288
1289    pub fn live_vector_entry(&self, row_id: RowId, snapshot: SnapshotId) -> Option<VectorEntry> {
1290        self.vector_store
1291            .vectors
1292            .read()
1293            .iter()
1294            .rev()
1295            .find(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
1296            .cloned()
1297    }
1298
    /// Remove vector and graph state derived from `table`'s rows, as part of
    /// dropping the table.
    ///
    /// Vectors are matched by the rows' physical ids; edges by the
    /// (source_id, edge_type, target_id) columns of edge-shaped rows. A
    /// failed scan is treated as "no rows" (best-effort cleanup).
    pub(crate) fn drop_table_aux_state(&self, table: &str) {
        let snapshot = self.snapshot();
        let rows = self.scan(table, snapshot).unwrap_or_default();
        let row_ids: HashSet<RowId> = rows.iter().map(|row| row.row_id).collect();
        // Edge-shaped rows mirror graph edges; collect their keys so the
        // adjacency lists can be purged symmetrically below.
        let edge_keys: HashSet<(NodeId, EdgeType, NodeId)> = rows
            .iter()
            .filter_map(|row| {
                match (
                    row.values.get("source_id").and_then(Value::as_uuid),
                    row.values.get("target_id").and_then(Value::as_uuid),
                    row.values.get("edge_type").and_then(Value::as_text),
                ) {
                    (Some(source), Some(target), Some(edge_type)) => {
                        Some((*source, edge_type.to_string(), *target))
                    }
                    _ => None,
                }
            })
            .collect();

        if !row_ids.is_empty() {
            let mut vectors = self.vector_store.vectors.write();
            vectors.retain(|entry| !row_ids.contains(&entry.row_id));
            // The HNSW index may reference removed vectors, so clear it.
            self.vector_store.clear_hnsw(self.accountant());
        }

        if !edge_keys.is_empty() {
            // Scope each write guard so forward and reverse locks are never
            // held at the same time.
            {
                let mut forward = self.graph_store.forward_adj.write();
                for entries in forward.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                forward.retain(|_, entries| !entries.is_empty());
            }
            {
                let mut reverse = self.graph_store.reverse_adj.write();
                for entries in reverse.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                reverse.retain(|_, entries| !entries.is_empty());
            }
        }
    }
1346
    /// Names of all tables currently known to the relational store.
    pub fn table_names(&self) -> Vec<String> {
        self.relational_store.table_names()
    }
1350
    /// Metadata for `table`, or `None` when the table does not exist.
    pub fn table_meta(&self, table: &str) -> Option<TableMeta> {
        self.relational_store.table_meta(table)
    }
1354
    /// Run one pruning cycle. Called by the background loop or manually in tests.
    ///
    /// Holds `pruning_guard` so a manual cycle never overlaps the background
    /// thread. Returns whatever count `prune_expired_rows` reports
    /// (presumably the number of pruned entries — confirm at its definition).
    pub fn run_pruning_cycle(&self) -> u64 {
        let _guard = self.pruning_guard.lock();
        prune_expired_rows(
            &self.relational_store,
            &self.graph_store,
            &self.vector_store,
            self.accountant(),
            self.persistence.as_ref(),
            self.sync_watermark(),
        )
    }
1367
    /// Set the pruning loop interval. Test-only API.
    ///
    /// Stops any existing pruning thread, then spawns a fresh one that runs
    /// a pruning cycle and sleeps `interval` (waking early on shutdown)
    /// until the shutdown flag is set.
    pub fn set_pruning_interval(&self, interval: Duration) {
        self.stop_pruning_thread();

        let shutdown = Arc::new(AtomicBool::new(false));
        // Clone shared handles so the worker thread owns everything it needs
        // independently of `self`.
        let relational = self.relational_store.clone();
        let graph = self.graph_store.clone();
        let vector = self.vector_store.clone();
        let accountant = self.accountant.clone();
        let persistence = self.persistence.clone();
        let sync_watermark = self.sync_watermark.clone();
        let pruning_guard = self.pruning_guard.clone();
        let thread_shutdown = shutdown.clone();

        let handle = thread::spawn(move || {
            while !thread_shutdown.load(Ordering::SeqCst) {
                {
                    // Same guard as run_pruning_cycle: manual and background
                    // pruning never run concurrently.
                    let _guard = pruning_guard.lock();
                    let _ = prune_expired_rows(
                        &relational,
                        &graph,
                        &vector,
                        accountant.as_ref(),
                        persistence.as_ref(),
                        sync_watermark.load(Ordering::SeqCst),
                    );
                }
                sleep_with_shutdown(&thread_shutdown, interval);
            }
        });

        // Publish the new runtime so close()/a later call can stop it.
        let mut runtime = self.pruning_runtime.lock();
        runtime.shutdown = shutdown;
        runtime.handle = Some(handle);
    }
1403
    /// Current sync watermark (atomic load, SeqCst).
    pub fn sync_watermark(&self) -> u64 {
        self.sync_watermark.load(Ordering::SeqCst)
    }
1407
    /// Overwrite the sync watermark (atomic store, SeqCst).
    pub fn set_sync_watermark(&self, watermark: u64) {
        self.sync_watermark.store(watermark, Ordering::SeqCst);
    }
1411
    /// Stable identifier of this database instance.
    pub fn instance_id(&self) -> uuid::Uuid {
        self.instance_id
    }
1415
    /// In-memory database with a custom plugin and memory accountant.
    /// NOTE: unlike the other `open_*` constructors, this does not fire the
    /// plugin's `on_open` hook — callers invoke it themselves.
    pub fn open_memory_with_plugin_and_accountant(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_memory_internal(plugin, accountant)
    }
1422
    /// In-memory database with a custom plugin and no memory limit; fires
    /// the plugin's `on_open` hook before returning.
    pub fn open_memory_with_plugin(plugin: Arc<dyn DatabasePlugin>) -> Result<Self> {
        let db = Self::open_memory_with_plugin_and_accountant(
            plugin,
            Arc::new(MemoryAccountant::no_limit()),
        )?;
        db.plugin.on_open()?;
        Ok(db)
    }
1431
    /// Shut the database down: roll back any open session transaction, stop
    /// background pruning, drop subscribers, close persistence, then notify
    /// the plugin. Idempotent — only the first call does any work.
    pub fn close(&self) -> Result<()> {
        // First close wins; later calls are no-ops.
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        // Take the session tx out before rolling back so the lock is not
        // held across the rollback.
        let tx = self.session_tx.lock().take();
        if let Some(tx) = tx {
            self.rollback(tx)?;
        }
        self.stop_pruning_thread();
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
        self.plugin.on_close()
    }
1447
    /// File-backed database with custom plugin. No memory limit; fires the
    /// plugin's `on_open` hook before returning.
    pub fn open_with_plugin(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
    ) -> Result<Self> {
        let db = Self::open_loaded(path, plugin, Arc::new(MemoryAccountant::no_limit()), None)?;
        db.plugin.on_open()?;
        Ok(db)
    }
1457
    /// Full constructor with budget. Equivalent to
    /// `open_with_config_and_disk_limit` without a startup disk limit.
    pub fn open_with_config(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_with_config_and_disk_limit(path, plugin, accountant, None)
    }
1466
1467    pub fn open_with_config_and_disk_limit(
1468        path: impl AsRef<Path>,
1469        plugin: Arc<dyn DatabasePlugin>,
1470        accountant: Arc<MemoryAccountant>,
1471        startup_disk_limit: Option<u64>,
1472    ) -> Result<Self> {
1473        let path = path.as_ref();
1474        if path.as_os_str() == ":memory:" {
1475            return Self::open_memory_with_plugin_and_accountant(plugin, accountant);
1476        }
1477        let accountant = if let Some(limit) = persisted_memory_limit(path)? {
1478            let usage = accountant.usage();
1479            if usage.limit.is_none() && usage.startup_ceiling.is_none() {
1480                Arc::new(MemoryAccountant::with_budget(limit))
1481            } else {
1482                accountant
1483            }
1484        } else {
1485            accountant
1486        };
1487        let db = Self::open_loaded(path, plugin, accountant, startup_disk_limit)?;
1488        db.plugin.on_open()?;
1489        Ok(db)
1490    }
1491
    /// In-memory database with budget. Uses the core plugin.
    /// Panics if the internal open fails (infallible interface for callers).
    pub fn open_memory_with_accountant(accountant: Arc<MemoryAccountant>) -> Self {
        Self::open_memory_internal(Arc::new(CorePlugin), accountant)
            .expect("failed to open in-memory database with accountant")
    }
1497
    /// Access the memory accountant.
    pub fn accountant(&self) -> &MemoryAccountant {
        &self.accountant
    }
1502
1503    fn account_loaded_state(&self) -> Result<()> {
1504        let metadata_bytes = self
1505            .relational_store
1506            .table_meta
1507            .read()
1508            .values()
1509            .fold(0usize, |acc, meta| {
1510                acc.saturating_add(meta.estimated_bytes())
1511            });
1512        self.accountant.try_allocate_for(
1513            metadata_bytes,
1514            "open",
1515            "load_table_metadata",
1516            "Open the database with a larger MEMORY_LIMIT or reduce stored schema metadata.",
1517        )?;
1518
1519        let row_bytes =
1520            self.relational_store
1521                .tables
1522                .read()
1523                .iter()
1524                .fold(0usize, |acc, (table, rows)| {
1525                    let meta = self.table_meta(table);
1526                    acc.saturating_add(rows.iter().fold(0usize, |inner, row| {
1527                        inner.saturating_add(meta.as_ref().map_or_else(
1528                            || row.estimated_bytes(),
1529                            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
1530                        ))
1531                    }))
1532                });
1533        self.accountant.try_allocate_for(
1534            row_bytes,
1535            "open",
1536            "load_rows",
1537            "Open the database with a larger MEMORY_LIMIT or prune retained rows first.",
1538        )?;
1539
1540        let edge_bytes = self
1541            .graph_store
1542            .forward_adj
1543            .read()
1544            .values()
1545            .flatten()
1546            .filter(|edge| edge.deleted_tx.is_none())
1547            .fold(0usize, |acc, edge| {
1548                acc.saturating_add(edge.estimated_bytes())
1549            });
1550        self.accountant.try_allocate_for(
1551            edge_bytes,
1552            "open",
1553            "load_edges",
1554            "Open the database with a larger MEMORY_LIMIT or reduce graph edge volume.",
1555        )?;
1556
1557        let vector_bytes = self
1558            .vector_store
1559            .vectors
1560            .read()
1561            .iter()
1562            .filter(|entry| entry.deleted_tx.is_none())
1563            .fold(0usize, |acc, entry| {
1564                acc.saturating_add(entry.estimated_bytes())
1565            });
1566        self.accountant.try_allocate_for(
1567            vector_bytes,
1568            "open",
1569            "load_vectors",
1570            "Open the database with a larger MEMORY_LIMIT or reduce stored vector data.",
1571        )?;
1572
1573        Ok(())
1574    }
1575
1576    fn release_insert_allocations(&self, ws: &contextdb_tx::WriteSet) {
1577        for (table, row) in &ws.relational_inserts {
1578            let bytes = self
1579                .table_meta(table)
1580                .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
1581                .unwrap_or_else(|| row.estimated_bytes());
1582            self.accountant.release(bytes);
1583        }
1584
1585        for edge in &ws.adj_inserts {
1586            self.accountant.release(edge.estimated_bytes());
1587        }
1588
1589        for entry in &ws.vector_inserts {
1590            self.accountant.release(entry.estimated_bytes());
1591        }
1592    }
1593
1594    fn release_delete_allocations(&self, ws: &contextdb_tx::WriteSet) {
1595        for (table, row_id, _) in &ws.relational_deletes {
1596            if let Some(row) = self.find_row_by_id(table, *row_id) {
1597                let bytes = self
1598                    .table_meta(table)
1599                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
1600                    .unwrap_or_else(|| row.estimated_bytes());
1601                self.accountant.release(bytes);
1602            }
1603        }
1604
1605        for (source, edge_type, target, _) in &ws.adj_deletes {
1606            if let Some(edge) = self.find_edge(source, target, edge_type) {
1607                self.accountant.release(edge.estimated_bytes());
1608            }
1609        }
1610
1611        for (row_id, _) in &ws.vector_deletes {
1612            if let Some(vector) = self.find_vector_by_row_id(*row_id) {
1613                self.accountant.release(vector.estimated_bytes());
1614            }
1615        }
1616
1617        if !ws.vector_deletes.is_empty() {
1618            self.vector_store.clear_hnsw(self.accountant());
1619        }
1620    }
1621
1622    fn find_row_by_id(&self, table: &str, row_id: RowId) -> Option<VersionedRow> {
1623        self.relational_store
1624            .tables
1625            .read()
1626            .get(table)
1627            .and_then(|rows| rows.iter().find(|row| row.row_id == row_id))
1628            .cloned()
1629    }
1630
1631    fn find_vector_by_row_id(&self, row_id: RowId) -> Option<VectorEntry> {
1632        self.vector_store
1633            .vectors
1634            .read()
1635            .iter()
1636            .find(|entry| entry.row_id == row_id)
1637            .cloned()
1638    }
1639
1640    fn find_edge(&self, source: &NodeId, target: &NodeId, edge_type: &str) -> Option<AdjEntry> {
1641        self.graph_store
1642            .forward_adj
1643            .read()
1644            .get(source)
1645            .and_then(|entries| {
1646                entries
1647                    .iter()
1648                    .find(|entry| entry.target == *target && entry.edge_type == edge_type)
1649                    .cloned()
1650            })
1651    }
1652
    /// Snapshot the lengths of the write set's mutation buffers
    /// (relational inserts, relational deletes, edge inserts, vector
    /// inserts, vector deletes — in that tuple order) so a failed
    /// sub-operation can be undone via `restore_write_set_checkpoint`.
    pub(crate) fn write_set_checkpoint(
        &self,
        tx: TxId,
    ) -> Result<(usize, usize, usize, usize, usize)> {
        self.tx_mgr.with_write_set(tx, |ws| {
            (
                ws.relational_inserts.len(),
                ws.relational_deletes.len(),
                ws.adj_inserts.len(),
                ws.vector_inserts.len(),
                ws.vector_deletes.len(),
            )
        })
    }
1667
    /// Truncate the write set's mutation buffers back to the lengths
    /// captured by `write_set_checkpoint` (same tuple order), discarding
    /// anything appended since.
    pub(crate) fn restore_write_set_checkpoint(
        &self,
        tx: TxId,
        checkpoint: (usize, usize, usize, usize, usize),
    ) -> Result<()> {
        self.tx_mgr.with_write_set(tx, |ws| {
            ws.relational_inserts.truncate(checkpoint.0);
            ws.relational_deletes.truncate(checkpoint.1);
            ws.adj_inserts.truncate(checkpoint.2);
            ws.vector_inserts.truncate(checkpoint.3);
            ws.vector_deletes.truncate(checkpoint.4);
        })
    }
1681
    /// Get a clone of the current conflict policies.
    pub fn conflict_policies(&self) -> ConflictPolicies {
        self.conflict_policies.read().clone()
    }
1686
    /// Set the default conflict policy (applies where no per-table override
    /// exists).
    pub fn set_default_conflict_policy(&self, policy: ConflictPolicy) {
        self.conflict_policies.write().default = policy;
    }
1691
    /// Set a per-table conflict policy, overriding the default for `table`.
    pub fn set_table_conflict_policy(&self, table: &str, policy: ConflictPolicy) {
        self.conflict_policies
            .write()
            .per_table
            .insert(table.to_string(), policy);
    }
1699
    /// Remove a per-table conflict policy override, reverting `table` to the
    /// default policy.
    pub fn drop_table_conflict_policy(&self, table: &str) {
        self.conflict_policies.write().per_table.remove(table);
    }
1704
    /// The database plugin installed at open time.
    pub fn plugin(&self) -> &dyn DatabasePlugin {
        self.plugin.as_ref()
    }
1708
    /// The plugin's self-reported health status.
    pub fn plugin_health(&self) -> PluginHealth {
        self.plugin.health()
    }
1712
    /// The plugin's self-description as JSON.
    pub fn plugin_describe(&self) -> serde_json::Value {
        self.plugin.describe()
    }
1716
    /// Crate-internal access to the graph executor.
    pub(crate) fn graph(&self) -> &MemGraphExecutor<DynStore> {
        &self.graph
    }
1720
    /// Crate-internal access to the shared relational store.
    pub(crate) fn relational_store(&self) -> &Arc<RelationalStore> {
        &self.relational_store
    }
1724
    /// Allocate the next DDL log sequence number and run `f` with it,
    /// delegating to the transaction manager.
    pub(crate) fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
    where
        F: FnOnce(u64) -> R,
    {
        self.tx_mgr.allocate_ddl_lsn(f)
    }
1731
    /// Run `f` while holding the transaction manager's commit lock.
    pub(crate) fn with_commit_lock<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        self.tx_mgr.with_commit_lock(f)
    }
1738
    /// Record a CREATE TABLE in the in-memory DDL log and, best-effort, in
    /// persistent storage (persistence errors are deliberately ignored).
    pub(crate) fn log_create_table_ddl(&self, name: &str, meta: &TableMeta, lsn: u64) {
        let change = ddl_change_from_meta(name, meta);
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            let _ = persistence.append_ddl_log(lsn, &change);
        }
    }
1746
1747    pub(crate) fn log_drop_table_ddl(&self, name: &str, lsn: u64) {
1748        let change = DdlChange::DropTable {
1749            name: name.to_string(),
1750        };
1751        self.ddl_log.write().push((lsn, change.clone()));
1752        if let Some(persistence) = &self.persistence {
1753            let _ = persistence.append_ddl_log(lsn, &change);
1754        }
1755    }
1756
1757    pub(crate) fn log_alter_table_ddl(&self, name: &str, meta: &TableMeta, lsn: u64) {
1758        let change = DdlChange::AlterTable {
1759            name: name.to_string(),
1760            columns: meta
1761                .columns
1762                .iter()
1763                .map(|c| {
1764                    (
1765                        c.name.clone(),
1766                        sql_type_for_meta_column(c, &meta.propagation_rules),
1767                    )
1768                })
1769                .collect(),
1770            constraints: create_table_constraints_from_meta(meta),
1771        };
1772        self.ddl_log.write().push((lsn, change.clone()));
1773        if let Some(persistence) = &self.persistence {
1774            let _ = persistence.append_ddl_log(lsn, &change);
1775        }
1776    }
1777
1778    pub(crate) fn persist_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
1779        if let Some(persistence) = &self.persistence {
1780            persistence.flush_table_meta(name, meta)?;
1781        }
1782        Ok(())
1783    }
1784
1785    pub(crate) fn persist_memory_limit(&self, limit: Option<usize>) -> Result<()> {
1786        if let Some(persistence) = &self.persistence {
1787            match limit {
1788                Some(limit) => persistence.flush_config_value("memory_limit", &limit)?,
1789                None => persistence.remove_config_value("memory_limit")?,
1790            }
1791        }
1792        Ok(())
1793    }
1794
1795    pub fn set_disk_limit(&self, limit: Option<u64>) -> Result<()> {
1796        if self.persistence.is_none() {
1797            self.disk_limit.store(0, Ordering::SeqCst);
1798            return Ok(());
1799        }
1800
1801        let ceiling = self.disk_limit_startup_ceiling();
1802        if let Some(ceiling) = ceiling {
1803            match limit {
1804                Some(bytes) if bytes > ceiling => {
1805                    return Err(Error::Other(format!(
1806                        "disk limit {bytes} exceeds startup ceiling {ceiling}"
1807                    )));
1808                }
1809                None => {
1810                    return Err(Error::Other(
1811                        "cannot remove disk limit when a startup ceiling is set".to_string(),
1812                    ));
1813                }
1814                _ => {}
1815            }
1816        }
1817
1818        self.disk_limit.store(limit.unwrap_or(0), Ordering::SeqCst);
1819        Ok(())
1820    }
1821
1822    pub fn disk_limit(&self) -> Option<u64> {
1823        match self.disk_limit.load(Ordering::SeqCst) {
1824            0 => None,
1825            bytes => Some(bytes),
1826        }
1827    }
1828
1829    pub fn disk_limit_startup_ceiling(&self) -> Option<u64> {
1830        match self.disk_limit_startup_ceiling.load(Ordering::SeqCst) {
1831            0 => None,
1832            bytes => Some(bytes),
1833        }
1834    }
1835
1836    pub fn disk_file_size(&self) -> Option<u64> {
1837        self.persistence
1838            .as_ref()
1839            .map(|persistence| std::fs::metadata(persistence.path()).map(|meta| meta.len()))
1840            .transpose()
1841            .ok()
1842            .flatten()
1843    }
1844
1845    pub(crate) fn persist_disk_limit(&self, limit: Option<u64>) -> Result<()> {
1846        if let Some(persistence) = &self.persistence {
1847            match limit {
1848                Some(limit) => persistence.flush_config_value("disk_limit", &limit)?,
1849                None => persistence.remove_config_value("disk_limit")?,
1850            }
1851        }
1852        Ok(())
1853    }
1854
1855    pub fn check_disk_budget(&self, operation: &str) -> Result<()> {
1856        let Some(limit) = self.disk_limit() else {
1857            return Ok(());
1858        };
1859        let Some(current_bytes) = self.disk_file_size() else {
1860            return Ok(());
1861        };
1862        if current_bytes < limit {
1863            return Ok(());
1864        }
1865        Err(Error::DiskBudgetExceeded {
1866            operation: operation.to_string(),
1867            current_bytes,
1868            budget_limit_bytes: limit,
1869            hint: "Reduce retained file-backed data or raise DISK_LIMIT before writing more data."
1870                .to_string(),
1871        })
1872    }
1873
1874    pub fn persisted_sync_watermarks(&self, tenant_id: &str) -> Result<(u64, u64)> {
1875        let Some(persistence) = &self.persistence else {
1876            return Ok((0, 0));
1877        };
1878        let push = persistence
1879            .load_config_value::<u64>(&format!("sync_push_watermark:{tenant_id}"))?
1880            .unwrap_or(0);
1881        let pull = persistence
1882            .load_config_value::<u64>(&format!("sync_pull_watermark:{tenant_id}"))?
1883            .unwrap_or(0);
1884        Ok((push, pull))
1885    }
1886
1887    pub fn persist_sync_push_watermark(&self, tenant_id: &str, watermark: u64) -> Result<()> {
1888        if let Some(persistence) = &self.persistence {
1889            persistence
1890                .flush_config_value(&format!("sync_push_watermark:{tenant_id}"), &watermark)?;
1891        }
1892        Ok(())
1893    }
1894
1895    pub fn persist_sync_pull_watermark(&self, tenant_id: &str, watermark: u64) -> Result<()> {
1896        if let Some(persistence) = &self.persistence {
1897            persistence
1898                .flush_config_value(&format!("sync_pull_watermark:{tenant_id}"), &watermark)?;
1899        }
1900        Ok(())
1901    }
1902
1903    pub(crate) fn persist_table_rows(&self, name: &str) -> Result<()> {
1904        if let Some(persistence) = &self.persistence {
1905            let tables = self.relational_store.tables.read();
1906            if let Some(rows) = tables.get(name) {
1907                persistence.rewrite_table_rows(name, rows)?;
1908            }
1909        }
1910        Ok(())
1911    }
1912
1913    pub(crate) fn remove_persisted_table(&self, name: &str) -> Result<()> {
1914        if let Some(persistence) = &self.persistence {
1915            persistence.remove_table_meta(name)?;
1916            persistence.remove_table_data(name)?;
1917        }
1918        Ok(())
1919    }
1920
1921    pub fn change_log_since(&self, since_lsn: u64) -> Vec<ChangeLogEntry> {
1922        let log = self.change_log.read();
1923        let start = log.partition_point(|e| e.lsn() <= since_lsn);
1924        log[start..].to_vec()
1925    }
1926
1927    pub fn ddl_log_since(&self, since_lsn: u64) -> Vec<DdlChange> {
1928        let ddl = self.ddl_log.read();
1929        let start = ddl.partition_point(|(lsn, _)| *lsn <= since_lsn);
1930        ddl[start..].iter().map(|(_, c)| c.clone()).collect()
1931    }
1932
    /// Builds a complete snapshot of all live data as a ChangeSet.
    /// Used as fallback when change_log/ddl_log cannot serve a watermark.
    ///
    /// Only rows in tables with a resolvable natural key are emitted, and
    /// vectors are filtered against those rows (orphans dropped, one vector
    /// per row_id).
    #[allow(dead_code)]
    fn full_state_snapshot(&self) -> ChangeSet {
        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let mut ddl = Vec::new();

        // Lock order: table_meta before tables — keep consistent with other
        // readers of the relational store.
        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        // DDL
        for (name, meta) in meta_guard.iter() {
            ddl.push(ddl_change_from_meta(name, meta));
        }

        // Rows (live only) — collect row_ids that have live rows for orphan vector filtering
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(m) => m,
                None => continue,
            };
            // Natural key: explicit natural_key_column, else a UUID "id"
            // column; tables with neither cannot be represented and are skipped.
            let key_col = meta.natural_key_column.clone().unwrap_or_else(|| {
                if meta
                    .columns
                    .iter()
                    .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                {
                    "id".to_string()
                } else {
                    String::new()
                }
            });
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|r| r.deleted_tx.is_none()) {
                // A live row missing its key value cannot be addressed
                // remotely; skip it.
                let key_val = match row.values.get(&key_col) {
                    Some(v) => v.clone(),
                    None => continue,
                };
                live_row_ids.insert(row.row_id);
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }

        // Release relational locks before taking graph/vector locks.
        drop(tables_guard);
        drop(meta_guard);

        // Edges (live only)
        let fwd = self.graph_store.forward_adj.read();
        for (_source, entries) in fwd.iter() {
            for entry in entries.iter().filter(|e| e.deleted_tx.is_none()) {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Vectors (live only, skip orphans, first entry per row_id)
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs.iter().filter(|v| v.deleted_tx.is_none()) {
            if !live_row_ids.contains(&entry.row_id) {
                continue; // skip orphan vectors
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue; // first live entry per row_id only
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2033
    /// Derive a delta ChangeSet directly from persisted store state (row/edge/
    /// vector LSNs) instead of the ephemeral logs — used after a restart when
    /// the logs are empty but stores hold data. `since_lsn == 0` degenerates
    /// to a full snapshot.
    ///
    /// NOTE(review): `ddl` is always empty in the delta path — presumably the
    /// peer already holds the schema when it has a non-zero watermark; confirm
    /// against the sync protocol.
    fn persisted_state_since(&self, since_lsn: u64) -> ChangeSet {
        if since_lsn == 0 {
            return self.full_state_snapshot();
        }

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let ddl = Vec::new();

        // Lock order mirrors full_state_snapshot: meta before tables.
        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        // ALL live rows go into live_row_ids (for orphan-vector filtering),
        // but only rows newer than the watermark are emitted.
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(meta) => meta,
                None => continue,
            };
            // Natural key: explicit column, else a UUID "id" column; skip
            // tables with neither.
            let key_col = meta.natural_key_column.clone().unwrap_or_else(|| {
                if meta
                    .columns
                    .iter()
                    .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                {
                    "id".to_string()
                } else {
                    String::new()
                }
            });
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|row| row.deleted_tx.is_none()) {
                live_row_ids.insert(row.row_id);
                if row.lsn <= since_lsn {
                    continue;
                }
                let key_val = match row.values.get(&key_col) {
                    Some(value) => value.clone(),
                    None => continue,
                };
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }
        // Release relational locks before taking graph/vector locks.
        drop(tables_guard);
        drop(meta_guard);

        // Edges: live entries newer than the watermark.
        let fwd = self.graph_store.forward_adj.read();
        for entries in fwd.values() {
            for entry in entries
                .iter()
                .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
            {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Vectors: live, newer than the watermark, non-orphan, first entry
        // per row_id only.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs
            .iter()
            .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2135
    /// Estimate the memory cost of applying `changes` and fail fast with
    /// `MemoryBudgetExceeded` when it would not fit in the remaining budget.
    ///
    /// Rows that the conflict policy would skip anyway (already-existing rows
    /// under InsertIfNotExists/ServerWins) are excluded from the estimate.
    /// Edge and vector sizes use fixed per-entry overhead heuristics.
    fn preflight_sync_apply_memory(
        &self,
        changes: &ChangeSet,
        policies: &ConflictPolicies,
    ) -> Result<()> {
        let usage = self.accountant.usage();
        // No configured limit → nothing to enforce.
        let Some(limit) = usage.limit else {
            return Ok(());
        };
        let available = usage.available.unwrap_or(limit);
        let mut required = 0usize;

        for row in &changes.rows {
            // Deletes and empty payloads don't add memory.
            if row.deleted || row.values.is_empty() {
                continue;
            }

            let policy = policies
                .per_table
                .get(&row.table)
                .copied()
                .unwrap_or(policies.default);
            // Point lookup against the current snapshot; a lookup error aborts
            // the whole preflight.
            let existing = self.point_lookup(
                &row.table,
                &row.natural_key.column,
                &row.natural_key.value,
                self.snapshot(),
            )?;

            // These policies keep the local row when one exists, so applying
            // this change would allocate nothing.
            if existing.is_some()
                && matches!(
                    policy,
                    ConflictPolicy::InsertIfNotExists | ConflictPolicy::ServerWins
                )
            {
                continue;
            }

            // Prefer the schema-aware estimate; fall back to a raw value-size
            // estimate for unknown tables.
            required = required.saturating_add(
                self.table_meta(&row.table)
                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                    .unwrap_or_else(|| estimate_row_value_bytes(&row.values)),
            );
        }

        for edge in &changes.edges {
            // Heuristic: fixed edge overhead + per-char type cost + property bytes.
            required = required.saturating_add(
                96 + edge.edge_type.len().saturating_mul(16)
                    + estimate_row_value_bytes(&edge.properties),
            );
        }

        for vector in &changes.vectors {
            // Empty vector == delete marker; costs nothing.
            if vector.vector.is_empty() {
                continue;
            }
            // Heuristic: fixed entry overhead + f32 payload.
            required = required.saturating_add(
                24 + vector
                    .vector
                    .len()
                    .saturating_mul(std::mem::size_of::<f32>()),
            );
        }

        if required > available {
            return Err(Error::MemoryBudgetExceeded {
                subsystem: "sync".to_string(),
                operation: "apply_changes".to_string(),
                requested_bytes: required,
                available_bytes: available,
                budget_limit_bytes: limit,
                hint:
                    "Reduce sync batch size, split the push, or raise MEMORY_LIMIT on the server."
                        .to_string(),
            });
        }

        Ok(())
    }
2215
    /// Extracts changes from this database since the given LSN.
    ///
    /// Serving strategy, in order:
    /// 1. Future watermark → empty set.
    /// 2. Both ephemeral logs empty but stores populated (post-restart) →
    ///    derive the delta from persisted state.
    /// 3. Logs present but their earliest LSN doesn't cover `since_lsn` →
    ///    also derive from persisted state.
    /// 4. Otherwise replay the change/DDL logs (read atomically under the
    ///    commit lock) and materialize row/edge/vector changes, then strip
    ///    deletes that were the first half of an upsert.
    pub fn changes_since(&self, since_lsn: u64) -> ChangeSet {
        // Future watermark guard
        if since_lsn > self.current_lsn() {
            return ChangeSet::default();
        }

        // Check if the ephemeral logs can serve the requested watermark.
        // After restart, both logs are empty but stores may have data — fall back to snapshot.
        let log = self.change_log.read();
        let change_first_lsn = log.first().map(|e| e.lsn());
        let change_log_empty = log.is_empty();
        drop(log);

        let ddl = self.ddl_log.read();
        let ddl_first_lsn = ddl.first().map(|(lsn, _)| *lsn);
        let ddl_log_empty = ddl.is_empty();
        drop(ddl);

        let has_table_data = !self
            .relational_store
            .tables
            .read()
            .values()
            .all(|rows| rows.is_empty());
        let has_table_meta = !self.relational_store.table_meta.read().is_empty();

        // If both logs are empty but stores have data → post-restart, derive deltas from
        // persisted row/edge/vector LSNs instead of replaying a full snapshot.
        if change_log_empty && ddl_log_empty && (has_table_data || has_table_meta) {
            return self.persisted_state_since(since_lsn);
        }

        // If logs have entries, check the minimum first-LSN across both covers since_lsn
        let min_first_lsn = match (change_first_lsn, ddl_first_lsn) {
            (Some(c), Some(d)) => Some(c.min(d)),
            (Some(c), None) => Some(c),
            (None, Some(d)) => Some(d),
            (None, None) => None, // both empty, stores empty — nothing to serve
        };

        if min_first_lsn.is_some_and(|min_lsn| min_lsn > since_lsn + 1) {
            // Log doesn't cover since_lsn — derive the delta from persisted state.
            return self.persisted_state_since(since_lsn);
        }

        // Read both logs under the commit lock so they form a consistent pair.
        let (ddl, change_entries) = self.with_commit_lock(|| {
            let ddl = self.ddl_log_since(since_lsn);
            let changes = self.change_log_since(since_lsn);
            (ddl, changes)
        });

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();

        // Materialize each log entry; current values are looked up from the
        // stores (entries whose row/vector has since vanished are dropped).
        for entry in change_entries {
            match entry {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    if let Some((natural_key, values)) = self.row_change_values(&table, row_id) {
                        rows.push(RowChange {
                            table,
                            natural_key,
                            values,
                            deleted: false,
                            lsn,
                        });
                    }
                }
                ChangeLogEntry::RowDelete {
                    table,
                    natural_key,
                    lsn,
                    ..
                } => {
                    // Tombstone marker row: "__deleted" flag plus deleted=true.
                    let mut values = HashMap::new();
                    values.insert("__deleted".to_string(), Value::Bool(true));
                    rows.push(RowChange {
                        table,
                        natural_key,
                        values,
                        deleted: true,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeInsert {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    let properties = self
                        .edge_properties(source, target, &edge_type, lsn)
                        .unwrap_or_default();
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeDelete {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    // Edge tombstone: "__deleted" property marks the removal.
                    let mut properties = HashMap::new();
                    properties.insert("__deleted".to_string(), Value::Bool(true));
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::VectorInsert { row_id, lsn } => {
                    if let Some(vector) = self.vector_for_row_lsn(row_id, lsn) {
                        vectors.push(VectorChange {
                            row_id,
                            vector,
                            lsn,
                        });
                    }
                }
                // Empty vector encodes a vector delete.
                ChangeLogEntry::VectorDelete { row_id, lsn } => vectors.push(VectorChange {
                    row_id,
                    vector: Vec::new(),
                    lsn,
                }),
            }
        }

        // Deduplicate upserts: when a RowDelete is followed by a RowInsert for the same
        // (table, natural_key), the delete is part of an upsert — remove it.
        // Only remove a delete if there is a non-delete entry with a higher-or-equal LSN
        // (equal LSN means delete+insert committed in the same upsert transaction).
        // If every insert has a lower LSN, the delete is genuine and must be kept.
        // Keys use the Debug rendering of the value so heterogeneous Value
        // types can share one HashMap key.
        let insert_max_lsn: HashMap<(String, String, String), u64> = {
            let mut map: HashMap<(String, String, String), u64> = HashMap::new();
            for r in rows.iter().filter(|r| !r.deleted) {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                let entry = map.entry(key).or_insert(0);
                if r.lsn > *entry {
                    *entry = r.lsn;
                }
            }
            map
        };
        rows.retain(|r| {
            if r.deleted {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                // Keep the delete unless there is a subsequent insert (higher or equal LSN).
                // Equal LSN means the delete+insert are part of the same upsert transaction.
                match insert_max_lsn.get(&key) {
                    Some(&insert_lsn) => insert_lsn < r.lsn,
                    None => true,
                }
            } else {
                true
            }
        });

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2396
    /// Returns the current LSN of this database (delegates to the tx manager).
    pub fn current_lsn(&self) -> u64 {
        self.tx_mgr.current_lsn()
    }
2401
    /// Subscribe to commit events. Returns a receiver that yields a `CommitEvent`
    /// after each commit. Uses the default bounded channel capacity
    /// (`DEFAULT_SUBSCRIPTION_CAPACITY`).
    pub fn subscribe(&self) -> Receiver<CommitEvent> {
        self.subscribe_with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY)
    }
2407
2408    /// Subscribe with a custom channel capacity.
2409    pub fn subscribe_with_capacity(&self, capacity: usize) -> Receiver<CommitEvent> {
2410        let (tx, rx) = mpsc::sync_channel(capacity.max(1));
2411        self.subscriptions.lock().subscribers.push(tx);
2412        rx
2413    }
2414
2415    /// Returns health metrics for the subscription system.
2416    pub fn subscription_health(&self) -> SubscriptionMetrics {
2417        let subscriptions = self.subscriptions.lock();
2418        SubscriptionMetrics {
2419            active_channels: subscriptions.subscribers.len(),
2420            events_sent: subscriptions.events_sent,
2421            events_dropped: subscriptions.events_dropped,
2422        }
2423    }
2424
2425    /// Applies a ChangeSet to this database with the given conflict policies.
2426    pub fn apply_changes(
2427        &self,
2428        mut changes: ChangeSet,
2429        policies: &ConflictPolicies,
2430    ) -> Result<ApplyResult> {
2431        self.plugin.on_sync_pull(&mut changes)?;
2432        self.check_disk_budget("sync_pull")?;
2433        self.preflight_sync_apply_memory(&changes, policies)?;
2434
2435        let mut tx = self.begin();
2436        let mut result = ApplyResult {
2437            applied_rows: 0,
2438            skipped_rows: 0,
2439            conflicts: Vec::new(),
2440            new_lsn: self.current_lsn(),
2441        };
2442        let vector_row_ids = changes.vectors.iter().map(|v| v.row_id).collect::<Vec<_>>();
2443        let mut vector_row_map: HashMap<u64, u64> = HashMap::new();
2444        let mut vector_row_idx = 0usize;
2445        let mut failed_row_ids: HashSet<u64> = HashSet::new();
2446        let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
2447        let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();
2448
2449        for ddl in changes.ddl.clone() {
2450            match ddl {
2451                DdlChange::CreateTable {
2452                    name,
2453                    columns,
2454                    constraints,
2455                } => {
2456                    if self.table_meta(&name).is_some() {
2457                        if let Some(local_meta) = self.table_meta(&name) {
2458                            let local_cols: Vec<(String, String)> = local_meta
2459                                .columns
2460                                .iter()
2461                                .map(|c| {
2462                                    let ty = match c.column_type {
2463                                        ColumnType::Integer => "INTEGER".to_string(),
2464                                        ColumnType::Real => "REAL".to_string(),
2465                                        ColumnType::Text => "TEXT".to_string(),
2466                                        ColumnType::Boolean => "BOOLEAN".to_string(),
2467                                        ColumnType::Json => "JSON".to_string(),
2468                                        ColumnType::Uuid => "UUID".to_string(),
2469                                        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
2470                                        ColumnType::Timestamp => "TIMESTAMP".to_string(),
2471                                    };
2472                                    (c.name.clone(), ty)
2473                                })
2474                                .collect();
2475                            let remote_cols: Vec<(String, String)> = columns
2476                                .iter()
2477                                .map(|(col_name, col_type)| {
2478                                    let base_type = col_type
2479                                        .split_whitespace()
2480                                        .next()
2481                                        .unwrap_or(col_type)
2482                                        .to_string();
2483                                    (col_name.clone(), base_type)
2484                                })
2485                                .collect();
2486                            let mut local_sorted = local_cols.clone();
2487                            local_sorted.sort();
2488                            let mut remote_sorted = remote_cols.clone();
2489                            remote_sorted.sort();
2490                            if local_sorted != remote_sorted {
2491                                result.conflicts.push(Conflict {
2492                                    natural_key: NaturalKey {
2493                                        column: "table".to_string(),
2494                                        value: Value::Text(name.clone()),
2495                                    },
2496                                    resolution: ConflictPolicy::ServerWins,
2497                                    reason: Some(format!(
2498                                        "schema mismatch: local columns {:?} differ from remote {:?}",
2499                                        local_cols, remote_cols
2500                                    )),
2501                                });
2502                            }
2503                        }
2504                        continue;
2505                    }
2506                    let mut sql = format!(
2507                        "CREATE TABLE {} ({})",
2508                        name,
2509                        columns
2510                            .iter()
2511                            .map(|(col, ty)| format!("{col} {ty}"))
2512                            .collect::<Vec<_>>()
2513                            .join(", ")
2514                    );
2515                    if !constraints.is_empty() {
2516                        sql.push(' ');
2517                        sql.push_str(&constraints.join(" "));
2518                    }
2519                    self.execute_in_tx(tx, &sql, &HashMap::new())?;
2520                    table_meta_cache.remove(&name);
2521                    visible_rows_cache.remove(&name);
2522                }
2523                DdlChange::DropTable { name } => {
2524                    if self.table_meta(&name).is_some() {
2525                        self.drop_table_aux_state(&name);
2526                        self.relational_store().drop_table(&name);
2527                        self.remove_persisted_table(&name)?;
2528                    }
2529                    table_meta_cache.remove(&name);
2530                    visible_rows_cache.remove(&name);
2531                }
2532                DdlChange::AlterTable {
2533                    name,
2534                    columns,
2535                    constraints,
2536                } => {
2537                    if self.table_meta(&name).is_none() {
2538                        continue;
2539                    }
2540                    self.relational_store().table_meta.write().remove(&name);
2541                    let mut sql = format!(
2542                        "CREATE TABLE {} ({})",
2543                        name,
2544                        columns
2545                            .iter()
2546                            .map(|(col, ty)| format!("{col} {ty}"))
2547                            .collect::<Vec<_>>()
2548                            .join(", ")
2549                    );
2550                    if !constraints.is_empty() {
2551                        sql.push(' ');
2552                        sql.push_str(&constraints.join(" "));
2553                    }
2554                    self.execute_in_tx(tx, &sql, &HashMap::new())?;
2555                    table_meta_cache.remove(&name);
2556                    visible_rows_cache.remove(&name);
2557                }
2558            }
2559        }
2560
2561        self.preflight_sync_apply_memory(&changes, policies)?;
2562
2563        for row in changes.rows {
2564            if row.values.is_empty() {
2565                result.skipped_rows += 1;
2566                self.commit_with_source(tx, CommitSource::SyncPull)?;
2567                tx = self.begin();
2568                continue;
2569            }
2570
2571            let policy = policies
2572                .per_table
2573                .get(&row.table)
2574                .copied()
2575                .unwrap_or(policies.default);
2576
2577            let existing = cached_point_lookup(
2578                self,
2579                &mut visible_rows_cache,
2580                &row.table,
2581                &row.natural_key.column,
2582                &row.natural_key.value,
2583            )?;
2584            let is_delete = row.deleted;
2585            let row_has_vector = cached_table_meta(self, &mut table_meta_cache, &row.table)
2586                .is_some_and(|meta| {
2587                    meta.columns
2588                        .iter()
2589                        .any(|col| matches!(col.column_type, ColumnType::Vector(_)))
2590                });
2591
2592            if is_delete {
2593                if let Some(local) = existing {
2594                    if row_has_vector
2595                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2596                    {
2597                        vector_row_map.insert(*remote_row_id, local.row_id);
2598                        vector_row_idx += 1;
2599                    }
2600                    if let Err(err) = self.delete_row(tx, &row.table, local.row_id) {
2601                        result.conflicts.push(Conflict {
2602                            natural_key: row.natural_key.clone(),
2603                            resolution: policy,
2604                            reason: Some(format!("delete failed: {err}")),
2605                        });
2606                        result.skipped_rows += 1;
2607                    } else {
2608                        remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
2609                        result.applied_rows += 1;
2610                    }
2611                } else {
2612                    result.skipped_rows += 1;
2613                }
2614                self.commit_with_source(tx, CommitSource::SyncPull)?;
2615                tx = self.begin();
2616                continue;
2617            }
2618
2619            let mut values = row.values.clone();
2620            values.remove("__deleted");
2621
2622            match (existing, policy) {
2623                (None, _) => {
2624                    if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
2625                        let mut constraint_error: Option<String> = None;
2626
2627                        for col_def in &meta.columns {
2628                            if !col_def.nullable
2629                                && !col_def.primary_key
2630                                && col_def.default.is_none()
2631                            {
2632                                match values.get(&col_def.name) {
2633                                    None | Some(Value::Null) => {
2634                                        constraint_error = Some(format!(
2635                                            "NOT NULL constraint violated: {}.{}",
2636                                            row.table, col_def.name
2637                                        ));
2638                                        break;
2639                                    }
2640                                    _ => {}
2641                                }
2642                            }
2643                        }
2644
2645                        let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
2646                        if constraint_error.is_none() && has_unique {
2647                            for col_def in &meta.columns {
2648                                if col_def.unique
2649                                    && !col_def.primary_key
2650                                    && let Some(new_val) = values.get(&col_def.name)
2651                                    && *new_val != Value::Null
2652                                    && cached_visible_rows(
2653                                        self,
2654                                        &mut visible_rows_cache,
2655                                        &row.table,
2656                                    )?
2657                                    .iter()
2658                                    .any(|r| r.values.get(&col_def.name) == Some(new_val))
2659                                {
2660                                    constraint_error = Some(format!(
2661                                        "UNIQUE constraint violated: {}.{}",
2662                                        row.table, col_def.name
2663                                    ));
2664                                    break;
2665                                }
2666                            }
2667                        }
2668
2669                        if let Some(err_msg) = constraint_error {
2670                            result.skipped_rows += 1;
2671                            if row_has_vector
2672                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2673                            {
2674                                failed_row_ids.insert(*remote_row_id);
2675                                vector_row_idx += 1;
2676                            }
2677                            result.conflicts.push(Conflict {
2678                                natural_key: row.natural_key.clone(),
2679                                resolution: policy,
2680                                reason: Some(err_msg),
2681                            });
2682                            self.commit_with_source(tx, CommitSource::SyncPull)?;
2683                            tx = self.begin();
2684                            continue;
2685                        }
2686                    }
2687
2688                    match self.insert_row(tx, &row.table, values.clone()) {
2689                        Ok(new_row_id) => {
2690                            record_cached_insert(
2691                                &mut visible_rows_cache,
2692                                &row.table,
2693                                VersionedRow {
2694                                    row_id: new_row_id,
2695                                    values: values.clone(),
2696                                    created_tx: tx,
2697                                    deleted_tx: None,
2698                                    lsn: row.lsn,
2699                                    created_at: None,
2700                                },
2701                            );
2702                            result.applied_rows += 1;
2703                            if row_has_vector
2704                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2705                            {
2706                                vector_row_map.insert(*remote_row_id, new_row_id);
2707                                vector_row_idx += 1;
2708                            }
2709                        }
2710                        Err(err) => {
2711                            if is_fatal_sync_apply_error(&err) {
2712                                return Err(err);
2713                            }
2714                            result.skipped_rows += 1;
2715                            if row_has_vector
2716                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2717                            {
2718                                failed_row_ids.insert(*remote_row_id);
2719                                vector_row_idx += 1;
2720                            }
2721                            result.conflicts.push(Conflict {
2722                                natural_key: row.natural_key.clone(),
2723                                resolution: policy,
2724                                reason: Some(format!("{err}")),
2725                            });
2726                        }
2727                    }
2728                }
2729                (Some(local), ConflictPolicy::InsertIfNotExists) => {
2730                    if row_has_vector
2731                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2732                    {
2733                        vector_row_map.insert(*remote_row_id, local.row_id);
2734                        vector_row_idx += 1;
2735                    }
2736                    result.skipped_rows += 1;
2737                }
2738                (Some(_), ConflictPolicy::ServerWins) => {
2739                    result.skipped_rows += 1;
2740                    if row_has_vector
2741                        && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2742                    {
2743                        failed_row_ids.insert(*remote_row_id);
2744                        vector_row_idx += 1;
2745                    }
2746                    result.conflicts.push(Conflict {
2747                        natural_key: row.natural_key.clone(),
2748                        resolution: ConflictPolicy::ServerWins,
2749                        reason: Some("server_wins".to_string()),
2750                    });
2751                }
2752                (Some(local), ConflictPolicy::LatestWins) => {
2753                    if row.lsn <= local.lsn {
2754                        result.skipped_rows += 1;
2755                        if row_has_vector
2756                            && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2757                        {
2758                            failed_row_ids.insert(*remote_row_id);
2759                            vector_row_idx += 1;
2760                        }
2761                        result.conflicts.push(Conflict {
2762                            natural_key: row.natural_key.clone(),
2763                            resolution: ConflictPolicy::LatestWins,
2764                            reason: Some("local_lsn_newer_or_equal".to_string()),
2765                        });
2766                    } else {
2767                        // State machine conflict detection
2768                        if let Some(meta) = self.table_meta(&row.table)
2769                            && let Some(sm) = &meta.state_machine
2770                        {
2771                            let sm_col = sm.column.clone();
2772                            let transitions = sm.transitions.clone();
2773                            let incoming_state = values.get(&sm_col).and_then(|v| match v {
2774                                Value::Text(s) => Some(s.clone()),
2775                                _ => None,
2776                            });
2777                            let local_state = local.values.get(&sm_col).and_then(|v| match v {
2778                                Value::Text(s) => Some(s.clone()),
2779                                _ => None,
2780                            });
2781
2782                            if let (Some(incoming), Some(current)) = (incoming_state, local_state) {
2783                                // Check if the transition from current to incoming is valid
2784                                let valid = transitions
2785                                    .get(&current)
2786                                    .is_some_and(|targets| targets.contains(&incoming));
2787                                if !valid && incoming != current {
2788                                    result.skipped_rows += 1;
2789                                    if row_has_vector
2790                                        && let Some(remote_row_id) =
2791                                            vector_row_ids.get(vector_row_idx)
2792                                    {
2793                                        failed_row_ids.insert(*remote_row_id);
2794                                        vector_row_idx += 1;
2795                                    }
2796                                    result.conflicts.push(Conflict {
2797                                        natural_key: row.natural_key.clone(),
2798                                        resolution: ConflictPolicy::LatestWins,
2799                                        reason: Some(format!(
2800                                            "state_machine: invalid transition {} -> {} (current: {})",
2801                                            current, incoming, current
2802                                        )),
2803                                    });
2804                                    self.commit_with_source(tx, CommitSource::SyncPull)?;
2805                                    tx = self.begin();
2806                                    continue;
2807                                }
2808                            }
2809                        }
2810
2811                        match self.upsert_row(
2812                            tx,
2813                            &row.table,
2814                            &row.natural_key.column,
2815                            values.clone(),
2816                        ) {
2817                            Ok(_) => {
2818                                visible_rows_cache.remove(&row.table);
2819                                result.applied_rows += 1;
2820                                if row_has_vector
2821                                    && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2822                                {
2823                                    if let Ok(Some(found)) = self.point_lookup_in_tx(
2824                                        tx,
2825                                        &row.table,
2826                                        &row.natural_key.column,
2827                                        &row.natural_key.value,
2828                                        self.snapshot(),
2829                                    ) {
2830                                        vector_row_map.insert(*remote_row_id, found.row_id);
2831                                    }
2832                                    vector_row_idx += 1;
2833                                }
2834                            }
2835                            Err(err) => {
2836                                if is_fatal_sync_apply_error(&err) {
2837                                    return Err(err);
2838                                }
2839                                result.skipped_rows += 1;
2840                                if row_has_vector
2841                                    && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2842                                {
2843                                    failed_row_ids.insert(*remote_row_id);
2844                                    vector_row_idx += 1;
2845                                }
2846                                result.conflicts.push(Conflict {
2847                                    natural_key: row.natural_key.clone(),
2848                                    resolution: ConflictPolicy::LatestWins,
2849                                    reason: Some(format!("state_machine_or_constraint: {err}")),
2850                                });
2851                            }
2852                        }
2853                    }
2854                }
2855                (Some(_), ConflictPolicy::EdgeWins) => {
2856                    result.conflicts.push(Conflict {
2857                        natural_key: row.natural_key.clone(),
2858                        resolution: ConflictPolicy::EdgeWins,
2859                        reason: Some("edge_wins".to_string()),
2860                    });
2861                    match self.upsert_row(tx, &row.table, &row.natural_key.column, values.clone()) {
2862                        Ok(_) => {
2863                            visible_rows_cache.remove(&row.table);
2864                            result.applied_rows += 1;
2865                            if row_has_vector
2866                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2867                            {
2868                                if let Ok(Some(found)) = self.point_lookup_in_tx(
2869                                    tx,
2870                                    &row.table,
2871                                    &row.natural_key.column,
2872                                    &row.natural_key.value,
2873                                    self.snapshot(),
2874                                ) {
2875                                    vector_row_map.insert(*remote_row_id, found.row_id);
2876                                }
2877                                vector_row_idx += 1;
2878                            }
2879                        }
2880                        Err(err) => {
2881                            if is_fatal_sync_apply_error(&err) {
2882                                return Err(err);
2883                            }
2884                            result.skipped_rows += 1;
2885                            if row_has_vector
2886                                && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2887                            {
2888                                failed_row_ids.insert(*remote_row_id);
2889                                vector_row_idx += 1;
2890                            }
2891                            if let Some(last) = result.conflicts.last_mut() {
2892                                last.reason = Some(format!("state_machine_or_constraint: {err}"));
2893                            }
2894                        }
2895                    }
2896                }
2897            }
2898
2899            self.commit_with_source(tx, CommitSource::SyncPull)?;
2900            tx = self.begin();
2901        }
2902
2903        for edge in changes.edges {
2904            let is_delete = matches!(edge.properties.get("__deleted"), Some(Value::Bool(true)));
2905            if is_delete {
2906                let _ = self.delete_edge(tx, edge.source, edge.target, &edge.edge_type);
2907            } else {
2908                let _ = self.insert_edge(
2909                    tx,
2910                    edge.source,
2911                    edge.target,
2912                    edge.edge_type,
2913                    edge.properties,
2914                );
2915            }
2916        }
2917
2918        for vector in changes.vectors {
2919            if failed_row_ids.contains(&vector.row_id) {
2920                continue; // skip vectors for rows that failed to insert
2921            }
2922            let local_row_id = vector_row_map
2923                .get(&vector.row_id)
2924                .copied()
2925                .unwrap_or(vector.row_id);
2926            if vector.vector.is_empty() {
2927                let _ = self.vector.delete_vector(tx, local_row_id);
2928            } else {
2929                if self.has_live_vector(local_row_id, self.snapshot()) {
2930                    let _ = self.delete_vector(tx, local_row_id);
2931                }
2932                if let Err(err) = self.insert_vector(tx, local_row_id, vector.vector)
2933                    && is_fatal_sync_apply_error(&err)
2934                {
2935                    return Err(err);
2936                }
2937            }
2938        }
2939
2940        self.commit_with_source(tx, CommitSource::SyncPull)?;
2941        result.new_lsn = self.current_lsn();
2942        Ok(result)
2943    }
2944
2945    fn row_change_values(
2946        &self,
2947        table: &str,
2948        row_id: RowId,
2949    ) -> Option<(NaturalKey, HashMap<String, Value>)> {
2950        let tables = self.relational_store.tables.read();
2951        let meta = self.relational_store.table_meta.read();
2952        let rows = tables.get(table)?;
2953        let row = rows.iter().find(|r| r.row_id == row_id)?;
2954        let key_col = meta
2955            .get(table)
2956            .and_then(|m| m.natural_key_column.clone())
2957            .or_else(|| {
2958                meta.get(table).and_then(|m| {
2959                    m.columns
2960                        .iter()
2961                        .find(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
2962                        .map(|_| "id".to_string())
2963                })
2964            })?;
2965
2966        let key_val = row.values.get(&key_col)?.clone();
2967        let values = row
2968            .values
2969            .iter()
2970            .map(|(k, v)| (k.clone(), v.clone()))
2971            .collect::<HashMap<_, _>>();
2972        Some((
2973            NaturalKey {
2974                column: key_col,
2975                value: key_val,
2976            },
2977            values,
2978        ))
2979    }
2980
2981    fn edge_properties(
2982        &self,
2983        source: NodeId,
2984        target: NodeId,
2985        edge_type: &str,
2986        lsn: u64,
2987    ) -> Option<HashMap<String, Value>> {
2988        self.graph_store
2989            .forward_adj
2990            .read()
2991            .get(&source)
2992            .and_then(|entries| {
2993                entries
2994                    .iter()
2995                    .find(|e| e.target == target && e.edge_type == edge_type && e.lsn == lsn)
2996                    .map(|e| e.properties.clone())
2997            })
2998    }
2999
3000    fn vector_for_row_lsn(&self, row_id: RowId, lsn: u64) -> Option<Vec<f32>> {
3001        self.vector_store
3002            .vectors
3003            .read()
3004            .iter()
3005            .find(|v| v.row_id == row_id && v.lsn == lsn)
3006            .map(|v| v.vector.clone())
3007    }
3008}
3009
3010fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
3011    if let Some(pos) = qr.columns.iter().position(|c| c == "row_id") {
3012        qr.columns.remove(pos);
3013        for row in &mut qr.rows {
3014            if pos < row.len() {
3015                row.remove(pos);
3016            }
3017        }
3018    }
3019    qr
3020}
3021
3022fn cached_table_meta(
3023    db: &Database,
3024    cache: &mut HashMap<String, Option<TableMeta>>,
3025    table: &str,
3026) -> Option<TableMeta> {
3027    cache
3028        .entry(table.to_string())
3029        .or_insert_with(|| db.table_meta(table))
3030        .clone()
3031}
3032
3033fn cached_visible_rows<'a>(
3034    db: &Database,
3035    cache: &'a mut HashMap<String, Vec<VersionedRow>>,
3036    table: &str,
3037) -> Result<&'a mut Vec<VersionedRow>> {
3038    if !cache.contains_key(table) {
3039        let rows = db.scan(table, db.snapshot())?;
3040        cache.insert(table.to_string(), rows);
3041    }
3042    Ok(cache.get_mut(table).expect("cached visible rows"))
3043}
3044
3045fn cached_point_lookup(
3046    db: &Database,
3047    cache: &mut HashMap<String, Vec<VersionedRow>>,
3048    table: &str,
3049    col: &str,
3050    value: &Value,
3051) -> Result<Option<VersionedRow>> {
3052    let rows = cached_visible_rows(db, cache, table)?;
3053    Ok(rows
3054        .iter()
3055        .find(|r| r.values.get(col) == Some(value))
3056        .cloned())
3057}
3058
3059fn record_cached_insert(
3060    cache: &mut HashMap<String, Vec<VersionedRow>>,
3061    table: &str,
3062    row: VersionedRow,
3063) {
3064    if let Some(rows) = cache.get_mut(table) {
3065        rows.push(row);
3066    }
3067}
3068
3069fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
3070    if let Some(rows) = cache.get_mut(table) {
3071        rows.retain(|row| row.row_id != row_id);
3072    }
3073}
3074
3075fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
3076    match result {
3077        Ok(query_result) => QueryOutcome::Success {
3078            row_count: if query_result.rows.is_empty() {
3079                query_result.rows_affected as usize
3080            } else {
3081                query_result.rows.len()
3082            },
3083        },
3084        Err(error) => QueryOutcome::Error {
3085            error: error.to_string(),
3086        },
3087    }
3088}
3089
/// Eagerly builds the HNSW index once a vector store reaches 1000 vectors,
/// charging the estimated index footprint to the memory accountant first.
///
/// If the memory budget rejects the reservation the build is skipped and
/// the `hnsw` slot is left untouched, so a later run with a larger limit
/// can still build. If the build itself panics, the reservation is released
/// and `None` is recorded in the slot.
fn maybe_prebuild_hnsw(vector_store: &VectorStore, accountant: &MemoryAccountant) {
    if vector_store.vector_count() >= 1000 {
        let entries = vector_store.all_entries();
        // NOTE(review): dim falls back to 0 when the store has no recorded
        // dimension — presumably unreachable once vector_count >= 1000.
        let dim = vector_store.dimension().unwrap_or(0);
        let estimated_bytes = estimate_hnsw_index_bytes(entries.len(), dim);
        // Reserve memory up front; bail out before building when the limit
        // would be exceeded.
        if accountant
            .try_allocate_for(
                estimated_bytes,
                "vector_index",
                "prebuild_hnsw",
                "Open the database with a larger MEMORY_LIMIT to enable HNSW indexing.",
            )
            .is_err()
        {
            return;
        }

        // Treat a panic inside HnswIndex::new as a failed build instead of
        // letting it unwind through the caller.
        let hnsw_opt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            HnswIndex::new(&entries, dim)
        }))
        .ok();
        if hnsw_opt.is_some() {
            // Build succeeded: the reservation is now owned by the store.
            vector_store.set_hnsw_bytes(estimated_bytes);
        } else {
            // Build failed: hand the reservation back to the accountant.
            accountant.release(estimated_bytes);
        }
        // OnceLock::set fails if another caller initialized the slot first;
        // in that case this result is simply dropped.
        let _ = vector_store.hnsw.set(RwLock::new(hnsw_opt));
    }
}
3119
3120fn estimate_row_bytes_for_meta(
3121    values: &HashMap<ColName, Value>,
3122    meta: &TableMeta,
3123    include_vectors: bool,
3124) -> usize {
3125    let mut bytes = 96usize;
3126    for column in &meta.columns {
3127        let Some(value) = values.get(&column.name) else {
3128            continue;
3129        };
3130        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
3131            continue;
3132        }
3133        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
3134    }
3135    bytes
3136}
3137
/// Estimated heap footprint of one stored vector: a small fixed header
/// plus the payload size (one `f32` per component).
fn estimate_vector_bytes(vector: &[f32]) -> usize {
    const HEADER_BYTES: usize = 24;
    HEADER_BYTES + vector.len().saturating_mul(std::mem::size_of::<f32>())
}
3141
3142fn estimate_edge_bytes(
3143    source: NodeId,
3144    target: NodeId,
3145    edge_type: &str,
3146    properties: &HashMap<String, Value>,
3147) -> usize {
3148    AdjEntry {
3149        source,
3150        target,
3151        edge_type: edge_type.to_string(),
3152        properties: properties.clone(),
3153        created_tx: 0,
3154        deleted_tx: None,
3155        lsn: 0,
3156    }
3157    .estimated_bytes()
3158}
3159
/// Rough estimate of HNSW index memory: the raw vector data
/// (`entry_count * dimension * 4` bytes) times a 3x factor covering graph
/// links and per-node overhead. All arithmetic saturates.
fn estimate_hnsw_index_bytes(entry_count: usize, dimension: usize) -> usize {
    let components = entry_count.saturating_mul(dimension);
    let raw_bytes = components.saturating_mul(std::mem::size_of::<f32>());
    raw_bytes.saturating_mul(3)
}
3166
impl Drop for Database {
    /// Idempotent teardown: stop the background pruning thread, drop all
    /// registered subscribers, and close persistence.
    fn drop(&mut self) {
        // `swap` returns the previous flag; a prior `true` means another
        // path already performed (or is performing) the shutdown.
        if self.closed.swap(true, Ordering::SeqCst) {
            return;
        }
        // Signal the pruning thread and wait for it to exit before tearing
        // down the state it reads.
        let runtime = self.pruning_runtime.get_mut();
        runtime.shutdown.store(true, Ordering::SeqCst);
        if let Some(handle) = runtime.handle.take() {
            let _ = handle.join();
        }
        // Drop every subscriber so their channels are disconnected.
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
    }
}
3183
/// Sleeps for up to `interval`, waking early when `shutdown` becomes true.
/// The wait is chopped into slices of at most 50ms so a shutdown request
/// is observed promptly.
fn sleep_with_shutdown(shutdown: &AtomicBool, interval: Duration) {
    const SLICE: Duration = Duration::from_millis(50);
    let deadline = Instant::now() + interval;
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return;
        }
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return;
        }
        thread::sleep(remaining.min(SLICE));
    }
}
3195
3196fn prune_expired_rows(
3197    relational_store: &Arc<RelationalStore>,
3198    graph_store: &Arc<GraphStore>,
3199    vector_store: &Arc<VectorStore>,
3200    accountant: &MemoryAccountant,
3201    persistence: Option<&Arc<RedbPersistence>>,
3202    sync_watermark: u64,
3203) -> u64 {
3204    let now_millis = current_epoch_millis();
3205    let metas = relational_store.table_meta.read().clone();
3206    let mut pruned_by_table: HashMap<String, Vec<RowId>> = HashMap::new();
3207    let mut pruned_node_ids = HashSet::new();
3208    let mut released_row_bytes = 0usize;
3209
3210    {
3211        let mut tables = relational_store.tables.write();
3212        for (table_name, rows) in tables.iter_mut() {
3213            let Some(meta) = metas.get(table_name) else {
3214                continue;
3215            };
3216            if meta.default_ttl_seconds.is_none() {
3217                continue;
3218            }
3219
3220            rows.retain(|row| {
3221                if !row_is_prunable(row, meta, now_millis, sync_watermark) {
3222                    return true;
3223                }
3224
3225                pruned_by_table
3226                    .entry(table_name.clone())
3227                    .or_default()
3228                    .push(row.row_id);
3229                released_row_bytes = released_row_bytes
3230                    .saturating_add(estimate_row_bytes_for_meta(&row.values, meta, false));
3231                if let Some(Value::Uuid(id)) = row.values.get("id") {
3232                    pruned_node_ids.insert(*id);
3233                }
3234                false
3235            });
3236        }
3237    }
3238
3239    let pruned_row_ids: HashSet<RowId> = pruned_by_table
3240        .values()
3241        .flat_map(|rows| rows.iter().copied())
3242        .collect();
3243    if pruned_row_ids.is_empty() {
3244        return 0;
3245    }
3246
3247    let mut released_vector_bytes = 0usize;
3248    {
3249        let mut vectors = vector_store.vectors.write();
3250        vectors.retain(|entry| {
3251            if pruned_row_ids.contains(&entry.row_id) {
3252                released_vector_bytes =
3253                    released_vector_bytes.saturating_add(entry.estimated_bytes());
3254                false
3255            } else {
3256                true
3257            }
3258        });
3259    }
3260    if let Some(hnsw) = vector_store.hnsw.get() {
3261        *hnsw.write() = None;
3262    }
3263
3264    let mut released_edge_bytes = 0usize;
3265    {
3266        let mut forward = graph_store.forward_adj.write();
3267        for entries in forward.values_mut() {
3268            entries.retain(|entry| {
3269                if pruned_node_ids.contains(&entry.source)
3270                    || pruned_node_ids.contains(&entry.target)
3271                {
3272                    released_edge_bytes =
3273                        released_edge_bytes.saturating_add(entry.estimated_bytes());
3274                    false
3275                } else {
3276                    true
3277                }
3278            });
3279        }
3280        forward.retain(|_, entries| !entries.is_empty());
3281    }
3282    {
3283        let mut reverse = graph_store.reverse_adj.write();
3284        for entries in reverse.values_mut() {
3285            entries.retain(|entry| {
3286                !pruned_node_ids.contains(&entry.source) && !pruned_node_ids.contains(&entry.target)
3287            });
3288        }
3289        reverse.retain(|_, entries| !entries.is_empty());
3290    }
3291
3292    if let Some(persistence) = persistence {
3293        for table_name in pruned_by_table.keys() {
3294            let rows = relational_store
3295                .tables
3296                .read()
3297                .get(table_name)
3298                .cloned()
3299                .unwrap_or_default();
3300            let _ = persistence.rewrite_table_rows(table_name, &rows);
3301        }
3302
3303        let vectors = vector_store.vectors.read().clone();
3304        let edges = graph_store
3305            .forward_adj
3306            .read()
3307            .values()
3308            .flat_map(|entries| entries.iter().cloned())
3309            .collect::<Vec<_>>();
3310        let _ = persistence.rewrite_vectors(&vectors);
3311        let _ = persistence.rewrite_graph_edges(&edges);
3312    }
3313
3314    accountant.release(
3315        released_row_bytes
3316            .saturating_add(released_vector_bytes)
3317            .saturating_add(released_edge_bytes),
3318    );
3319
3320    pruned_row_ids.len() as u64
3321}
3322
3323fn row_is_prunable(
3324    row: &VersionedRow,
3325    meta: &TableMeta,
3326    now_millis: u64,
3327    sync_watermark: u64,
3328) -> bool {
3329    if meta.sync_safe && row.lsn >= sync_watermark {
3330        return false;
3331    }
3332
3333    let Some(default_ttl_seconds) = meta.default_ttl_seconds else {
3334        return false;
3335    };
3336
3337    if let Some(expires_column) = &meta.expires_column {
3338        match row.values.get(expires_column) {
3339            Some(Value::Timestamp(millis)) if *millis == i64::MAX => return false,
3340            Some(Value::Timestamp(millis)) if *millis < 0 => return true,
3341            Some(Value::Timestamp(millis)) => return (*millis as u64) <= now_millis,
3342            Some(Value::Null) | None => {}
3343            Some(_) => {}
3344        }
3345    }
3346
3347    let ttl_millis = default_ttl_seconds.saturating_mul(1000);
3348    row.created_at
3349        .map(|created_at| now_millis.saturating_sub(created_at) > ttl_millis)
3350        .unwrap_or(false)
3351}
3352
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields 0 instead of panicking.
fn current_epoch_millis() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
3359
3360fn max_tx_across_all(
3361    relational: &RelationalStore,
3362    graph: &GraphStore,
3363    vector: &VectorStore,
3364) -> TxId {
3365    let relational_max = relational
3366        .tables
3367        .read()
3368        .values()
3369        .flat_map(|rows| rows.iter())
3370        .flat_map(|row| std::iter::once(row.created_tx).chain(row.deleted_tx))
3371        .max()
3372        .unwrap_or(0);
3373    let graph_max = graph
3374        .forward_adj
3375        .read()
3376        .values()
3377        .flat_map(|entries| entries.iter())
3378        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
3379        .max()
3380        .unwrap_or(0);
3381    let vector_max = vector
3382        .vectors
3383        .read()
3384        .iter()
3385        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
3386        .max()
3387        .unwrap_or(0);
3388
3389    relational_max.max(graph_max).max(vector_max)
3390}
3391
3392fn max_lsn_across_all(
3393    relational: &RelationalStore,
3394    graph: &GraphStore,
3395    vector: &VectorStore,
3396) -> u64 {
3397    let relational_max = relational
3398        .tables
3399        .read()
3400        .values()
3401        .flat_map(|rows| rows.iter().map(|row| row.lsn))
3402        .max()
3403        .unwrap_or(0);
3404    let graph_max = graph
3405        .forward_adj
3406        .read()
3407        .values()
3408        .flat_map(|entries| entries.iter().map(|entry| entry.lsn))
3409        .max()
3410        .unwrap_or(0);
3411    let vector_max = vector
3412        .vectors
3413        .read()
3414        .iter()
3415        .map(|entry| entry.lsn)
3416        .max()
3417        .unwrap_or(0);
3418
3419    relational_max.max(graph_max).max(vector_max)
3420}
3421
3422fn persisted_memory_limit(path: &Path) -> Result<Option<usize>> {
3423    if !path.exists() {
3424        return Ok(None);
3425    }
3426    let persistence = RedbPersistence::open(path)?;
3427    let limit = persistence.load_config_value::<usize>("memory_limit")?;
3428    persistence.close();
3429    Ok(limit)
3430}
3431
3432fn is_fatal_sync_apply_error(err: &Error) -> bool {
3433    matches!(
3434        err,
3435        Error::MemoryBudgetExceeded { .. } | Error::DiskBudgetExceeded { .. }
3436    )
3437}
3438
3439fn ddl_change_from_create_table(ct: &CreateTable) -> DdlChange {
3440    DdlChange::CreateTable {
3441        name: ct.name.clone(),
3442        columns: ct
3443            .columns
3444            .iter()
3445            .map(|col| {
3446                (
3447                    col.name.clone(),
3448                    sql_type_for_ast_column(col, &ct.propagation_rules),
3449                )
3450            })
3451            .collect(),
3452        constraints: create_table_constraints_from_ast(ct),
3453    }
3454}
3455
3456fn ddl_change_from_meta(name: &str, meta: &TableMeta) -> DdlChange {
3457    DdlChange::CreateTable {
3458        name: name.to_string(),
3459        columns: meta
3460            .columns
3461            .iter()
3462            .map(|col| {
3463                (
3464                    col.name.clone(),
3465                    sql_type_for_meta_column(col, &meta.propagation_rules),
3466                )
3467            })
3468            .collect(),
3469        constraints: create_table_constraints_from_meta(meta),
3470    }
3471}
3472
3473fn sql_type_for_ast(data_type: &DataType) -> String {
3474    match data_type {
3475        DataType::Uuid => "UUID".to_string(),
3476        DataType::Text => "TEXT".to_string(),
3477        DataType::Integer => "INTEGER".to_string(),
3478        DataType::Real => "REAL".to_string(),
3479        DataType::Boolean => "BOOLEAN".to_string(),
3480        DataType::Timestamp => "TIMESTAMP".to_string(),
3481        DataType::Json => "JSON".to_string(),
3482        DataType::Vector(dim) => format!("VECTOR({dim})"),
3483    }
3484}
3485
3486fn sql_type_for_ast_column(
3487    col: &contextdb_parser::ast::ColumnDef,
3488    _rules: &[contextdb_parser::ast::AstPropagationRule],
3489) -> String {
3490    let mut ty = sql_type_for_ast(&col.data_type);
3491    if let Some(reference) = &col.references {
3492        ty.push_str(&format!(
3493            " REFERENCES {}({})",
3494            reference.table, reference.column
3495        ));
3496        for rule in &reference.propagation_rules {
3497            if let contextdb_parser::ast::AstPropagationRule::FkState {
3498                trigger_state,
3499                target_state,
3500                max_depth,
3501                abort_on_failure,
3502            } = rule
3503            {
3504                ty.push_str(&format!(
3505                    " ON STATE {} PROPAGATE SET {}",
3506                    trigger_state, target_state
3507                ));
3508                if max_depth.unwrap_or(10) != 10 {
3509                    ty.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
3510                }
3511                if *abort_on_failure {
3512                    ty.push_str(" ABORT ON FAILURE");
3513                }
3514            }
3515        }
3516    }
3517    if col.primary_key {
3518        ty.push_str(" PRIMARY KEY");
3519    }
3520    if !col.nullable && !col.primary_key {
3521        ty.push_str(" NOT NULL");
3522    }
3523    if col.unique {
3524        ty.push_str(" UNIQUE");
3525    }
3526    ty
3527}
3528
/// Renders a stored column definition back into the SQL type string used in
/// DDL sync records: base type, then REFERENCES with any FK state-propagation
/// clauses, then PRIMARY KEY / NOT NULL / UNIQUE / EXPIRES suffixes.
fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[PropagationRule]) -> String {
    // Base SQL type name; Vector is the only parameterized type.
    let mut ty = match col.column_type {
        ColumnType::Integer => "INTEGER".to_string(),
        ColumnType::Real => "REAL".to_string(),
        ColumnType::Text => "TEXT".to_string(),
        ColumnType::Boolean => "BOOLEAN".to_string(),
        ColumnType::Json => "JSON".to_string(),
        ColumnType::Uuid => "UUID".to_string(),
        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
        ColumnType::Timestamp => "TIMESTAMP".to_string(),
    };

    // Collect every foreign-key propagation rule attached to this column.
    let fk_rules = rules
        .iter()
        .filter_map(|rule| match rule {
            PropagationRule::ForeignKey {
                fk_column,
                referenced_table,
                referenced_column,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } if fk_column == &col.name => Some((
                referenced_table,
                referenced_column,
                trigger_state,
                target_state,
                *max_depth,
                *abort_on_failure,
            )),
            _ => None,
        })
        .collect::<Vec<_>>();

    // The REFERENCES target is taken from the first FK rule only, while an
    // ON STATE propagation clause is emitted for every matching rule.
    if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            referenced_table, referenced_column
        ));
        for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
            ty.push_str(&format!(
                " ON STATE {} PROPAGATE SET {}",
                trigger_state, target_state
            ));
            // 10 is the default max depth, so it is omitted from the SQL.
            if max_depth != 10 {
                ty.push_str(&format!(" MAX DEPTH {max_depth}"));
            }
            if abort_on_failure {
                ty.push_str(" ABORT ON FAILURE");
            }
        }
    }
    if col.primary_key {
        ty.push_str(" PRIMARY KEY");
    }
    // NOT NULL is only spelled out for non-primary-key columns.
    if !col.nullable && !col.primary_key {
        ty.push_str(" NOT NULL");
    }
    if col.unique {
        ty.push_str(" UNIQUE");
    }
    if col.expires {
        ty.push_str(" EXPIRES");
    }

    ty
}
3597
3598fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
3599    let mut constraints = Vec::new();
3600
3601    if ct.immutable {
3602        constraints.push("IMMUTABLE".to_string());
3603    }
3604
3605    if let Some(sm) = &ct.state_machine {
3606        let transitions = sm
3607            .transitions
3608            .iter()
3609            .map(|(from, tos)| format!("{from} -> [{}]", tos.join(", ")))
3610            .collect::<Vec<_>>()
3611            .join(", ");
3612        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
3613    }
3614
3615    if !ct.dag_edge_types.is_empty() {
3616        let edge_types = ct
3617            .dag_edge_types
3618            .iter()
3619            .map(|edge_type| format!("'{edge_type}'"))
3620            .collect::<Vec<_>>()
3621            .join(", ");
3622        constraints.push(format!("DAG({edge_types})"));
3623    }
3624
3625    if let Some(retain) = &ct.retain {
3626        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(retain.duration_seconds));
3627        if retain.sync_safe {
3628            clause.push_str(" SYNC SAFE");
3629        }
3630        constraints.push(clause);
3631    }
3632
3633    for rule in &ct.propagation_rules {
3634        match rule {
3635            contextdb_parser::ast::AstPropagationRule::EdgeState {
3636                edge_type,
3637                direction,
3638                trigger_state,
3639                target_state,
3640                max_depth,
3641                abort_on_failure,
3642            } => {
3643                let mut clause = format!(
3644                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
3645                    edge_type, direction, trigger_state, target_state
3646                );
3647                if max_depth.unwrap_or(10) != 10 {
3648                    clause.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
3649                }
3650                if *abort_on_failure {
3651                    clause.push_str(" ABORT ON FAILURE");
3652                }
3653                constraints.push(clause);
3654            }
3655            contextdb_parser::ast::AstPropagationRule::VectorExclusion { trigger_state } => {
3656                constraints.push(format!(
3657                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
3658                    trigger_state
3659                ));
3660            }
3661            contextdb_parser::ast::AstPropagationRule::FkState { .. } => {}
3662        }
3663    }
3664
3665    constraints
3666}
3667
/// Serializes table-level options from stored metadata into the constraint
/// clause strings recorded in DDL sync entries.
fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
    let mut constraints = Vec::new();

    if meta.immutable {
        constraints.push("IMMUTABLE".to_string());
    }

    // State machine: "from -> [to1, to2]" per transition, comma-joined.
    if let Some(sm) = &meta.state_machine {
        let states = sm
            .transitions
            .iter()
            .map(|(from, to)| format!("{from} -> [{}]", to.join(", ")))
            .collect::<Vec<_>>()
            .join(", ");
        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, states));
    }

    // DAG edge-type list, each single-quoted and comma-joined.
    if !meta.dag_edge_types.is_empty() {
        let edge_types = meta
            .dag_edge_types
            .iter()
            .map(|edge_type| format!("'{edge_type}'"))
            .collect::<Vec<_>>()
            .join(", ");
        constraints.push(format!("DAG({edge_types})"));
    }

    // TTL retention clause, optionally marked sync-safe.
    if let Some(ttl_seconds) = meta.default_ttl_seconds {
        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(ttl_seconds));
        if meta.sync_safe {
            clause.push_str(" SYNC SAFE");
        }
        constraints.push(clause);
    }

    for rule in &meta.propagation_rules {
        match rule {
            PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } => {
                let dir = match direction {
                    Direction::Incoming => "INCOMING",
                    Direction::Outgoing => "OUTGOING",
                    Direction::Both => "BOTH",
                };
                let mut clause = format!(
                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
                    edge_type, dir, trigger_state, target_state
                );
                // 10 is the default max depth, so it is omitted from the SQL.
                if *max_depth != 10 {
                    clause.push_str(&format!(" MAX DEPTH {max_depth}"));
                }
                if *abort_on_failure {
                    clause.push_str(" ABORT ON FAILURE");
                }
                constraints.push(clause);
            }
            PropagationRule::VectorExclusion { trigger_state } => {
                constraints.push(format!(
                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
                    trigger_state
                ));
            }
            // FK propagation is rendered inline with the column definition.
            PropagationRule::ForeignKey { .. } => {}
        }
    }

    constraints
}
3742
/// Renders a TTL in seconds as a SQL retention clause, using the coarsest
/// unit that divides it evenly: DAYS, then HOURS, then MINUTES, falling
/// back to SECONDS.
fn ttl_seconds_to_sql(seconds: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    if seconds % DAY == 0 {
        return format!("{} DAYS", seconds / DAY);
    }
    if seconds % HOUR == 0 {
        return format!("{} HOURS", seconds / HOUR);
    }
    if seconds % MINUTE == 0 {
        return format!("{} MINUTES", seconds / MINUTE);
    }
    format!("{seconds} SECONDS")
}