1use crate::composite_store::{ChangeLogEntry, CompositeStore};
2use crate::executor::execute_plan;
3use crate::persistence::RedbPersistence;
4use crate::persistent_store::PersistentCompositeStore;
5use crate::plugin::{
6 CommitEvent, CommitSource, CorePlugin, DatabasePlugin, PluginHealth, QueryOutcome,
7 SubscriptionMetrics,
8};
9use crate::schema_enforcer::validate_dml;
10use crate::sync_types::{
11 ApplyResult, ChangeSet, Conflict, ConflictPolicies, ConflictPolicy, DdlChange, EdgeChange,
12 NaturalKey, RowChange, VectorChange,
13};
14use contextdb_core::*;
15use contextdb_graph::{GraphStore, MemGraphExecutor};
16use contextdb_parser::Statement;
17use contextdb_parser::ast::{AlterAction, CreateTable, DataType};
18use contextdb_planner::PhysicalPlan;
19use contextdb_relational::{MemRelationalExecutor, RelationalStore};
20use contextdb_tx::{TxManager, WriteSetApplicator};
21use contextdb_vector::{HnswIndex, MemVectorExecutor, VectorStore};
22use parking_lot::{Mutex, RwLock};
23use roaring::RoaringTreemap;
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::path::Path;
26use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
27use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
28use std::sync::{Arc, OnceLock};
29use std::thread::{self, JoinHandle};
30use std::time::{Duration, Instant};
31
// Erased store handed to the transaction manager; all sub-stores are combined
// behind the `WriteSetApplicator` trait object.
type DynStore = Box<dyn WriteSetApplicator>;
// Bounded channel size for commit-event subscribers — presumably used where
// subscriptions are created (not visible in this chunk); confirm at call site.
const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 64;
// Hard cap on the prepared-statement cache; inserts are refused beyond this.
const MAX_STATEMENT_CACHE_ENTRIES: usize = 1024;
35
/// An index the planner considered but did not use, together with the reason
/// it was rejected (surfaced in [`QueryTrace::indexes_considered`]).
#[derive(Debug, Clone)]
pub struct IndexCandidate {
    pub name: String,
    pub rejected_reason: std::borrow::Cow<'static, str>,
}
41
/// Diagnostic record of how a query was executed: chosen physical plan,
/// index usage, pushed-down predicates, and whether an explicit sort was
/// avoided.
#[derive(Debug, Clone, Default)]
pub struct QueryTrace {
    // Static label of the physical plan node (e.g. "Scan").
    pub physical_plan: &'static str,
    // Name of the index actually used, if any.
    pub index_used: Option<String>,
    // Predicates evaluated inside the scan rather than in a filter node.
    pub predicates_pushed: smallvec::SmallVec<[std::borrow::Cow<'static, str>; 4]>,
    // Indexes that were evaluated and rejected, with reasons.
    pub indexes_considered: smallvec::SmallVec<[IndexCandidate; 4]>,
    // True when an ORDER BY was satisfied by index order, skipping a sort.
    pub sort_elided: bool,
}
50
51impl QueryTrace {
52 pub fn scan() -> Self {
56 Self {
57 physical_plan: "Scan",
58 ..Default::default()
59 }
60 }
61}
62
/// Side effects of a cascading DDL operation — currently the names of
/// indexes that were dropped along with their table or column.
#[derive(Debug, Clone)]
pub struct CascadeReport {
    pub dropped_indexes: Vec<String>,
}
67
/// Outcome of executing a statement: result rows (for queries), the number
/// of rows affected (for DML), an execution trace, and any cascade report
/// produced by DDL.
#[derive(Debug, Clone)]
pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<Value>>,
    pub rows_affected: u64,
    pub trace: QueryTrace,
    pub cascade: Option<CascadeReport>,
}
76
/// Parse + plan pair stored in the statement cache so repeated identical SQL
/// skips the parser and planner.
#[derive(Debug, Clone)]
struct CachedStatement {
    stmt: Statement,
    plan: PhysicalPlan,
}
82
83impl QueryResult {
84 pub fn empty() -> Self {
85 Self {
86 columns: vec![],
87 rows: vec![],
88 rows_affected: 0,
89 trace: QueryTrace::scan(),
90 cascade: None,
91 }
92 }
93
94 pub fn empty_with_affected(rows_affected: u64) -> Self {
95 Self {
96 columns: vec![],
97 rows: vec![],
98 rows_affected,
99 trace: QueryTrace::scan(),
100 cascade: None,
101 }
102 }
103}
104
thread_local! {
    // Per-thread snapshot override; when set, reads on this thread
    // presumably use this snapshot instead of the latest one — the consuming
    // code is outside this chunk, confirm at the read sites.
    static SNAPSHOT_OVERRIDE: std::cell::RefCell<Option<SnapshotId>> =
        const { std::cell::RefCell::new(None) };
}
109
/// Top-level database handle combining the relational, graph, and vector
/// stores behind one transaction manager, with optional redb persistence.
pub struct Database {
    tx_mgr: Arc<TxManager<DynStore>>,
    relational_store: Arc<RelationalStore>,
    graph_store: Arc<GraphStore>,
    vector_store: Arc<VectorStore>,
    // Append-only logs of row/DDL changes (also persisted when enabled).
    change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
    // None for purely in-memory databases.
    persistence: Option<Arc<RedbPersistence>>,
    // Executors wrap the stores with MVCC/transaction awareness.
    relational: MemRelationalExecutor<DynStore>,
    graph: MemGraphExecutor<DynStore>,
    vector: MemVectorExecutor<DynStore>,
    // Transaction opened by SQL `BEGIN`, committed/rolled back by SQL.
    session_tx: Mutex<Option<TxId>>,
    instance_id: uuid::Uuid,
    plugin: Arc<dyn DatabasePlugin>,
    accountant: Arc<MemoryAccountant>,
    conflict_policies: RwLock<ConflictPolicies>,
    // Commit-event subscribers plus delivery counters.
    subscriptions: Mutex<SubscriptionState>,
    // Background pruning thread handle and its shutdown flag.
    pruning_runtime: Mutex<PruningRuntime>,
    pruning_guard: Arc<Mutex<()>>,
    // 0 means "no limit" (see build_db's unwrap_or(0)).
    disk_limit: AtomicU64,
    disk_limit_startup_ceiling: AtomicU64,
    sync_watermark: Arc<AtomicLsn>,
    closed: AtomicBool,
    rows_examined: AtomicU64,
    // SQL text -> cached parse/plan (bounded by MAX_STATEMENT_CACHE_ENTRIES).
    statement_cache: RwLock<HashMap<String, Arc<CachedStatement>>>,
}
136
/// Result of an insert that may be silently skipped on a duplicate UNIQUE
/// value (see `insert_row_with_unique_noop`).
pub(crate) enum InsertRowResult {
    Inserted(RowId),
    NoOp,
}
141
142#[derive(Debug, Default)]
143pub(crate) struct IndexScanTxOverlay {
144 pub deleted_row_ids: std::collections::HashSet<RowId>,
145 pub matching_inserts: Vec<VersionedRow>,
146}
147
/// Outcome of constraint checking for an insert: either the row is valid, or
/// it duplicates a UNIQUE value and the caller opted into a no-op instead of
/// an error.
enum RowConstraintCheck {
    Valid,
    DuplicateUniqueNoOp,
}
152
/// Result of probing a column's index for a constraint conflict: no index
/// exists (caller must fall back to a scan), no conflicting row, or a
/// conflicting row id.
enum ConstraintProbe {
    NoIndex,
    NoMatch,
    Match(RowId),
}
158
// Manual Debug: only the instance id is shown; the stores and executors are
// elided via `finish_non_exhaustive`.
impl std::fmt::Debug for Database {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Database")
            .field("instance_id", &self.instance_id)
            .finish_non_exhaustive()
    }
}
166
/// Work item for state-machine propagation: drive the row identified by
/// (`table`, `uuid`) toward `target_state`. `depth` tracks propagation
/// distance from the original change; `abort_on_failure` presumably makes a
/// failed transition abort the transaction — confirm at the processing site.
#[derive(Debug, Clone)]
struct PropagationQueueEntry {
    table: String,
    uuid: uuid::Uuid,
    target_state: String,
    depth: u32,
    abort_on_failure: bool,
}
175
/// Borrowed view of the row whose state change is being propagated.
#[derive(Debug, Clone, Copy)]
struct PropagationSource<'a> {
    table: &'a str,
    uuid: uuid::Uuid,
    state: &'a str,
    depth: u32,
}
183
/// Shared context threaded through propagation: the transaction and snapshot
/// to operate under, plus the table metadata map.
#[derive(Debug, Clone, Copy)]
struct PropagationContext<'a> {
    tx: TxId,
    snapshot: SnapshotId,
    metas: &'a HashMap<String, TableMeta>,
}
190
/// Commit-event subscriber registry plus cumulative delivery counters
/// (sent vs. dropped-because-channel-full).
#[derive(Debug)]
struct SubscriptionState {
    subscribers: Vec<SyncSender<CommitEvent>>,
    events_sent: u64,
    events_dropped: u64,
}
197
198impl SubscriptionState {
199 fn new() -> Self {
200 Self {
201 subscribers: Vec::new(),
202 events_sent: 0,
203 events_dropped: 0,
204 }
205 }
206}
207
/// Handle to the background pruning thread: its shutdown flag and, once
/// started, its join handle.
#[derive(Debug)]
struct PruningRuntime {
    shutdown: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}
213
214impl PruningRuntime {
215 fn new() -> Self {
216 Self {
217 shutdown: Arc::new(AtomicBool::new(false)),
218 handle: None,
219 }
220 }
221}
222
223impl Database {
    /// Wires all sub-stores, executors, and bookkeeping into a `Database`.
    ///
    /// Shared by the persistent and in-memory open paths; `persistence` is
    /// `None` for in-memory databases. A `disk_limit` of `None` is stored as
    /// 0, which the limit fields treat as "no limit".
    #[allow(clippy::too_many_arguments)]
    fn build_db(
        tx_mgr: Arc<TxManager<DynStore>>,
        relational: Arc<RelationalStore>,
        graph: Arc<GraphStore>,
        vector_store: Arc<VectorStore>,
        hnsw: Arc<OnceLock<parking_lot::RwLock<Option<HnswIndex>>>>,
        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
        ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
        persistence: Option<Arc<RedbPersistence>>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        disk_limit: Option<u64>,
        disk_limit_startup_ceiling: Option<u64>,
    ) -> Self {
        Self {
            tx_mgr: tx_mgr.clone(),
            relational_store: relational.clone(),
            graph_store: graph.clone(),
            vector_store: vector_store.clone(),
            change_log,
            ddl_log,
            persistence,
            // Executors share the same stores and transaction manager.
            relational: MemRelationalExecutor::new(relational, tx_mgr.clone()),
            graph: MemGraphExecutor::new(graph, tx_mgr.clone()),
            vector: MemVectorExecutor::new_with_accountant(
                vector_store,
                tx_mgr.clone(),
                hnsw,
                accountant.clone(),
            ),
            session_tx: Mutex::new(None),
            instance_id: uuid::Uuid::new_v4(),
            plugin,
            accountant,
            conflict_policies: RwLock::new(ConflictPolicies::uniform(ConflictPolicy::LatestWins)),
            subscriptions: Mutex::new(SubscriptionState::new()),
            pruning_runtime: Mutex::new(PruningRuntime::new()),
            pruning_guard: Arc::new(Mutex::new(())),
            // 0 encodes "no limit".
            disk_limit: AtomicU64::new(disk_limit.unwrap_or(0)),
            disk_limit_startup_ceiling: AtomicU64::new(disk_limit_startup_ceiling.unwrap_or(0)),
            sync_watermark: Arc::new(AtomicLsn::new(Lsn(0))),
            closed: AtomicBool::new(false),
            rows_examined: AtomicU64::new(0),
            statement_cache: RwLock::new(HashMap::new()),
        }
    }
271
272 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
273 Self::open_with_config(
274 path,
275 Arc::new(CorePlugin),
276 Arc::new(MemoryAccountant::no_limit()),
277 )
278 }
279
280 pub fn open_memory() -> Self {
281 Self::open_memory_with_plugin_and_accountant(
282 Arc::new(CorePlugin),
283 Arc::new(MemoryAccountant::no_limit()),
284 )
285 .expect("failed to open in-memory database")
286 }
287
    /// Opens a database at `path`, loading all persisted state (table meta,
    /// rows, edges, vectors, logs) and rebuilding in-memory structures.
    ///
    /// Counter recovery is order-sensitive: max row id / tx id / lsn are
    /// derived from the loaded state *before* the `TxManager` is constructed,
    /// so new transactions continue after everything already persisted.
    fn open_loaded(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        mut accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        let persistence = if path.exists() {
            Arc::new(RedbPersistence::open(path)?)
        } else {
            Arc::new(RedbPersistence::create(path)?)
        };
        // A persisted memory limit only applies when the caller didn't set one.
        if accountant.usage().limit.is_none()
            && let Some(limit) = persistence.load_config_value::<usize>("memory_limit")?
        {
            accountant = Arc::new(MemoryAccountant::with_budget(limit));
        }
        let persisted_disk_limit = persistence.load_config_value::<u64>("disk_limit")?;
        let startup_disk_ceiling = startup_disk_limit;
        // The startup limit acts as a ceiling over any persisted limit.
        let effective_disk_limit = match (persisted_disk_limit, startup_disk_limit) {
            (Some(persisted), Some(ceiling)) => Some(persisted.min(ceiling)),
            (Some(persisted), None) => Some(persisted),
            (None, Some(ceiling)) => Some(ceiling),
            (None, None) => None,
        };

        let all_meta = persistence.load_all_table_meta()?;

        // Rebuild relational tables, their index storage, then their rows.
        let relational = Arc::new(RelationalStore::new());
        for (name, meta) in &all_meta {
            relational.create_table(name, meta.clone());
            for decl in &meta.indexes {
                relational.create_index_storage(name, &decl.name, decl.columns.clone());
            }
            for row in persistence.load_relational_table(name)? {
                relational.insert_loaded_row(name, row);
            }
        }

        let graph = Arc::new(GraphStore::new());
        for edge in persistence.load_forward_edges()? {
            graph.insert_loaded_edge(edge);
        }

        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        // The vector dimension comes from the first vector column found.
        for meta in all_meta.values() {
            for column in &meta.columns {
                if let ColumnType::Vector(dimension) = column.column_type {
                    vector.set_dimension(dimension);
                    break;
                }
            }
        }
        for entry in persistence.load_vectors()? {
            vector.insert_loaded_vector(entry);
        }

        // Recover id/tx/lsn counters so new work continues past loaded state.
        let max_row_id = relational.max_row_id();
        let max_tx = max_tx_across_all(&relational, &graph, &vector);
        let max_lsn = max_lsn_across_all(&relational, &graph, &vector);
        relational.set_next_row_id(RowId(max_row_id.0.saturating_add(1)));

        let change_log = Arc::new(RwLock::new(persistence.load_change_log()?));
        let ddl_log = Arc::new(RwLock::new(persistence.load_ddl_log()?));
        let composite = CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        );
        // Writes go through the persistence layer wrapping the composite store.
        let persistent = PersistentCompositeStore::new(composite, persistence.clone());
        let store: DynStore = Box::new(persistent);
        let tx_mgr = Arc::new(TxManager::new_with_counters(
            store,
            TxId(max_tx.0.saturating_add(1)),
            Lsn(max_lsn.0.saturating_add(1)),
            max_tx,
        ));

        let db = Self::build_db(
            tx_mgr,
            relational,
            graph,
            vector,
            hnsw,
            change_log,
            ddl_log,
            Some(persistence),
            plugin,
            accountant,
            effective_disk_limit,
            startup_disk_ceiling,
        );

        // Re-register DAG edge-type constraints recorded in table metadata.
        for meta in all_meta.values() {
            if !meta.dag_edge_types.is_empty() {
                db.graph.register_dag_edge_types(&meta.dag_edge_types);
            }
        }

        db.account_loaded_state()?;
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());

        Ok(db)
    }
398
399 fn open_memory_internal(
400 plugin: Arc<dyn DatabasePlugin>,
401 accountant: Arc<MemoryAccountant>,
402 ) -> Result<Self> {
403 let relational = Arc::new(RelationalStore::new());
404 let graph = Arc::new(GraphStore::new());
405 let hnsw = Arc::new(OnceLock::new());
406 let vector = Arc::new(VectorStore::new(hnsw.clone()));
407 let change_log = Arc::new(RwLock::new(Vec::new()));
408 let ddl_log = Arc::new(RwLock::new(Vec::new()));
409 let store: DynStore = Box::new(CompositeStore::new(
410 relational.clone(),
411 graph.clone(),
412 vector.clone(),
413 change_log.clone(),
414 ddl_log.clone(),
415 ));
416 let tx_mgr = Arc::new(TxManager::new(store));
417
418 let db = Self::build_db(
419 tx_mgr, relational, graph, vector, hnsw, change_log, ddl_log, None, plugin, accountant,
420 None, None,
421 );
422 maybe_prebuild_hnsw(&db.vector_store, db.accountant());
423 Ok(db)
424 }
425
426 pub fn begin(&self) -> TxId {
427 self.tx_mgr.begin()
428 }
429
430 pub fn commit(&self, tx: TxId) -> Result<()> {
431 self.commit_with_source(tx, CommitSource::User)
432 }
433
434 pub fn rollback(&self, tx: TxId) -> Result<()> {
435 let ws = self.tx_mgr.cloned_write_set(tx)?;
436 self.release_insert_allocations(&ws);
437 self.tx_mgr.rollback(tx)
438 }
439
440 pub fn snapshot(&self) -> SnapshotId {
441 self.tx_mgr.snapshot()
442 }
443
    /// Executes `sql`, handling session transaction control (BEGIN/COMMIT/
    /// ROLLBACK) inline and consulting the statement cache before parsing.
    ///
    /// Statements run inside the session transaction if one is open,
    /// otherwise in auto-commit mode.
    pub fn execute(&self, sql: &str, params: &HashMap<String, Value>) -> Result<QueryResult> {
        // Fast path: a cache hit skips both parser and planner.
        if let Some(cached) = self.cached_statement(sql) {
            let active_tx = *self.session_tx.lock();
            return self.execute_statement_with_plan(
                &cached.stmt,
                sql,
                params,
                active_tx,
                Some(&cached.plan),
            );
        }

        let stmt = contextdb_parser::parse(sql)?;

        match &stmt {
            Statement::Begin => {
                // BEGIN is idempotent: an already-open session tx is kept.
                let mut session = self.session_tx.lock();
                if session.is_none() {
                    *session = Some(self.begin());
                }
                return Ok(QueryResult::empty());
            }
            Statement::Commit => {
                // take() clears the session tx even if commit then fails.
                let mut session = self.session_tx.lock();
                if let Some(tx) = session.take() {
                    self.commit_with_source(tx, CommitSource::User)?;
                }
                return Ok(QueryResult::empty());
            }
            Statement::Rollback => {
                let mut session = self.session_tx.lock();
                if let Some(tx) = *session {
                    self.rollback(tx)?;
                    *session = None;
                }
                return Ok(QueryResult::empty());
            }
            _ => {}
        }

        let active_tx = *self.session_tx.lock();
        self.execute_statement(&stmt, sql, params, active_tx)
    }
487
488 fn cached_statement(&self, sql: &str) -> Option<Arc<CachedStatement>> {
489 self.statement_cache.read().get(sql).cloned()
490 }
491
492 fn cache_statement_if_eligible(&self, sql: &str, stmt: &Statement, plan: &PhysicalPlan) {
493 if !Self::is_statement_cache_eligible(stmt, plan) {
494 return;
495 }
496
497 let mut cache = self.statement_cache.write();
498 if cache.contains_key(sql) {
499 return;
500 }
501 if cache.len() >= MAX_STATEMENT_CACHE_ENTRIES {
502 return;
503 }
504 cache.insert(
505 sql.to_string(),
506 Arc::new(CachedStatement {
507 stmt: stmt.clone(),
508 plan: plan.clone(),
509 }),
510 );
511 }
512
513 fn is_statement_cache_eligible(stmt: &Statement, plan: &PhysicalPlan) -> bool {
514 matches!((stmt, plan), (Statement::Insert(ins), PhysicalPlan::Insert(_))
515 if !ins.table.eq_ignore_ascii_case("GRAPH")
516 && !ins.table.eq_ignore_ascii_case("__edges"))
517 }
518
519 pub(crate) fn clear_statement_cache(&self) {
520 self.statement_cache.write().clear();
521 }
522
523 #[doc(hidden)]
524 pub fn __statement_cache_len(&self) -> usize {
525 self.statement_cache.read().len()
526 }
527
    /// Runs a plan without a caller-managed transaction.
    ///
    /// DML plans get a fresh transaction that is committed on success and
    /// rolled back on failure; read-only plans run without one.
    fn execute_autocommit(
        &self,
        plan: &PhysicalPlan,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        self.__reset_rows_examined();
        match plan {
            PhysicalPlan::Insert(_) | PhysicalPlan::Delete(_) | PhysicalPlan::Update(_) => {
                let tx = self.begin();
                let result = execute_plan(self, plan, params, Some(tx));
                match result {
                    Ok(qr) => {
                        self.commit_with_source(tx, CommitSource::AutoCommit)?;
                        Ok(qr)
                    }
                    Err(e) => {
                        // Rollback failure is ignored: the execution error
                        // is the one the caller needs to see.
                        let _ = self.rollback(tx);
                        Err(e)
                    }
                }
            }
            _ => execute_plan(self, plan, params, None),
        }
    }
555
556 pub fn explain(&self, sql: &str) -> Result<String> {
557 let stmt = contextdb_parser::parse(sql)?;
558 let plan = contextdb_planner::plan(&stmt)?;
559 if self.vector_store.vector_count() >= 1000 && !self.vector_store.has_hnsw_index() {
560 maybe_prebuild_hnsw(&self.vector_store, self.accountant());
561 }
562 let mut output = plan.explain();
563 if self.vector_store.has_hnsw_index() {
564 output = output.replace("VectorSearch(", "HNSWSearch(");
565 output = output.replace("VectorSearch {", "HNSWSearch {");
566 } else {
567 output = output.replace("VectorSearch(", "VectorSearch(strategy=BruteForce, ");
568 output = output.replace("VectorSearch {", "VectorSearch { strategy: BruteForce,");
569 }
570 Ok(output)
571 }
572
573 pub fn execute_in_tx(
574 &self,
575 tx: TxId,
576 sql: &str,
577 params: &HashMap<String, Value>,
578 ) -> Result<QueryResult> {
579 let stmt = contextdb_parser::parse(sql)?;
580 self.execute_statement(&stmt, sql, params, Some(tx))
581 }
582
    /// Commits `tx`: validates foreign keys, runs the plugin's pre-commit
    /// hook, performs the actual commit, then fires post-commit side effects.
    ///
    /// Any failure before the commit rolls the transaction back. All
    /// write-set-dependent steps are skipped for empty (read-only) txs.
    fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
        let mut ws = self.tx_mgr.cloned_write_set(tx)?;

        if !ws.is_empty()
            && let Err(err) = self.validate_foreign_keys_in_tx(tx)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        // The plugin may veto the commit; a veto also rolls back.
        if !ws.is_empty()
            && let Err(err) = self.plugin.pre_commit(&ws, source)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        let lsn = self.tx_mgr.commit_with_lsn(tx)?;

        if !ws.is_empty() {
            self.release_delete_allocations(&ws);
            // Stamp the assigned LSN before notifying observers.
            ws.stamp_lsn(lsn);
            self.plugin.post_commit(&ws, source);
            self.publish_commit_event_if_subscribers(&ws, source, lsn);
        }

        Ok(())
    }
611
612 fn build_commit_event(
613 ws: &contextdb_tx::WriteSet,
614 source: CommitSource,
615 lsn: Lsn,
616 ) -> CommitEvent {
617 let mut tables_changed: Vec<String> = ws
618 .relational_inserts
619 .iter()
620 .map(|(table, _)| table.clone())
621 .chain(
622 ws.relational_deletes
623 .iter()
624 .map(|(table, _, _)| table.clone()),
625 )
626 .collect::<HashSet<_>>()
627 .into_iter()
628 .collect();
629 tables_changed.sort();
630
631 CommitEvent {
632 source,
633 lsn,
634 tables_changed,
635 row_count: ws.relational_inserts.len()
636 + ws.relational_deletes.len()
637 + ws.adj_inserts.len()
638 + ws.adj_deletes.len()
639 + ws.vector_inserts.len()
640 + ws.vector_deletes.len(),
641 }
642 }
643
644 fn publish_commit_event_if_subscribers(
645 &self,
646 ws: &contextdb_tx::WriteSet,
647 source: CommitSource,
648 lsn: Lsn,
649 ) {
650 let mut subscriptions = self.subscriptions.lock();
651 if subscriptions.subscribers.is_empty() {
652 return;
653 }
654 let event = Self::build_commit_event(ws, source, lsn);
655 let subscribers = std::mem::take(&mut subscriptions.subscribers);
656 let mut live_subscribers = Vec::with_capacity(subscribers.len());
657
658 for sender in subscribers {
659 match sender.try_send(event.clone()) {
660 Ok(()) => {
661 subscriptions.events_sent += 1;
662 live_subscribers.push(sender);
663 }
664 Err(TrySendError::Full(_)) => {
665 subscriptions.events_dropped += 1;
666 live_subscribers.push(sender);
667 }
668 Err(TrySendError::Disconnected(_)) => {}
669 }
670 }
671
672 subscriptions.subscribers = live_subscribers;
673 }
674
    /// Signals the background pruning thread to stop and joins it.
    ///
    /// The runtime lock is released before joining so the thread can make
    /// progress if it needs the lock; the shutdown flag is replaced with a
    /// fresh one so a later thread can be started cleanly.
    fn stop_pruning_thread(&self) {
        let handle = {
            let mut runtime = self.pruning_runtime.lock();
            runtime.shutdown.store(true, Ordering::SeqCst);
            let handle = runtime.handle.take();
            // Fresh flag for the next pruning thread, if any.
            runtime.shutdown = Arc::new(AtomicBool::new(false));
            handle
        };

        // Join outside the lock; a join failure (panicked thread) is ignored.
        if let Some(handle) = handle {
            let _ = handle.join();
        }
    }
688
689 fn execute_statement(
690 &self,
691 stmt: &Statement,
692 sql: &str,
693 params: &HashMap<String, Value>,
694 tx: Option<TxId>,
695 ) -> Result<QueryResult> {
696 self.execute_statement_with_plan(stmt, sql, params, tx, None)
697 }
698
    /// Core statement execution: plugin hooks, graph-insert special-casing,
    /// planning (or a supplied cached plan), timed execution, and post-query
    /// notification.
    fn execute_statement_with_plan(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
        cached_plan: Option<&PhysicalPlan>,
    ) -> Result<QueryResult> {
        // Plugin may veto the query or the DDL change before any work runs.
        self.plugin.on_query(sql)?;

        if let Some(change) = self.ddl_change_for_statement(stmt).as_ref() {
            self.plugin.on_ddl(change)?;
        }

        // Graph-edge inserts bypass the planner entirely.
        if let Statement::Insert(ins) = stmt
            && (ins.table.eq_ignore_ascii_case("GRAPH")
                || ins.table.eq_ignore_ascii_case("__edges"))
        {
            return self.execute_graph_insert(ins, params, tx);
        }

        // Execute inside a closure so the duration and outcome can be
        // reported to the plugin on both success and failure paths.
        let started = Instant::now();
        let result = (|| {
            if let Some(plan) = cached_plan {
                return self.run_planned_statement(stmt, plan, params, tx);
            }

            let (stmt, plan) = {
                // CTE IN-subqueries are resolved before planning.
                let stmt = self.pre_resolve_cte_subqueries(stmt, params, tx)?;
                let plan = contextdb_planner::plan(&stmt)?;
                self.cache_statement_if_eligible(sql, &stmt, &plan);
                (stmt, plan)
            };
            self.run_planned_statement(&stmt, &plan, params, tx)
        })();
        let duration = started.elapsed();
        let outcome = query_outcome_from_result(&result);
        self.plugin.post_query(sql, duration, &outcome);
        // Internal row-id columns never leak to callers.
        result.map(strip_internal_row_id)
    }
741
    /// Validates and executes a physical plan, either inside the given
    /// transaction or in auto-commit mode.
    ///
    /// On a successful CREATE TABLE, DAG edge-type constraints declared on
    /// the table are registered with the graph executor.
    fn run_planned_statement(
        &self,
        stmt: &Statement,
        plan: &PhysicalPlan,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        validate_dml(plan, self, params)?;
        let result = match tx {
            Some(tx) => {
                self.__reset_rows_examined();
                execute_plan(self, plan, params, Some(tx))
            }
            // execute_autocommit resets the counter itself.
            None => self.execute_autocommit(plan, params),
        };
        if result.is_ok()
            && let Statement::CreateTable(ct) = stmt
            && !ct.dag_edge_types.is_empty()
        {
            self.graph.register_dag_edge_types(&ct.dag_edge_types);
        }
        result
    }
767
    /// Handles INSERT into the "GRAPH"/"__edges" pseudo-table: each row must
    /// supply `source_id` and `target_id` UUIDs (or UUID text) and a TEXT
    /// `edge_type`. Runs in the given transaction, or begins and commits its
    /// own when none is supplied.
    fn execute_graph_insert(
        &self,
        ins: &contextdb_parser::ast::Insert,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        use crate::executor::resolve_expr;

        // Column positions are matched case-insensitively.
        let col_index = |name: &str| {
            ins.columns
                .iter()
                .position(|c| c.eq_ignore_ascii_case(name))
        };
        let source_idx = col_index("source_id")
            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires source_id column".into()))?;
        let target_idx = col_index("target_id")
            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires target_id column".into()))?;
        let edge_type_idx = col_index("edge_type")
            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires edge_type column".into()))?;

        let auto_commit = tx.is_none();
        let tx = tx.unwrap_or_else(|| self.begin());
        let mut count = 0u64;
        for row_exprs in &ins.values {
            let source = resolve_expr(&row_exprs[source_idx], params)?;
            let target = resolve_expr(&row_exprs[target_idx], params)?;
            let edge_type = resolve_expr(&row_exprs[edge_type_idx], params)?;

            // Endpoint ids may arrive as UUID values or as UUID text.
            let source_uuid = match &source {
                Value::Uuid(u) => *u,
                Value::Text(t) => uuid::Uuid::parse_str(t)
                    .map_err(|e| Error::PlanError(format!("invalid source_id uuid: {e}")))?,
                _ => return Err(Error::PlanError("source_id must be UUID".into())),
            };
            let target_uuid = match &target {
                Value::Uuid(u) => *u,
                Value::Text(t) => uuid::Uuid::parse_str(t)
                    .map_err(|e| Error::PlanError(format!("invalid target_id uuid: {e}")))?,
                _ => return Err(Error::PlanError("target_id must be UUID".into())),
            };
            let edge_type_str = match &edge_type {
                Value::Text(t) => t.clone(),
                _ => return Err(Error::PlanError("edge_type must be TEXT".into())),
            };

            self.insert_edge(
                tx,
                source_uuid,
                target_uuid,
                edge_type_str,
                Default::default(),
            )?;
            count += 1;
        }

        if auto_commit {
            self.commit_with_source(tx, CommitSource::AutoCommit)?;
        }

        Ok(QueryResult::empty_with_affected(count))
    }
830
    /// Maps a DDL statement to the `DdlChange` handed to the plugin's
    /// `on_ddl` hook; non-DDL statements yield `None`.
    ///
    /// For ALTER TABLE the current table meta is cloned and mutated to
    /// reflect the action, so the emitted change describes the *resulting*
    /// schema. The real schema mutation happens elsewhere; this only builds
    /// the notification payload.
    fn ddl_change_for_statement(&self, stmt: &Statement) -> Option<DdlChange> {
        match stmt {
            Statement::CreateTable(ct) => Some(ddl_change_from_create_table(ct)),
            Statement::DropTable(dt) => Some(DdlChange::DropTable {
                name: dt.name.clone(),
            }),
            Statement::AlterTable(at) => {
                // Unknown table falls back to an empty meta rather than erroring;
                // the statement itself will fail later if the table is missing.
                let mut meta = self.table_meta(&at.table).unwrap_or_default();
                match &at.action {
                    AlterAction::AddColumn(col) => {
                        meta.columns.push(contextdb_core::ColumnDef {
                            name: col.name.clone(),
                            column_type: crate::executor::map_column_type(&col.data_type),
                            nullable: col.nullable,
                            primary_key: col.primary_key,
                            unique: col.unique,
                            default: col
                                .default
                                .as_ref()
                                .map(crate::executor::stored_default_expr),
                            references: col.references.as_ref().map(|reference| {
                                contextdb_core::ForeignKeyReference {
                                    table: reference.table.clone(),
                                    column: reference.column.clone(),
                                }
                            }),
                            expires: col.expires,
                            immutable: col.immutable,
                        });
                        // An EXPIRES column becomes the table's expiry column.
                        if col.expires {
                            meta.expires_column = Some(col.name.clone());
                        }
                    }
                    AlterAction::DropColumn {
                        column: name,
                        cascade: _,
                    } => {
                        meta.columns.retain(|c| c.name != *name);
                        if meta.expires_column.as_deref() == Some(name.as_str()) {
                            meta.expires_column = None;
                        }
                    }
                    AlterAction::RenameColumn { from, to } => {
                        if let Some(c) = meta.columns.iter_mut().find(|c| c.name == *from) {
                            c.name = to.clone();
                        }
                        // Keep the expiry-column pointer in sync with the rename.
                        if meta.expires_column.as_deref() == Some(from.as_str()) {
                            meta.expires_column = Some(to.clone());
                        }
                    }
                    AlterAction::SetRetain {
                        duration_seconds,
                        sync_safe,
                    } => {
                        meta.default_ttl_seconds = Some(*duration_seconds);
                        meta.sync_safe = *sync_safe;
                    }
                    AlterAction::DropRetain => {
                        meta.default_ttl_seconds = None;
                        meta.sync_safe = false;
                    }
                    // Conflict-policy actions don't change the column list, so
                    // the emitted AlterTable payload is unaffected by them.
                    AlterAction::SetSyncConflictPolicy(_) | AlterAction::DropSyncConflictPolicy => { }
                }
                Some(DdlChange::AlterTable {
                    name: at.table.clone(),
                    columns: meta
                        .columns
                        .iter()
                        .map(|c| {
                            (
                                c.name.clone(),
                                sql_type_for_meta_column(c, &meta.propagation_rules),
                            )
                        })
                        .collect(),
                    constraints: create_table_constraints_from_meta(&meta),
                })
            }
            _ => None,
        }
    }
914
    /// For a SELECT with CTEs and a WHERE clause, resolves IN-subqueries that
    /// reference those CTEs into literal value lists before planning (the
    /// planner itself has no CTE awareness). Other statements pass through
    /// unchanged (cloned).
    fn pre_resolve_cte_subqueries(
        &self,
        stmt: &Statement,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<Statement> {
        if let Statement::Select(sel) = stmt
            && !sel.ctes.is_empty()
            && sel.body.where_clause.is_some()
        {
            use crate::executor::resolve_in_subqueries_with_ctes;
            let resolved_where = sel
                .body
                .where_clause
                .as_ref()
                .map(|expr| resolve_in_subqueries_with_ctes(self, expr, params, tx, &sel.ctes))
                .transpose()?;
            // Rebuild the statement with only the WHERE clause replaced.
            let mut new_body = sel.body.clone();
            new_body.where_clause = resolved_where;
            Ok(Statement::Select(contextdb_parser::ast::SelectStatement {
                ctes: sel.ctes.clone(),
                body: new_body,
            }))
        } else {
            Ok(stmt.clone())
        }
    }
944
945 pub fn insert_row(
946 &self,
947 tx: TxId,
948 table: &str,
949 values: HashMap<ColName, Value>,
950 ) -> Result<RowId> {
951 let values =
956 self.coerce_row_for_insert(table, values, Some(self.committed_watermark()), Some(tx))?;
957 self.validate_row_constraints(tx, table, &values, None)?;
958 self.relational.insert(tx, table, values)
959 }
960
961 pub(crate) fn insert_row_replacing(
967 &self,
968 tx: TxId,
969 table: &str,
970 values: HashMap<ColName, Value>,
971 old_row_id: RowId,
972 ) -> Result<RowId> {
973 let values =
974 self.coerce_row_for_insert(table, values, Some(self.committed_watermark()), Some(tx))?;
975 self.validate_row_constraints(tx, table, &values, Some(old_row_id))?;
976 self.relational.insert(tx, table, values)
977 }
978
979 pub(crate) fn insert_row_for_sync(
983 &self,
984 tx: TxId,
985 table: &str,
986 values: HashMap<ColName, Value>,
987 ) -> Result<RowId> {
988 let values = self.coerce_row_for_insert(table, values, None, None)?;
989 self.validate_row_constraints(tx, table, &values, None)?;
990 self.relational.insert(tx, table, values)
991 }
992
993 pub(crate) fn upsert_row_for_sync(
994 &self,
995 tx: TxId,
996 table: &str,
997 conflict_col: &str,
998 values: HashMap<ColName, Value>,
999 ) -> Result<UpsertResult> {
1000 let values = self.coerce_row_for_insert(table, values, None, None)?;
1001 self.upsert_row(tx, table, conflict_col, values)
1002 }
1003
1004 fn coerce_row_for_insert(
1013 &self,
1014 table: &str,
1015 values: HashMap<ColName, Value>,
1016 current_tx_max: Option<TxId>,
1017 active_tx: Option<TxId>,
1018 ) -> Result<HashMap<ColName, Value>> {
1019 let meta = self.table_meta(table);
1020 let mut out: HashMap<ColName, Value> = HashMap::with_capacity(values.len());
1021 for (col, v) in values {
1022 let is_vector_bypass = matches!(&v, Value::Vector(_))
1024 && meta
1025 .as_ref()
1026 .and_then(|m| m.columns.iter().find(|c| c.name == col))
1027 .map(|c| matches!(c.column_type, contextdb_core::ColumnType::Vector(_)))
1028 .unwrap_or(false);
1029
1030 let coerced = if is_vector_bypass {
1031 v
1032 } else {
1033 crate::executor::coerce_into_column(
1034 self,
1035 table,
1036 &col,
1037 v,
1038 current_tx_max,
1039 active_tx,
1040 )?
1041 };
1042 out.insert(col, coerced);
1043 }
1044 Ok(out)
1045 }
1046
1047 pub(crate) fn insert_row_with_unique_noop(
1048 &self,
1049 tx: TxId,
1050 table: &str,
1051 values: HashMap<ColName, Value>,
1052 ) -> Result<InsertRowResult> {
1053 match self.check_row_constraints(tx, table, &values, None, true)? {
1054 RowConstraintCheck::Valid => self
1055 .relational
1056 .insert(tx, table, values)
1057 .map(InsertRowResult::Inserted),
1058 RowConstraintCheck::DuplicateUniqueNoOp => Ok(InsertRowResult::NoOp),
1059 }
1060 }
1061
    /// Inserts or updates a row keyed on `conflict_col`.
    ///
    /// Before the store-level upsert: immutable columns are checked against
    /// the existing row, and constraints are validated (skipping the row
    /// being replaced). After an update, a state-machine state change is
    /// propagated if the table declares one.
    pub fn upsert_row(
        &self,
        tx: TxId,
        table: &str,
        conflict_col: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<UpsertResult> {
        let snapshot = self.snapshot_for_read();
        // Find the row this upsert would replace, if any, visible to `tx`.
        let existing_row = values
            .get(conflict_col)
            .map(|conflict_value| {
                self.point_lookup_in_tx(tx, table, conflict_col, conflict_value, snapshot)
            })
            .transpose()?
            .flatten();
        let existing_row_id = existing_row.as_ref().map(|row| row.row_id);
        // Immutable columns may be present in the payload only if unchanged.
        if let (Some(existing), Some(meta)) = (existing_row.as_ref(), self.table_meta(table)) {
            for col_def in meta.columns.iter().filter(|c| c.immutable) {
                let Some(incoming) = values.get(&col_def.name) else {
                    continue;
                };
                let existing_value = existing.values.get(&col_def.name);
                if existing_value != Some(incoming) {
                    return Err(Error::ImmutableColumn {
                        table: table.to_string(),
                        column: col_def.name.clone(),
                    });
                }
            }
        }
        self.validate_row_constraints(tx, table, &values, existing_row_id)?;

        // Capture id + state-machine state up front; `values` is consumed below.
        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
        let meta = self.table_meta(table);
        let new_state = meta
            .as_ref()
            .and_then(|m| m.state_machine.as_ref())
            .and_then(|sm| values.get(&sm.column))
            .and_then(Value::as_text)
            .map(std::borrow::ToOwned::to_owned);

        let result = self
            .relational
            .upsert(tx, table, conflict_col, values, snapshot)?;

        // State propagation only fires for updates, not fresh inserts.
        if let (Some(uuid), Some(state), Some(_meta)) =
            (row_uuid, new_state.as_deref(), meta.as_ref())
            && matches!(result, UpsertResult::Updated)
        {
            self.propagate_state_change_if_needed(tx, table, Some(uuid), Some(state))?;
        }

        Ok(result)
    }
1119
1120 fn validate_row_constraints(
1121 &self,
1122 tx: TxId,
1123 table: &str,
1124 values: &HashMap<ColName, Value>,
1125 skip_row_id: Option<RowId>,
1126 ) -> Result<()> {
1127 match self.check_row_constraints(tx, table, values, skip_row_id, false)? {
1128 RowConstraintCheck::Valid => Ok(()),
1129 RowConstraintCheck::DuplicateUniqueNoOp => {
1130 unreachable!("strict constraint validation cannot return no-op")
1131 }
1132 }
1133 }
1134
    /// Checks primary-key, single-column UNIQUE, and composite UNIQUE
    /// constraints for a candidate row in `table`.
    ///
    /// Returns `RowConstraintCheck::Valid` when nothing conflicts, or
    /// `RowConstraintCheck::DuplicateUniqueNoOp` when `allow_duplicate_unique_noop`
    /// is set and every unique conflict points at one and the same existing
    /// row (the upsert no-op case). Any other conflict is a
    /// `UniqueViolation` error; a missing table is `TableNotFound`.
    ///
    /// `skip_row_id` excludes a single existing row from all duplicate checks
    /// (used when the caller is replacing that row in place).
    fn check_row_constraints(
        &self,
        tx: TxId,
        table: &str,
        values: &HashMap<ColName, Value>,
        skip_row_id: Option<RowId>,
        allow_duplicate_unique_noop: bool,
    ) -> Result<RowConstraintCheck> {
        let metas = self.relational_store.table_meta.read();
        let meta = metas
            .get(table)
            .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
        let snapshot = self.snapshot();

        // Lazily computed full-table scan of visible rows (minus skip_row_id);
        // only materialized when some checked column has no usable index, and
        // then reused by every later index-less check.
        let mut visible_rows_cache: Option<Vec<VersionedRow>> = None;

        // Primary-key columns: any visible duplicate is an immediate error —
        // PK conflicts never qualify for the duplicate-unique no-op path.
        // NULL PK values are skipped here (enforced elsewhere, presumably by
        // NOT NULL checks — confirm).
        for column in meta.columns.iter().filter(|column| column.primary_key) {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            match self.probe_column_for_constraint(
                tx,
                table,
                &column.name,
                value,
                snapshot,
                skip_row_id,
            )? {
                ConstraintProbe::Match(_) => {
                    return Err(Error::UniqueViolation {
                        table: table.to_string(),
                        column: column.name.clone(),
                    });
                }
                ConstraintProbe::NoMatch => {}
                ConstraintProbe::NoIndex => {
                    // No index covers this column: fall back to the cached
                    // visible-row scan.
                    if visible_rows_cache.is_none() {
                        visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                            Some(tx),
                            table,
                            snapshot,
                            &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                        )?);
                    }
                    let rows = visible_rows_cache.as_deref().unwrap();
                    if rows
                        .iter()
                        .any(|existing| existing.values.get(&column.name) == Some(value))
                    {
                        return Err(Error::UniqueViolation {
                            table: table.to_string(),
                            column: column.name.clone(),
                        });
                    }
                }
            }
        }

        // When duplicates are tolerated, they must all resolve to one single
        // existing row; this tracks that row across every unique check below.
        let mut duplicate_unique_row_id = None;

        // Single-column UNIQUE constraints (PK columns were handled above).
        // NULLs are exempt, SQL-style.
        for column in meta
            .columns
            .iter()
            .filter(|column| column.unique && !column.primary_key)
        {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            let matching_row_ids: Vec<RowId> = match self.probe_column_for_constraint(
                tx,
                table,
                &column.name,
                value,
                snapshot,
                skip_row_id,
            )? {
                ConstraintProbe::Match(rid) => vec![rid],
                ConstraintProbe::NoMatch => Vec::new(),
                ConstraintProbe::NoIndex => {
                    if visible_rows_cache.is_none() {
                        visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                            Some(tx),
                            table,
                            snapshot,
                            &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                        )?);
                    }
                    let rows = visible_rows_cache.as_deref().unwrap();
                    rows.iter()
                        .filter(|existing| existing.values.get(&column.name) == Some(value))
                        .map(|existing| existing.row_id)
                        .collect()
                }
            };
            self.merge_unique_conflict(
                table,
                &column.name,
                &matching_row_ids,
                allow_duplicate_unique_noop,
                &mut duplicate_unique_row_id,
            )?;
        }

        // Composite UNIQUE constraints. A NULL (or missing value) in any
        // member column exempts the row from the constraint entirely.
        for unique_constraint in &meta.unique_constraints {
            let mut candidate_values = Vec::with_capacity(unique_constraint.len());
            let mut has_null = false;

            for column_name in unique_constraint {
                match values.get(column_name) {
                    Some(Value::Null) | None => {
                        has_null = true;
                        break;
                    }
                    Some(value) => candidate_values.push(value.clone()),
                }
            }

            if has_null {
                continue;
            }

            // Prefer an index probe; an index that merely covers the prefix
            // (but found nothing) proves absence, otherwise fall back to the
            // cached scan.
            let matching_row_ids: Vec<RowId> = if let Some(rid) = self.probe_composite_unique(
                tx,
                table,
                unique_constraint,
                &candidate_values,
                snapshot,
                skip_row_id,
            )? {
                vec![rid]
            } else if self.index_covers_composite(table, unique_constraint) {
                Vec::new()
            } else {
                if visible_rows_cache.is_none() {
                    visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                        Some(tx),
                        table,
                        snapshot,
                        &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                    )?);
                }
                let rows = visible_rows_cache.as_deref().unwrap();
                rows.iter()
                    .filter(|existing| {
                        unique_constraint.iter().zip(candidate_values.iter()).all(
                            |(column_name, value)| existing.values.get(column_name) == Some(value),
                        )
                    })
                    .map(|existing| existing.row_id)
                    .collect()
            };
            // Violations are reported against the constraint's first column.
            let column_label = unique_constraint.first().map(|s| s.as_str()).unwrap_or("");
            self.merge_unique_conflict(
                table,
                column_label,
                &matching_row_ids,
                allow_duplicate_unique_noop,
                &mut duplicate_unique_row_id,
            )?;
        }

        if duplicate_unique_row_id.is_some() {
            Ok(RowConstraintCheck::DuplicateUniqueNoOp)
        } else {
            Ok(RowConstraintCheck::Valid)
        }
    }
1317
1318 fn merge_unique_conflict(
1319 &self,
1320 table: &str,
1321 column: &str,
1322 matching_row_ids: &[RowId],
1323 allow_duplicate_unique_noop: bool,
1324 duplicate_unique_row_id: &mut Option<RowId>,
1325 ) -> Result<()> {
1326 if matching_row_ids.is_empty() {
1327 return Ok(());
1328 }
1329
1330 if !allow_duplicate_unique_noop || matching_row_ids.len() != 1 {
1331 return Err(Error::UniqueViolation {
1332 table: table.to_string(),
1333 column: column.to_string(),
1334 });
1335 }
1336
1337 let matched_row_id = matching_row_ids[0];
1338
1339 if let Some(existing_row_id) = duplicate_unique_row_id {
1340 if *existing_row_id != matched_row_id {
1341 return Err(Error::UniqueViolation {
1342 table: table.to_string(),
1343 column: column.to_string(),
1344 });
1345 }
1346 } else {
1347 *duplicate_unique_row_id = Some(matched_row_id);
1348 }
1349
1350 Ok(())
1351 }
1352
1353 fn index_covers_column(&self, table: &str, column: &str) -> bool {
1355 let indexes = self.relational_store.indexes.read();
1356 indexes
1357 .iter()
1358 .any(|((t, _), idx)| t == table && idx.columns.len() == 1 && idx.columns[0].0 == column)
1359 }
1360
1361 fn index_covers_composite(&self, table: &str, cols: &[String]) -> bool {
1364 let indexes = self.relational_store.indexes.read();
1365 indexes.iter().any(|((t, _), idx)| {
1366 t == table
1367 && idx.columns.len() >= cols.len()
1368 && idx
1369 .columns
1370 .iter()
1371 .zip(cols.iter())
1372 .all(|((c, _), want)| c == want)
1373 })
1374 }
1375
    /// Probes for a visible row in `table` whose `column` equals `value`,
    /// preferring an index lookup over a scan.
    ///
    /// Returns `Match(row_id)` on a visible conflict, `NoMatch` when a usable
    /// index exists but holds no visible conflict, and `NoIndex` when no
    /// single-column index covers `column` (the caller must fall back to a
    /// table scan). Rows deleted in this transaction's write set are ignored;
    /// rows staged for insert in the write set count as conflicts.
    fn probe_column_for_constraint(
        &self,
        tx: TxId,
        table: &str,
        column: &str,
        value: &Value,
        snapshot: SnapshotId,
        skip_row_id: Option<RowId>,
    ) -> Result<ConstraintProbe> {
        use contextdb_core::{DirectedValue, TotalOrdAsc};
        // Capture the transaction-local state in one write-set visit: row ids
        // deleted in-tx (to be excluded from index hits) and any staged
        // insert that would itself collide on (column, value).
        let (tx_staged_deletes, staged_overlap) = self.tx_mgr.with_write_set(tx, |ws| {
            let deletes = if ws.relational_deletes.is_empty() {
                std::collections::HashSet::new()
            } else {
                ws.relational_deletes
                    .iter()
                    .filter(|(t, _, _)| t == table)
                    .map(|(_, row_id, _)| *row_id)
                    .collect()
            };
            let overlap = ws.relational_inserts.iter().find_map(|(t, row)| {
                if t != table {
                    return None;
                }
                if let Some(sid) = skip_row_id
                    && row.row_id == sid
                {
                    return None;
                }
                if row.values.get(column) == Some(value) {
                    Some(row.row_id)
                } else {
                    None
                }
            });
            (deletes, overlap)
        })?;
        let indexes = self.relational_store.indexes.read();
        let table_key = table.to_string();
        // Resolution order: implicit PK index, implicit unique index, then
        // any explicit single-column index on this column.
        let pk_key = (table_key.clone(), format!("__pk_{column}"));
        let unique_key = (table_key, format!("__unique_{column}"));
        let storage = indexes
            .get(&pk_key)
            .or_else(|| indexes.get(&unique_key))
            .or_else(|| {
                indexes.iter().find_map(|((t, _), idx)| {
                    if t == table && idx.columns.len() == 1 && idx.columns[0].0 == column {
                        Some(idx)
                    } else {
                        None
                    }
                })
            });
        let Some(storage) = storage else {
            return Ok(ConstraintProbe::NoIndex);
        };
        let key = vec![DirectedValue::Asc(TotalOrdAsc(value.clone()))];
        if let Some(entries) = storage.tree.get(&key) {
            for entry in entries {
                if let Some(sid) = skip_row_id
                    && entry.row_id == sid
                {
                    continue;
                }
                if tx_staged_deletes.contains(&entry.row_id) {
                    continue;
                }
                if entry.visible_at(snapshot) {
                    return Ok(ConstraintProbe::Match(entry.row_id));
                }
            }
        }
        drop(indexes);
        // No committed conflict in the index — a staged (uncommitted) insert
        // still counts as a match.
        Ok(match staged_overlap {
            Some(row_id) => ConstraintProbe::Match(row_id),
            None => ConstraintProbe::NoMatch,
        })
    }
1461
    /// Probes for a visible row matching all of (`cols`, `values`) using a
    /// composite index whose key starts with `cols`.
    ///
    /// Returns `Ok(Some(row_id))` on a visible conflict, `Ok(None)` when no
    /// conflict is found *or* no covering index exists — callers must combine
    /// this with `index_covers_composite` to distinguish the two. In-tx
    /// deletes are excluded; in-tx staged inserts count as conflicts.
    fn probe_composite_unique(
        &self,
        tx: TxId,
        table: &str,
        cols: &[String],
        values: &[Value],
        snapshot: SnapshotId,
        skip_row_id: Option<RowId>,
    ) -> Result<Option<RowId>> {
        use contextdb_core::{DirectedValue, TotalOrdAsc};
        if cols.is_empty() || values.is_empty() || cols.len() != values.len() {
            return Ok(None);
        }
        let tx_staged_deletes: std::collections::HashSet<RowId> =
            self.tx_mgr.with_write_set(tx, |ws| {
                ws.relational_deletes
                    .iter()
                    .filter(|(t, _, _)| t == table)
                    .map(|(_, row_id, _)| *row_id)
                    .collect()
            })?;
        let indexes = self.relational_store.indexes.read();
        // First index whose key columns start with `cols`, in order.
        let storage_entry = indexes.iter().find(|((t, _), idx)| {
            t == table
                && idx.columns.len() >= cols.len()
                && idx
                    .columns
                    .iter()
                    .zip(cols.iter())
                    .all(|((c, _), w)| c == w)
        });
        let (_, storage) = match storage_entry {
            Some(e) => e,
            None => return Ok(None),
        };
        // NOTE(review): this walks every key in the index rather than doing a
        // prefix range lookup — linear in index size; presumably acceptable
        // for current workloads, worth confirming.
        for (key, entries) in storage.tree.iter() {
            if key.len() < cols.len() {
                continue;
            }
            // Compare the candidate values against the key prefix, ignoring
            // each component's sort direction.
            let prefix_match = values.iter().enumerate().all(|(i, v)| match &key[i] {
                DirectedValue::Asc(TotalOrdAsc(k)) => k == v,
                DirectedValue::Desc(contextdb_core::TotalOrdDesc(k)) => k == v,
            });
            if !prefix_match {
                continue;
            }
            for entry in entries {
                if let Some(sid) = skip_row_id
                    && entry.row_id == sid
                {
                    continue;
                }
                if tx_staged_deletes.contains(&entry.row_id) {
                    continue;
                }
                if entry.visible_at(snapshot) {
                    return Ok(Some(entry.row_id));
                }
            }
        }
        drop(indexes);
        // No committed conflict — check this transaction's staged inserts.
        let overlap = self.tx_mgr.with_write_set(tx, |ws| {
            for (t, row) in &ws.relational_inserts {
                if t != table {
                    continue;
                }
                if let Some(sid) = skip_row_id
                    && row.row_id == sid
                {
                    continue;
                }
                let matches = cols
                    .iter()
                    .zip(values.iter())
                    .all(|(c, v)| row.values.get(c) == Some(v));
                if matches {
                    return Some(row.row_id);
                }
            }
            None
        })?;
        Ok(overlap)
    }
1552
1553 fn validate_foreign_keys_in_tx(&self, tx: TxId) -> Result<()> {
1554 let snapshot = self.snapshot();
1557 let relational_inserts = self
1558 .tx_mgr
1559 .with_write_set(tx, |ws| ws.relational_inserts.clone())?;
1560
1561 for (table, row) in relational_inserts {
1562 let meta = self
1563 .table_meta(&table)
1564 .ok_or_else(|| Error::TableNotFound(table.clone()))?;
1565 for column in &meta.columns {
1566 let Some(reference) = &column.references else {
1567 continue;
1568 };
1569 let Some(value) = row.values.get(&column.name) else {
1570 continue;
1571 };
1572 if *value == Value::Null {
1573 continue;
1574 }
1575 if self
1576 .point_lookup_in_tx(tx, &reference.table, &reference.column, value, snapshot)?
1577 .is_none()
1578 {
1579 return Err(Error::ForeignKeyViolation {
1580 table: table.clone(),
1581 column: column.name.clone(),
1582 ref_table: reference.table.clone(),
1583 });
1584 }
1585 }
1586 }
1587
1588 Ok(())
1589 }
1590
1591 pub(crate) fn propagate_state_change_if_needed(
1592 &self,
1593 tx: TxId,
1594 table: &str,
1595 row_uuid: Option<uuid::Uuid>,
1596 new_state: Option<&str>,
1597 ) -> Result<()> {
1598 if let (Some(uuid), Some(state)) = (row_uuid, new_state) {
1599 let already_propagating = self
1600 .tx_mgr
1601 .with_write_set(tx, |ws| ws.propagation_in_progress)?;
1602 if !already_propagating {
1603 self.tx_mgr
1604 .with_write_set(tx, |ws| ws.propagation_in_progress = true)?;
1605 let propagate_result = self.propagate(tx, table, uuid, state);
1606 self.tx_mgr
1607 .with_write_set(tx, |ws| ws.propagation_in_progress = false)?;
1608 propagate_result?;
1609 }
1610 }
1611
1612 Ok(())
1613 }
1614
    /// Runs a breadth-first state-change cascade starting from
    /// (`table`, `row_uuid`) entering `new_state`.
    ///
    /// FK-based and edge-based propagation rules enqueue child rows; each
    /// dequeued row has its state-machine column updated via upsert, and a
    /// successful update re-enqueues that row's own children (plus any vector
    /// exclusions). Rules marked `abort_on_failure` convert the *first*
    /// skipped target into a `PropagationAborted` error, which is reported
    /// only after the whole queue has drained.
    fn propagate(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: uuid::Uuid,
        new_state: &str,
    ) -> Result<()> {
        let snapshot = self.snapshot_for_read();
        // Clone the metadata map so propagation runs against a stable view
        // without holding the table_meta read lock across upserts.
        let metas = self.relational_store().table_meta.read().clone();
        let mut queue: VecDeque<PropagationQueueEntry> = VecDeque::new();
        // Guards against cycles in the propagation graph.
        let mut visited: HashSet<(String, uuid::Uuid)> = HashSet::new();
        // First abort-worthy failure; surfaced after the cascade completes.
        let mut abort_violation: Option<Error> = None;
        let ctx = PropagationContext {
            tx,
            snapshot,
            metas: &metas,
        };
        let root = PropagationSource {
            table,
            uuid: row_uuid,
            state: new_state,
            depth: 0,
        };

        // Seed the queue from the root row; the root's own state is assumed
        // to have been written by the caller.
        self.enqueue_fk_children(&ctx, &mut queue, root);
        self.enqueue_edge_children(&ctx, &mut queue, root)?;
        self.apply_vector_exclusions(&ctx, root)?;

        while let Some(entry) = queue.pop_front() {
            if !visited.insert((entry.table.clone(), entry.uuid)) {
                continue;
            }

            let Some(meta) = metas.get(&entry.table) else {
                continue;
            };

            // A target table without a state machine cannot accept the
            // propagated state: warn, optionally record an abort, move on.
            let Some(state_machine) = &meta.state_machine else {
                let msg = format!(
                    "warning: propagation target table {} has no state machine",
                    entry.table
                );
                eprintln!("{msg}");
                if entry.abort_on_failure && abort_violation.is_none() {
                    abort_violation = Some(Error::PropagationAborted {
                        table: entry.table.clone(),
                        column: String::new(),
                        from: String::new(),
                        to: entry.target_state.clone(),
                    });
                }
                continue;
            };

            let state_column = state_machine.column.clone();
            // Target rows are addressed by their `id` column; a row that is
            // no longer visible is silently skipped.
            let Some(existing) = self.relational.point_lookup_with_tx(
                Some(tx),
                &entry.table,
                "id",
                &Value::Uuid(entry.uuid),
                snapshot,
            )?
            else {
                continue;
            };

            let from_state = existing
                .values
                .get(&state_column)
                .and_then(Value::as_text)
                .unwrap_or("")
                .to_string();

            let mut next_values = existing.values.clone();
            next_values.insert(
                state_column.clone(),
                Value::Text(entry.target_state.clone()),
            );

            let upsert_outcome =
                self.relational
                    .upsert(tx, &entry.table, "id", next_values, snapshot);

            let reached_state = match upsert_outcome {
                Ok(UpsertResult::Updated) => entry.target_state.as_str(),
                // No state change happened, so there is nothing further to
                // cascade from this row.
                Ok(UpsertResult::NoOp) | Ok(UpsertResult::Inserted) => continue,
                // An invalid transition is tolerated (warn + skip) unless the
                // originating rule demanded abort-on-failure.
                Err(Error::InvalidStateTransition(_)) => {
                    eprintln!(
                        "warning: skipped invalid propagated transition {}.{} {} -> {}",
                        entry.table, state_column, from_state, entry.target_state
                    );
                    if entry.abort_on_failure && abort_violation.is_none() {
                        abort_violation = Some(Error::PropagationAborted {
                            table: entry.table.clone(),
                            column: state_column.clone(),
                            from: from_state,
                            to: entry.target_state.clone(),
                        });
                    }
                    continue;
                }
                Err(err) => return Err(err),
            };

            // The row actually changed state: cascade onward from it.
            self.enqueue_edge_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;
            self.apply_vector_exclusions(
                &ctx,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;

            self.enqueue_fk_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            );
        }

        if let Some(err) = abort_violation {
            return Err(err);
        }

        Ok(())
    }
1757
    /// Enqueues propagation targets reachable through foreign-key rules.
    ///
    /// Scans *every* table's `ForeignKey` propagation rules for ones whose
    /// referenced table and trigger state match `source`, then finds the
    /// owning rows that point at `source.uuid` through the FK column. Scan
    /// failures are logged and skipped rather than aborting the cascade.
    fn enqueue_fk_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) {
        for (owner_table, owner_meta) in ctx.metas {
            for rule in &owner_meta.propagation_rules {
                let PropagationRule::ForeignKey {
                    fk_column,
                    referenced_table,
                    trigger_state,
                    target_state,
                    max_depth,
                    abort_on_failure,
                    ..
                } = rule
                else {
                    continue;
                };

                if referenced_table != source.table || trigger_state != source.state {
                    continue;
                }

                // Depth limit is per-rule; children enqueue at depth + 1.
                if source.depth >= *max_depth {
                    continue;
                }

                let rows = match self.relational.scan_filter_with_tx(
                    Some(ctx.tx),
                    owner_table,
                    ctx.snapshot,
                    &|row| row.values.get(fk_column) == Some(&Value::Uuid(source.uuid)),
                ) {
                    Ok(rows) => rows,
                    Err(err) => {
                        // Best effort: a failed child scan must not kill the
                        // rest of the cascade.
                        eprintln!(
                            "warning: propagation scan failed for {owner_table}.{fk_column}: {err}"
                        );
                        continue;
                    }
                };

                for row in rows {
                    // Rows without a UUID `id` column cannot be addressed by
                    // the cascade and are skipped.
                    if let Some(id) = row.values.get("id").and_then(Value::as_uuid).copied() {
                        queue.push_back(PropagationQueueEntry {
                            table: owner_table.clone(),
                            uuid: id,
                            target_state: target_state.clone(),
                            depth: source.depth + 1,
                            abort_on_failure: *abort_on_failure,
                        });
                    }
                }
            }
        }
    }
1816
    /// Enqueues propagation targets reachable through graph-edge rules.
    ///
    /// For each matching `Edge` rule on `source.table`, walks one hop along
    /// edges of `edge_type` in `direction` and enqueues every neighbor that
    /// also exists as a row in the *same* table (edge propagation stays
    /// within one table by construction here).
    fn enqueue_edge_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } = rule
            else {
                continue;
            };

            if trigger_state != source.state || source.depth >= *max_depth {
                continue;
            }

            // Single-hop BFS; deeper traversal happens iteratively as each
            // enqueued child propagates in turn.
            let bfs = self.query_bfs(
                source.uuid,
                Some(std::slice::from_ref(edge_type)),
                *direction,
                1,
                ctx.snapshot,
            )?;

            for node in bfs.nodes {
                // Only neighbors that are visible rows of the source table
                // participate in state propagation.
                if self
                    .relational
                    .point_lookup_with_tx(
                        Some(ctx.tx),
                        source.table,
                        "id",
                        &Value::Uuid(node.id),
                        ctx.snapshot,
                    )?
                    .is_some()
                {
                    queue.push_back(PropagationQueueEntry {
                        table: source.table.to_string(),
                        uuid: node.id,
                        target_state: target_state.clone(),
                        depth: source.depth + 1,
                        abort_on_failure: *abort_on_failure,
                    });
                }
            }
        }

        Ok(())
    }
1877
1878 fn apply_vector_exclusions(
1879 &self,
1880 ctx: &PropagationContext<'_>,
1881 source: PropagationSource<'_>,
1882 ) -> Result<()> {
1883 let Some(meta) = ctx.metas.get(source.table) else {
1884 return Ok(());
1885 };
1886
1887 for rule in &meta.propagation_rules {
1888 let PropagationRule::VectorExclusion { trigger_state } = rule else {
1889 continue;
1890 };
1891 if trigger_state != source.state {
1892 continue;
1893 }
1894 for row_id in self.logical_row_ids_for_uuid(ctx.tx, source.table, source.uuid) {
1895 self.delete_vector(ctx.tx, row_id)?;
1896 }
1897 }
1898
1899 Ok(())
1900 }
1901
    /// Deletes `row_id` from `table` within transaction `tx` (delegates to
    /// the relational executor).
    pub fn delete_row(&self, tx: TxId, table: &str, row_id: RowId) -> Result<()> {
        self.relational.delete(tx, table, row_id)
    }
1905
    /// Returns all rows of `table` visible at `snapshot` (no tx overlay).
    pub fn scan(&self, table: &str, snapshot: SnapshotId) -> Result<Vec<VersionedRow>> {
        self.relational.scan(table, snapshot)
    }
1909
    /// Like [`Self::scan`], but overlays transaction `tx`'s staged writes on
    /// top of the snapshot view.
    pub(crate) fn scan_in_tx(
        &self,
        tx: TxId,
        table: &str,
        snapshot: SnapshotId,
    ) -> Result<Vec<VersionedRow>> {
        self.relational.scan_with_tx(Some(tx), table, snapshot)
    }
1920
1921 pub(crate) fn index_scan_tx_overlay(
1925 &self,
1926 tx: TxId,
1927 table: &str,
1928 column: &str,
1929 shape: &crate::executor::IndexPredicateShape,
1930 ) -> Result<IndexScanTxOverlay> {
1931 use crate::executor::{IndexPredicateShape, range_includes};
1932 let mut overlay = IndexScanTxOverlay::default();
1933 self.tx_mgr.with_write_set(tx, |ws| {
1934 for (t, _row_id, _) in &ws.relational_deletes {
1935 if t == table {
1936 overlay.deleted_row_ids.insert(*_row_id);
1937 }
1938 }
1939 for (t, row) in &ws.relational_inserts {
1940 if t != table {
1941 continue;
1942 }
1943 let v = row.values.get(column).cloned().unwrap_or(Value::Null);
1944 let include = match shape {
1945 IndexPredicateShape::Equality(target) => v == *target,
1946 IndexPredicateShape::NotEqual(target) => v != *target,
1947 IndexPredicateShape::InList(list) => list.contains(&v),
1948 IndexPredicateShape::Range { lower, upper } => range_includes(&v, lower, upper),
1949 IndexPredicateShape::IsNull => v == Value::Null,
1950 IndexPredicateShape::IsNotNull => v != Value::Null,
1951 };
1952 if include {
1953 overlay.matching_inserts.push(row.clone());
1954 }
1955 }
1956 })?;
1957 Ok(overlay)
1958 }
1959
    /// Returns the rows of `table` visible at `snapshot` for which
    /// `predicate` returns true.
    pub fn scan_filter(
        &self,
        table: &str,
        snapshot: SnapshotId,
        predicate: &dyn Fn(&VersionedRow) -> bool,
    ) -> Result<Vec<VersionedRow>> {
        self.relational.scan_filter(table, snapshot, predicate)
    }
1968
    /// Looks up a single row of `table` where `col == value`, visible at
    /// `snapshot` (no tx overlay).
    pub fn point_lookup(
        &self,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational.point_lookup(table, col, value, snapshot)
    }
1978
    /// Like [`Self::point_lookup`], but also sees rows staged in `tx`.
    pub(crate) fn point_lookup_in_tx(
        &self,
        tx: TxId,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational
            .point_lookup_with_tx(Some(tx), table, col, value, snapshot)
    }
1990
1991 pub(crate) fn logical_row_ids_for_uuid(
1992 &self,
1993 tx: TxId,
1994 table: &str,
1995 uuid: uuid::Uuid,
1996 ) -> Vec<RowId> {
1997 let mut row_ids = HashSet::new();
1998
1999 if let Some(rows) = self.relational_store.tables.read().get(table) {
2000 for row in rows {
2001 if row.values.get("id") == Some(&Value::Uuid(uuid)) {
2002 row_ids.insert(row.row_id);
2003 }
2004 }
2005 }
2006
2007 let _ = self.tx_mgr.with_write_set(tx, |ws| {
2008 for (insert_table, row) in &ws.relational_inserts {
2009 if insert_table == table && row.values.get("id") == Some(&Value::Uuid(uuid)) {
2010 row_ids.insert(row.row_id);
2011 }
2012 }
2013 });
2014
2015 row_ids.into_iter().collect()
2016 }
2017
2018 pub fn insert_edge(
2019 &self,
2020 tx: TxId,
2021 source: NodeId,
2022 target: NodeId,
2023 edge_type: EdgeType,
2024 properties: HashMap<String, Value>,
2025 ) -> Result<bool> {
2026 let bytes = estimate_edge_bytes(source, target, &edge_type, &properties);
2027 self.accountant.try_allocate_for(
2028 bytes,
2029 "graph_insert",
2030 "insert_edge",
2031 "Reduce edge fan-out or raise MEMORY_LIMIT before inserting more graph edges.",
2032 )?;
2033
2034 match self
2035 .graph
2036 .insert_edge(tx, source, target, edge_type, properties)
2037 {
2038 Ok(inserted) => {
2039 if !inserted {
2040 self.accountant.release(bytes);
2041 }
2042 Ok(inserted)
2043 }
2044 Err(err) => {
2045 self.accountant.release(bytes);
2046 Err(err)
2047 }
2048 }
2049 }
2050
    /// Deletes the edge `source -[edge_type]-> target` within `tx`
    /// (delegates to the graph executor).
    pub fn delete_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: &str,
    ) -> Result<()> {
        self.graph.delete_edge(tx, source, target, edge_type)
    }
2060
    /// Runs a breadth-first traversal from `start` up to `max_depth` hops,
    /// optionally restricted to `edge_types`.
    ///
    /// The hard-coded `1` is presumably the minimum depth passed through to
    /// `GraphStore::bfs` — confirm against its signature.
    pub fn query_bfs(
        &self,
        start: NodeId,
        edge_types: Option<&[EdgeType]>,
        direction: Direction,
        max_depth: u32,
        snapshot: SnapshotId,
    ) -> Result<TraversalResult> {
        self.graph
            .bfs(start, edge_types, direction, 1, max_depth, snapshot)
    }
2072
    /// Counts edges of `edge_type` outgoing from `source` that are visible
    /// at `snapshot`.
    pub fn edge_count(
        &self,
        source: NodeId,
        edge_type: &str,
        snapshot: SnapshotId,
    ) -> Result<usize> {
        Ok(self.graph.edge_count(source, edge_type, snapshot))
    }
2081
2082 pub fn get_edge_properties(
2083 &self,
2084 source: NodeId,
2085 target: NodeId,
2086 edge_type: &str,
2087 snapshot: SnapshotId,
2088 ) -> Result<Option<HashMap<String, Value>>> {
2089 let props = self
2090 .graph_store
2091 .forward_adj
2092 .read()
2093 .get(&source)
2094 .and_then(|entries| {
2095 entries
2096 .iter()
2097 .rev()
2098 .find(|entry| {
2099 entry.target == target
2100 && entry.edge_type == edge_type
2101 && entry.visible_at(snapshot)
2102 })
2103 .map(|entry| entry.properties.clone())
2104 });
2105 Ok(props)
2106 }
2107
2108 pub fn insert_vector(&self, tx: TxId, row_id: RowId, vector: Vec<f32>) -> Result<()> {
2109 let bytes = estimate_vector_bytes(&vector);
2110 self.accountant.try_allocate_for(
2111 bytes,
2112 "insert",
2113 "vector_insert",
2114 "Reduce vector dimensionality, insert fewer rows, or raise MEMORY_LIMIT.",
2115 )?;
2116 self.vector
2117 .insert_vector(tx, row_id, vector)
2118 .inspect_err(|_| self.accountant.release(bytes))
2119 }
2120
    /// Deletes the vector stored for `row_id` within `tx` (delegates to the
    /// vector executor).
    pub fn delete_vector(&self, tx: TxId, row_id: RowId) -> Result<()> {
        self.vector.delete_vector(tx, row_id)
    }
2124
    /// Runs a k-nearest-neighbor search over vectors visible at `snapshot`,
    /// optionally restricted to the `candidates` row-id set.
    pub fn query_vector(
        &self,
        query: &[f32],
        k: usize,
        candidates: Option<&RoaringTreemap>,
        snapshot: SnapshotId,
    ) -> Result<Vec<(RowId, f32)>> {
        self.vector.search(query, k, candidates, snapshot)
    }
2134
2135 pub fn has_live_vector(&self, row_id: RowId, snapshot: SnapshotId) -> bool {
2136 self.vector_store
2137 .vectors
2138 .read()
2139 .iter()
2140 .any(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
2141 }
2142
2143 pub fn live_vector_entry(&self, row_id: RowId, snapshot: SnapshotId) -> Option<VectorEntry> {
2144 self.vector_store
2145 .vectors
2146 .read()
2147 .iter()
2148 .rev()
2149 .find(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
2150 .cloned()
2151 }
2152
    /// Removes auxiliary (vector and graph) state tied to a table that is
    /// being dropped.
    ///
    /// Vectors are matched by the table's row ids. Edges are matched via the
    /// table's `source_id`/`target_id`/`edge_type` columns, so this only
    /// cleans graph state for edge-shaped tables. Best-effort: a failed scan
    /// is treated as "no rows".
    pub(crate) fn drop_table_aux_state(&self, table: &str) {
        let snapshot = self.snapshot_for_read();
        let rows = self.scan(table, snapshot).unwrap_or_default();
        let row_ids: HashSet<RowId> = rows.iter().map(|row| row.row_id).collect();
        let edge_keys: HashSet<(NodeId, EdgeType, NodeId)> = rows
            .iter()
            .filter_map(|row| {
                match (
                    row.values.get("source_id").and_then(Value::as_uuid),
                    row.values.get("target_id").and_then(Value::as_uuid),
                    row.values.get("edge_type").and_then(Value::as_text),
                ) {
                    (Some(source), Some(target), Some(edge_type)) => {
                        Some((*source, edge_type.to_string(), *target))
                    }
                    _ => None,
                }
            })
            .collect();

        if !row_ids.is_empty() {
            // Drop all vectors for the table's rows and rebuild-from-scratch
            // the HNSW structure (it cannot be partially invalidated).
            let mut vectors = self.vector_store.vectors.write();
            vectors.retain(|entry| !row_ids.contains(&entry.row_id));
            self.vector_store.clear_hnsw(self.accountant());
        }

        if !edge_keys.is_empty() {
            // Scrub both adjacency directions; each lock is held only within
            // its own scope to keep the critical sections short.
            {
                let mut forward = self.graph_store.forward_adj.write();
                for entries in forward.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                forward.retain(|_, entries| !entries.is_empty());
            }
            {
                let mut reverse = self.graph_store.reverse_adj.write();
                for entries in reverse.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                reverse.retain(|_, entries| !entries.is_empty());
            }
        }
    }
2200
    /// Returns the names of all tables known to the relational store.
    pub fn table_names(&self) -> Vec<String> {
        self.relational_store.table_names()
    }
2204
    /// Returns a clone of `table`'s metadata, or `None` if it does not exist.
    pub fn table_meta(&self, table: &str) -> Option<TableMeta> {
        self.relational_store.table_meta(table)
    }
2208
    #[doc(hidden)]
    /// Executes `sql` as if the current snapshot were `snapshot`, by setting
    /// the thread-local `SNAPSHOT_OVERRIDE` for the duration of the call and
    /// restoring the prior value afterwards (supports nesting).
    ///
    /// NOTE(review): the prior value is restored by ordinary control flow,
    /// not a drop guard — if `execute` panics, the override presumably leaks
    /// into later calls on this thread; confirm whether that matters.
    pub fn execute_at_snapshot(
        &self,
        sql: &str,
        params: &HashMap<String, Value>,
        snapshot: SnapshotId,
    ) -> Result<QueryResult> {
        SNAPSHOT_OVERRIDE.with(|cell| {
            let prior = cell.replace(Some(snapshot));
            let r = self.execute(sql, params);
            cell.replace(prior);
            r
        })
    }
2226
2227 pub(crate) fn snapshot_for_read(&self) -> SnapshotId {
2228 SNAPSHOT_OVERRIDE.with(|cell| cell.borrow().unwrap_or_else(|| self.snapshot()))
2229 }
2230
    #[doc(hidden)]
    /// Materializes row-level changes from the change log after LSN `since`.
    ///
    /// Inserts are resolved against the current table contents (an insert
    /// whose row has since vanished is silently dropped); deletes carry their
    /// natural key from the log itself. Non-row log entries are ignored.
    pub fn change_log_rows_since(&self, since: Lsn) -> Result<Vec<RowChange>> {
        let entries = self.change_log_since(since);
        let tables = self.relational_store.tables.read();
        let mut out = Vec::new();
        for e in entries {
            match e {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    let Some(rows) = tables.get(&table) else {
                        continue;
                    };
                    let Some(row) = rows.iter().find(|r| r.row_id == row_id) else {
                        continue;
                    };
                    // Prefer the row's `id` column as the natural key; fall
                    // back to the physical row id. NOTE(review): the `as i64`
                    // fallback cast would wrap for row ids above i64::MAX —
                    // presumably unreachable in practice; confirm.
                    let natural_key = row
                        .values
                        .get("id")
                        .cloned()
                        .map(|value| NaturalKey {
                            column: "id".to_string(),
                            value,
                        })
                        .unwrap_or_else(|| NaturalKey {
                            column: "id".to_string(),
                            value: Value::Int64(row_id.0 as i64),
                        });
                    out.push(RowChange {
                        table,
                        natural_key,
                        values: row.values.clone(),
                        deleted: row.deleted_tx.is_some(),
                        lsn,
                    });
                }
                ChangeLogEntry::RowDelete {
                    table,
                    row_id: _,
                    natural_key,
                    lsn,
                } => {
                    out.push(RowChange {
                        table,
                        natural_key,
                        values: HashMap::new(),
                        deleted: true,
                        lsn,
                    });
                }
                _ => {}
            }
        }
        Ok(out)
    }
2288
    #[doc(hidden)]
    /// Test/introspection hook: current value of the rows-examined counter.
    pub fn __rows_examined(&self) -> u64 {
        self.rows_examined.load(Ordering::SeqCst)
    }
2294
    #[doc(hidden)]
    /// Test/introspection hook: resets the rows-examined counter to zero.
    pub fn __reset_rows_examined(&self) {
        self.rows_examined.store(0, Ordering::SeqCst);
    }
2299
    #[doc(hidden)]
    /// Test/introspection hook: adds `delta` to the rows-examined counter.
    pub fn __bump_rows_examined(&self, delta: u64) {
        self.rows_examined.fetch_add(delta, Ordering::SeqCst);
    }
2304
    #[doc(hidden)]
    /// Test/introspection hook: number of index write-lock acquisitions
    /// recorded by the relational store.
    pub fn __index_write_lock_count(&self) -> u64 {
        self.relational_store.index_write_lock_count()
    }
2311
    #[doc(hidden)]
    /// Test/introspection hook: total entry count across all indexes.
    pub fn __introspect_indexes_total_entries(&self) -> u64 {
        self.relational_store.introspect_indexes_total_entries()
    }
2317
2318 #[doc(hidden)]
2323 pub fn __probe_constraint_check(
2324 &self,
2325 table: &str,
2326 column: &str,
2327 value: Value,
2328 ) -> Result<QueryResult> {
2329 let covered = self.index_covers_column(table, column)
2330 || self
2331 .relational_store
2332 .indexes
2333 .read()
2334 .iter()
2335 .any(|((t, _), idx)| {
2336 t == table && idx.columns.first().is_some_and(|(c, _)| c == column)
2337 });
2338 let trace = if covered {
2339 QueryTrace {
2340 physical_plan: "IndexScan",
2341 index_used: None,
2342 predicates_pushed: Default::default(),
2343 indexes_considered: Default::default(),
2344 sort_elided: false,
2345 }
2346 } else {
2347 QueryTrace::scan()
2348 };
2349 let _ = value;
2350 Ok(QueryResult {
2351 columns: vec![],
2352 rows: vec![],
2353 rows_affected: 0,
2354 trace,
2355 cascade: None,
2356 })
2357 }
2358
    /// Runs one synchronous pruning pass over expired rows, returning a count
    /// from `prune_expired_rows`.
    ///
    /// Serialized against the background pruning thread via `pruning_guard`.
    pub fn run_pruning_cycle(&self) -> u64 {
        let _guard = self.pruning_guard.lock();
        prune_expired_rows(
            &self.relational_store,
            &self.graph_store,
            &self.vector_store,
            self.accountant(),
            self.persistence.as_ref(),
            self.sync_watermark(),
        )
    }
2371
    /// (Re)starts the background pruning thread with a new interval.
    ///
    /// Any previously running pruning thread is stopped first, then a fresh
    /// thread is spawned that repeatedly prunes (serialized with manual
    /// cycles via `pruning_guard`) and sleeps, checking the shutdown flag.
    /// The new shutdown flag and join handle are stored in `pruning_runtime`.
    pub fn set_pruning_interval(&self, interval: Duration) {
        self.stop_pruning_thread();

        let shutdown = Arc::new(AtomicBool::new(false));
        // Clone the Arcs the worker needs so the thread owns its own handles.
        let relational = self.relational_store.clone();
        let graph = self.graph_store.clone();
        let vector = self.vector_store.clone();
        let accountant = self.accountant.clone();
        let persistence = self.persistence.clone();
        let sync_watermark = self.sync_watermark.clone();
        let pruning_guard = self.pruning_guard.clone();
        let thread_shutdown = shutdown.clone();

        let handle = thread::spawn(move || {
            while !thread_shutdown.load(Ordering::SeqCst) {
                {
                    let _guard = pruning_guard.lock();
                    // Pruning failures are intentionally swallowed here; the
                    // next cycle will retry.
                    let _ = prune_expired_rows(
                        &relational,
                        &graph,
                        &vector,
                        accountant.as_ref(),
                        persistence.as_ref(),
                        sync_watermark.load(Ordering::SeqCst),
                    );
                }
                // Interruptible sleep so shutdown does not wait a full interval.
                sleep_with_shutdown(&thread_shutdown, interval);
            }
        });

        let mut runtime = self.pruning_runtime.lock();
        runtime.shutdown = shutdown;
        runtime.handle = Some(handle);
    }
2407
    /// Current sync watermark LSN (pruning will not discard state newer than
    /// this).
    pub fn sync_watermark(&self) -> Lsn {
        self.sync_watermark.load(Ordering::SeqCst)
    }
2411
    /// Updates the sync watermark LSN.
    pub fn set_sync_watermark(&self, watermark: Lsn) {
        self.sync_watermark.store(watermark, Ordering::SeqCst);
    }
2415
    /// Stable unique identifier of this database instance.
    pub fn instance_id(&self) -> uuid::Uuid {
        self.instance_id
    }
2419
    /// Opens an in-memory database with an explicit plugin and memory
    /// accountant. Does NOT fire the plugin's `on_open` hook — callers that
    /// want it use [`Self::open_memory_with_plugin`].
    pub fn open_memory_with_plugin_and_accountant(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_memory_internal(plugin, accountant)
    }
2426
2427 pub fn open_memory_with_plugin(plugin: Arc<dyn DatabasePlugin>) -> Result<Self> {
2428 let db = Self::open_memory_with_plugin_and_accountant(
2429 plugin,
2430 Arc::new(MemoryAccountant::no_limit()),
2431 )?;
2432 db.plugin.on_open()?;
2433 Ok(db)
2434 }
2435
    /// Shuts the database down. Idempotent: subsequent calls are no-ops.
    ///
    /// Shutdown order: roll back any open session transaction, stop the
    /// pruning thread, drop all subscribers, close persistence, then fire the
    /// plugin's `on_close` hook (whose result is returned).
    pub fn close(&self) -> Result<()> {
        // `swap` makes the first caller the only one that runs shutdown.
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        let tx = self.session_tx.lock().take();
        if let Some(tx) = tx {
            self.rollback(tx)?;
        }
        self.stop_pruning_thread();
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
        self.plugin.on_close()
    }
2451
2452 pub fn open_with_plugin(
2454 path: impl AsRef<Path>,
2455 plugin: Arc<dyn DatabasePlugin>,
2456 ) -> Result<Self> {
2457 let db = Self::open_loaded(path, plugin, Arc::new(MemoryAccountant::no_limit()), None)?;
2458 db.plugin.on_open()?;
2459 Ok(db)
2460 }
2461
    /// Opens a file-backed database with an explicit memory accountant and no
    /// startup disk-limit ceiling.
    pub fn open_with_config(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_with_config_and_disk_limit(path, plugin, accountant, None)
    }
2470
2471 pub fn open_with_config_and_disk_limit(
2472 path: impl AsRef<Path>,
2473 plugin: Arc<dyn DatabasePlugin>,
2474 accountant: Arc<MemoryAccountant>,
2475 startup_disk_limit: Option<u64>,
2476 ) -> Result<Self> {
2477 let path = path.as_ref();
2478 if path.as_os_str() == ":memory:" {
2479 return Self::open_memory_with_plugin_and_accountant(plugin, accountant);
2480 }
2481 let accountant = if let Some(limit) = persisted_memory_limit(path)? {
2482 let usage = accountant.usage();
2483 if usage.limit.is_none() && usage.startup_ceiling.is_none() {
2484 Arc::new(MemoryAccountant::with_budget(limit))
2485 } else {
2486 accountant
2487 }
2488 } else {
2489 accountant
2490 };
2491 let db = Self::open_loaded(path, plugin, accountant, startup_disk_limit)?;
2492 db.plugin.on_open()?;
2493 Ok(db)
2494 }
2495
    /// Opens an in-memory database with the default `CorePlugin` and the
    /// given accountant. Panics if the in-memory open fails (treated as a
    /// bug, since there is no I/O involved).
    pub fn open_memory_with_accountant(accountant: Arc<MemoryAccountant>) -> Self {
        Self::open_memory_internal(Arc::new(CorePlugin), accountant)
            .expect("failed to open in-memory database with accountant")
    }
2501
    /// Returns the memory accountant used for budget tracking.
    pub fn accountant(&self) -> &MemoryAccountant {
        &self.accountant
    }
2506
    /// Charges the memory accountant for everything just loaded from disk:
    /// table metadata, rows, live edges, and live vectors — in that order.
    /// Returns an error if the configured budget cannot cover the loaded
    /// state, so an over-limit file fails at open rather than at first write.
    fn account_loaded_state(&self) -> Result<()> {
        // Schema metadata.
        let metadata_bytes = self
            .relational_store
            .table_meta
            .read()
            .values()
            .fold(0usize, |acc, meta| {
                acc.saturating_add(meta.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            metadata_bytes,
            "open",
            "load_table_metadata",
            "Open the database with a larger MEMORY_LIMIT or reduce stored schema metadata.",
        )?;

        // Rows: prefer the schema-aware size estimate when table metadata is
        // available, otherwise fall back to the row's own estimate.
        let row_bytes =
            self.relational_store
                .tables
                .read()
                .iter()
                .fold(0usize, |acc, (table, rows)| {
                    let meta = self.table_meta(table);
                    acc.saturating_add(rows.iter().fold(0usize, |inner, row| {
                        inner.saturating_add(meta.as_ref().map_or_else(
                            || row.estimated_bytes(),
                            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
                        ))
                    }))
                });
        self.accountant.try_allocate_for(
            row_bytes,
            "open",
            "load_rows",
            "Open the database with a larger MEMORY_LIMIT or prune retained rows first.",
        )?;

        // Graph edges: only live (non-tombstoned) entries are charged.
        let edge_bytes = self
            .graph_store
            .forward_adj
            .read()
            .values()
            .flatten()
            .filter(|edge| edge.deleted_tx.is_none())
            .fold(0usize, |acc, edge| {
                acc.saturating_add(edge.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            edge_bytes,
            "open",
            "load_edges",
            "Open the database with a larger MEMORY_LIMIT or reduce graph edge volume.",
        )?;

        // Vector entries: likewise only live ones.
        let vector_bytes = self
            .vector_store
            .vectors
            .read()
            .iter()
            .filter(|entry| entry.deleted_tx.is_none())
            .fold(0usize, |acc, entry| {
                acc.saturating_add(entry.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            vector_bytes,
            "open",
            "load_vectors",
            "Open the database with a larger MEMORY_LIMIT or reduce stored vector data.",
        )?;

        Ok(())
    }
2579
2580 fn release_insert_allocations(&self, ws: &contextdb_tx::WriteSet) {
2581 for (table, row) in &ws.relational_inserts {
2582 let bytes = self
2583 .table_meta(table)
2584 .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
2585 .unwrap_or_else(|| row.estimated_bytes());
2586 self.accountant.release(bytes);
2587 }
2588
2589 for edge in &ws.adj_inserts {
2590 self.accountant.release(edge.estimated_bytes());
2591 }
2592
2593 for entry in &ws.vector_inserts {
2594 self.accountant.release(entry.estimated_bytes());
2595 }
2596 }
2597
2598 fn release_delete_allocations(&self, ws: &contextdb_tx::WriteSet) {
2599 for (table, row_id, _) in &ws.relational_deletes {
2600 if let Some(row) = self.find_row_by_id(table, *row_id) {
2601 let bytes = self
2602 .table_meta(table)
2603 .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
2604 .unwrap_or_else(|| row.estimated_bytes());
2605 self.accountant.release(bytes);
2606 }
2607 }
2608
2609 for (source, edge_type, target, _) in &ws.adj_deletes {
2610 if let Some(edge) = self.find_edge(source, target, edge_type) {
2611 self.accountant.release(edge.estimated_bytes());
2612 }
2613 }
2614
2615 for (row_id, _) in &ws.vector_deletes {
2616 if let Some(vector) = self.find_vector_by_row_id(*row_id) {
2617 self.accountant.release(vector.estimated_bytes());
2618 }
2619 }
2620
2621 if !ws.vector_deletes.is_empty() {
2622 self.vector_store.clear_hnsw(self.accountant());
2623 }
2624 }
2625
2626 fn find_row_by_id(&self, table: &str, row_id: RowId) -> Option<VersionedRow> {
2627 self.relational_store
2628 .tables
2629 .read()
2630 .get(table)
2631 .and_then(|rows| rows.iter().find(|row| row.row_id == row_id))
2632 .cloned()
2633 }
2634
2635 fn find_vector_by_row_id(&self, row_id: RowId) -> Option<VectorEntry> {
2636 self.vector_store
2637 .vectors
2638 .read()
2639 .iter()
2640 .find(|entry| entry.row_id == row_id)
2641 .cloned()
2642 }
2643
2644 fn find_edge(&self, source: &NodeId, target: &NodeId, edge_type: &str) -> Option<AdjEntry> {
2645 self.graph_store
2646 .forward_adj
2647 .read()
2648 .get(source)
2649 .and_then(|entries| {
2650 entries
2651 .iter()
2652 .find(|entry| entry.target == *target && entry.edge_type == edge_type)
2653 .cloned()
2654 })
2655 }
2656
    /// Captures the current lengths of the five write-set buffers for `tx`,
    /// so a failed statement can later be undone with
    /// `restore_write_set_checkpoint`.
    pub(crate) fn write_set_checkpoint(
        &self,
        tx: TxId,
    ) -> Result<(usize, usize, usize, usize, usize)> {
        self.tx_mgr.with_write_set(tx, |ws| {
            (
                ws.relational_inserts.len(),
                ws.relational_deletes.len(),
                ws.adj_inserts.len(),
                ws.vector_inserts.len(),
                ws.vector_deletes.len(),
            )
        })
    }
2671
2672 pub(crate) fn restore_write_set_checkpoint(
2673 &self,
2674 tx: TxId,
2675 checkpoint: (usize, usize, usize, usize, usize),
2676 ) -> Result<()> {
2677 self.tx_mgr.with_write_set(tx, |ws| {
2678 ws.relational_inserts.truncate(checkpoint.0);
2679 ws.relational_deletes.truncate(checkpoint.1);
2680 ws.adj_inserts.truncate(checkpoint.2);
2681 ws.vector_inserts.truncate(checkpoint.3);
2682 ws.vector_deletes.truncate(checkpoint.4);
2683 })
2684 }
2685
    /// Returns a clone of the current sync conflict-policy configuration.
    pub fn conflict_policies(&self) -> ConflictPolicies {
        self.conflict_policies.read().clone()
    }
2690
    /// Sets the fallback conflict policy used for tables without a per-table
    /// override.
    pub fn set_default_conflict_policy(&self, policy: ConflictPolicy) {
        self.conflict_policies.write().default = policy;
    }
2695
2696 pub fn set_table_conflict_policy(&self, table: &str, policy: ConflictPolicy) {
2698 self.conflict_policies
2699 .write()
2700 .per_table
2701 .insert(table.to_string(), policy);
2702 }
2703
    /// Removes the per-table conflict-policy override, reverting the table to
    /// the default policy.
    pub fn drop_table_conflict_policy(&self, table: &str) {
        self.conflict_policies.write().per_table.remove(table);
    }
2708
    /// Returns the installed database plugin as a trait object.
    pub fn plugin(&self) -> &dyn DatabasePlugin {
        self.plugin.as_ref()
    }
2712
    /// Delegates to the plugin's health report.
    pub fn plugin_health(&self) -> PluginHealth {
        self.plugin.health()
    }
2716
    /// Delegates to the plugin's JSON self-description.
    pub fn plugin_describe(&self) -> serde_json::Value {
        self.plugin.describe()
    }
2720
    /// Returns the in-memory graph executor (crate-internal accessor).
    pub(crate) fn graph(&self) -> &MemGraphExecutor<DynStore> {
        &self.graph
    }
2724
    /// Returns the shared relational store (crate-internal accessor).
    pub(crate) fn relational_store(&self) -> &Arc<RelationalStore> {
        &self.relational_store
    }
2728
    /// Runs `f` with a freshly allocated DDL LSN from the transaction
    /// manager, returning whatever `f` produces.
    pub(crate) fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
    where
        F: FnOnce(Lsn) -> R,
    {
        self.tx_mgr.allocate_ddl_lsn(f)
    }
2735
    /// Runs `f` while holding the transaction manager's commit lock, so `f`
    /// observes a state no concurrent commit can interleave with.
    pub(crate) fn with_commit_lock<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        self.tx_mgr.with_commit_lock(f)
    }
2742
2743 pub(crate) fn log_create_table_ddl(
2744 &self,
2745 name: &str,
2746 meta: &TableMeta,
2747 lsn: Lsn,
2748 ) -> Result<()> {
2749 let change = ddl_change_from_meta(name, meta);
2750 self.ddl_log.write().push((lsn, change.clone()));
2751 if let Some(persistence) = &self.persistence {
2752 persistence.append_ddl_log(lsn, &change)?;
2753 }
2754 Ok(())
2755 }
2756
2757 pub(crate) fn log_drop_table_ddl(&self, name: &str, lsn: Lsn) -> Result<()> {
2758 let change = DdlChange::DropTable {
2759 name: name.to_string(),
2760 };
2761 self.ddl_log.write().push((lsn, change.clone()));
2762 if let Some(persistence) = &self.persistence {
2763 persistence.append_ddl_log(lsn, &change)?;
2764 }
2765 Ok(())
2766 }
2767
2768 pub(crate) fn log_alter_table_ddl(&self, name: &str, meta: &TableMeta, lsn: Lsn) -> Result<()> {
2769 let change = DdlChange::AlterTable {
2770 name: name.to_string(),
2771 columns: meta
2772 .columns
2773 .iter()
2774 .map(|c| {
2775 (
2776 c.name.clone(),
2777 sql_type_for_meta_column(c, &meta.propagation_rules),
2778 )
2779 })
2780 .collect(),
2781 constraints: create_table_constraints_from_meta(meta),
2782 };
2783 self.ddl_log.write().push((lsn, change.clone()));
2784 if let Some(persistence) = &self.persistence {
2785 persistence.append_ddl_log(lsn, &change)?;
2786 }
2787 Ok(())
2788 }
2789
2790 pub(crate) fn log_create_index_ddl(
2791 &self,
2792 table: &str,
2793 name: &str,
2794 columns: &[(String, contextdb_core::SortDirection)],
2795 lsn: Lsn,
2796 ) -> Result<()> {
2797 let change = DdlChange::CreateIndex {
2798 table: table.to_string(),
2799 name: name.to_string(),
2800 columns: columns.to_vec(),
2801 };
2802 self.ddl_log.write().push((lsn, change.clone()));
2803 if let Some(persistence) = &self.persistence {
2804 persistence.append_ddl_log(lsn, &change)?;
2805 }
2806 Ok(())
2807 }
2808
2809 pub(crate) fn log_drop_index_ddl(&self, table: &str, name: &str, lsn: Lsn) -> Result<()> {
2810 let change = DdlChange::DropIndex {
2811 table: table.to_string(),
2812 name: name.to_string(),
2813 };
2814 self.ddl_log.write().push((lsn, change.clone()));
2815 if let Some(persistence) = &self.persistence {
2816 persistence.append_ddl_log(lsn, &change)?;
2817 }
2818 Ok(())
2819 }
2820
2821 pub(crate) fn persist_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
2822 if let Some(persistence) = &self.persistence {
2823 persistence.flush_table_meta(name, meta)?;
2824 }
2825 Ok(())
2826 }
2827
2828 pub(crate) fn persist_memory_limit(&self, limit: Option<usize>) -> Result<()> {
2829 if let Some(persistence) = &self.persistence {
2830 match limit {
2831 Some(limit) => persistence.flush_config_value("memory_limit", &limit)?,
2832 None => persistence.remove_config_value("memory_limit")?,
2833 }
2834 }
2835 Ok(())
2836 }
2837
2838 pub fn set_disk_limit(&self, limit: Option<u64>) -> Result<()> {
2839 if self.persistence.is_none() {
2840 self.disk_limit.store(0, Ordering::SeqCst);
2841 return Ok(());
2842 }
2843
2844 let ceiling = self.disk_limit_startup_ceiling();
2845 if let Some(ceiling) = ceiling {
2846 match limit {
2847 Some(bytes) if bytes > ceiling => {
2848 return Err(Error::Other(format!(
2849 "disk limit {bytes} exceeds startup ceiling {ceiling}"
2850 )));
2851 }
2852 None => {
2853 return Err(Error::Other(
2854 "cannot remove disk limit when a startup ceiling is set".to_string(),
2855 ));
2856 }
2857 _ => {}
2858 }
2859 }
2860
2861 self.disk_limit.store(limit.unwrap_or(0), Ordering::SeqCst);
2862 Ok(())
2863 }
2864
2865 pub fn disk_limit(&self) -> Option<u64> {
2866 match self.disk_limit.load(Ordering::SeqCst) {
2867 0 => None,
2868 bytes => Some(bytes),
2869 }
2870 }
2871
2872 pub fn disk_limit_startup_ceiling(&self) -> Option<u64> {
2873 match self.disk_limit_startup_ceiling.load(Ordering::SeqCst) {
2874 0 => None,
2875 bytes => Some(bytes),
2876 }
2877 }
2878
2879 pub fn disk_file_size(&self) -> Option<u64> {
2880 self.persistence
2881 .as_ref()
2882 .map(|persistence| std::fs::metadata(persistence.path()).map(|meta| meta.len()))
2883 .transpose()
2884 .ok()
2885 .flatten()
2886 }
2887
2888 pub(crate) fn persist_disk_limit(&self, limit: Option<u64>) -> Result<()> {
2889 if let Some(persistence) = &self.persistence {
2890 match limit {
2891 Some(limit) => persistence.flush_config_value("disk_limit", &limit)?,
2892 None => persistence.remove_config_value("disk_limit")?,
2893 }
2894 }
2895 Ok(())
2896 }
2897
2898 pub fn check_disk_budget(&self, operation: &str) -> Result<()> {
2899 let Some(limit) = self.disk_limit() else {
2900 return Ok(());
2901 };
2902 let Some(current_bytes) = self.disk_file_size() else {
2903 return Ok(());
2904 };
2905 if current_bytes < limit {
2906 return Ok(());
2907 }
2908 Err(Error::DiskBudgetExceeded {
2909 operation: operation.to_string(),
2910 current_bytes,
2911 budget_limit_bytes: limit,
2912 hint: "Reduce retained file-backed data or raise DISK_LIMIT before writing more data."
2913 .to_string(),
2914 })
2915 }
2916
2917 pub fn persisted_sync_watermarks(&self, tenant_id: &str) -> Result<(Lsn, Lsn)> {
2918 let Some(persistence) = &self.persistence else {
2919 return Ok((Lsn(0), Lsn(0)));
2920 };
2921 let push = persistence
2922 .load_config_value::<u64>(&format!("sync_push_watermark:{tenant_id}"))?
2923 .map(Lsn)
2924 .unwrap_or(Lsn(0));
2925 let pull = persistence
2926 .load_config_value::<u64>(&format!("sync_pull_watermark:{tenant_id}"))?
2927 .map(Lsn)
2928 .unwrap_or(Lsn(0));
2929 Ok((push, pull))
2930 }
2931
2932 pub fn persist_sync_push_watermark(&self, tenant_id: &str, watermark: Lsn) -> Result<()> {
2933 if let Some(persistence) = &self.persistence {
2934 persistence
2935 .flush_config_value(&format!("sync_push_watermark:{tenant_id}"), &watermark.0)?;
2936 }
2937 Ok(())
2938 }
2939
2940 pub fn persist_sync_pull_watermark(&self, tenant_id: &str, watermark: Lsn) -> Result<()> {
2941 if let Some(persistence) = &self.persistence {
2942 persistence
2943 .flush_config_value(&format!("sync_pull_watermark:{tenant_id}"), &watermark.0)?;
2944 }
2945 Ok(())
2946 }
2947
2948 pub(crate) fn persist_table_rows(&self, name: &str) -> Result<()> {
2949 if let Some(persistence) = &self.persistence {
2950 let tables = self.relational_store.tables.read();
2951 if let Some(rows) = tables.get(name) {
2952 persistence.rewrite_table_rows(name, rows)?;
2953 }
2954 }
2955 Ok(())
2956 }
2957
2958 pub(crate) fn remove_persisted_table(&self, name: &str) -> Result<()> {
2959 if let Some(persistence) = &self.persistence {
2960 persistence.remove_table_meta(name)?;
2961 persistence.remove_table_data(name)?;
2962 }
2963 Ok(())
2964 }
2965
2966 pub fn change_log_since(&self, since_lsn: Lsn) -> Vec<ChangeLogEntry> {
2967 let log = self.change_log.read();
2968 let start = log.partition_point(|e| e.lsn() <= since_lsn);
2969 log[start..].to_vec()
2970 }
2971
2972 pub fn ddl_log_since(&self, since_lsn: Lsn) -> Vec<DdlChange> {
2973 let ddl = self.ddl_log.read();
2974 let start = ddl.partition_point(|(lsn, _)| *lsn <= since_lsn);
2975 ddl[start..].iter().map(|(_, c)| c.clone()).collect()
2976 }
2977
    /// Builds a [`ChangeSet`] describing the entire live state: all table
    /// DDL, every live row addressable by a natural key, every live edge,
    /// and every live vector attached to a live row. Intended for sync peers
    /// starting from LSN 0.
    #[allow(dead_code)]
    fn full_state_snapshot(&self) -> ChangeSet {
        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let mut ddl = Vec::new();

        // Hold both relational guards together so schema and rows are
        // mutually consistent while we iterate.
        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        for (name, meta) in meta_guard.iter() {
            ddl.push(ddl_change_from_meta(name, meta));
        }

        // Row ids whose row is live — used below to filter vectors so we
        // never ship a vector for a deleted/unknown row.
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(m) => m,
                None => continue,
            };
            // Natural-key resolution order: explicit natural key, then a UUID
            // "id" column, then any UUID primary key. Tables with none cannot
            // be addressed remotely and are skipped.
            let key_col = meta
                .natural_key_column
                .clone()
                .or_else(|| {
                    if meta
                        .columns
                        .iter()
                        .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                    {
                        Some("id".to_string())
                    } else {
                        None
                    }
                })
                .or_else(|| {
                    meta.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
                .unwrap_or_default();
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|r| r.deleted_tx.is_none()) {
                // Rows missing their key value cannot be matched on the peer.
                let key_val = match row.values.get(&key_col) {
                    Some(v) => v.clone(),
                    None => continue,
                };
                live_row_ids.insert(row.row_id);
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }

        // Release relational guards before touching the graph/vector stores.
        drop(tables_guard);
        drop(meta_guard);

        let fwd = self.graph_store.forward_adj.read();
        for (_source, entries) in fwd.iter() {
            for entry in entries.iter().filter(|e| e.deleted_tx.is_none()) {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // At most one vector per row, and only for live rows.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs.iter().filter(|v| v.deleted_tx.is_none()) {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
3088
    /// Like `full_state_snapshot` but restricted to rows/edges/vectors whose
    /// LSN is strictly newer than `since_lsn`; `Lsn(0)` falls back to the
    /// full snapshot. No DDL is emitted on the incremental path. Note that
    /// `live_row_ids` collects ALL live rows (not just changed ones) so a
    /// newer vector attached to an older row still syncs.
    fn persisted_state_since(&self, since_lsn: Lsn) -> ChangeSet {
        if since_lsn == Lsn(0) {
            return self.full_state_snapshot();
        }

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let ddl = Vec::new();

        // Hold both relational guards together for a consistent view.
        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(meta) => meta,
                None => continue,
            };
            // Same natural-key resolution order as full_state_snapshot:
            // explicit key, then UUID "id" column, then UUID primary key.
            let key_col = meta
                .natural_key_column
                .clone()
                .or_else(|| {
                    if meta
                        .columns
                        .iter()
                        .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                    {
                        Some("id".to_string())
                    } else {
                        None
                    }
                })
                .or_else(|| {
                    meta.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
                .unwrap_or_default();
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|row| row.deleted_tx.is_none()) {
                // Record liveness BEFORE the LSN filter — see doc comment.
                live_row_ids.insert(row.row_id);
                if row.lsn <= since_lsn {
                    continue;
                }
                let key_val = match row.values.get(&key_col) {
                    Some(value) => value.clone(),
                    None => continue,
                };
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }
        drop(tables_guard);
        drop(meta_guard);

        let fwd = self.graph_store.forward_adj.read();
        for entries in fwd.values() {
            for entry in entries
                .iter()
                .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
            {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // At most one vector per row, and only for live rows.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs
            .iter()
            .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
3200
    /// Conservative pre-check that an incoming sync change set fits in the
    /// remaining memory budget before any of it is applied. Rows the conflict
    /// policy would skip anyway (existing row under InsertIfNotExists or
    /// ServerWins) are excluded from the estimate. A no-op when no memory
    /// limit is configured.
    fn preflight_sync_apply_memory(
        &self,
        changes: &ChangeSet,
        policies: &ConflictPolicies,
    ) -> Result<()> {
        let usage = self.accountant.usage();
        let Some(limit) = usage.limit else {
            return Ok(());
        };
        let available = usage.available.unwrap_or(limit);
        let mut required = 0usize;

        for row in &changes.rows {
            // Deletions and empty payloads do not add memory.
            if row.deleted || row.values.is_empty() {
                continue;
            }

            let policy = policies
                .per_table
                .get(&row.table)
                .copied()
                .unwrap_or(policies.default);
            let existing = self.point_lookup(
                &row.table,
                &row.natural_key.column,
                &row.natural_key.value,
                self.snapshot(),
            )?;

            // These policies keep the local row, so the incoming one is free.
            if existing.is_some()
                && matches!(
                    policy,
                    ConflictPolicy::InsertIfNotExists | ConflictPolicy::ServerWins
                )
            {
                continue;
            }

            required = required.saturating_add(
                self.table_meta(&row.table)
                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                    .unwrap_or_else(|| estimate_row_value_bytes(&row.values)),
            );
        }

        // Flat per-edge overhead estimate plus the property payload.
        for edge in &changes.edges {
            required = required.saturating_add(
                96 + edge.edge_type.len().saturating_mul(16)
                    + estimate_row_value_bytes(&edge.properties),
            );
        }

        // Empty vectors encode deletions (see changes_since) — no cost.
        for vector in &changes.vectors {
            if vector.vector.is_empty() {
                continue;
            }
            required = required.saturating_add(
                24 + vector
                    .vector
                    .len()
                    .saturating_mul(std::mem::size_of::<f32>()),
            );
        }

        if required > available {
            return Err(Error::MemoryBudgetExceeded {
                subsystem: "sync".to_string(),
                operation: "apply_changes".to_string(),
                requested_bytes: required,
                available_bytes: available,
                budget_limit_bytes: limit,
                hint:
                    "Reduce sync batch size, split the push, or raise MEMORY_LIMIT on the server."
                        .to_string(),
            });
        }

        Ok(())
    }
3280
    /// Builds the outgoing sync [`ChangeSet`] for everything after
    /// `since_lsn`. Normally replays the in-memory change/DDL logs; falls
    /// back to scanning persisted state when the logs are empty (fresh open
    /// with existing data) or when their oldest entry is too new to cover the
    /// requested range (log was truncated).
    pub fn changes_since(&self, since_lsn: Lsn) -> ChangeSet {
        // A watermark from the future yields nothing.
        if since_lsn > self.current_lsn() {
            return ChangeSet::default();
        }

        let log = self.change_log.read();
        let change_first_lsn = log.first().map(|e| e.lsn());
        let change_log_empty = log.is_empty();
        drop(log);

        let ddl = self.ddl_log.read();
        let ddl_first_lsn = ddl.first().map(|(lsn, _)| *lsn);
        let ddl_log_empty = ddl.is_empty();
        drop(ddl);

        let has_table_data = !self
            .relational_store
            .tables
            .read()
            .values()
            .all(|rows| rows.is_empty());
        let has_table_meta = !self.relational_store.table_meta.read().is_empty();

        // Empty logs but existing data: the logs never saw these changes, so
        // answer from the stores directly.
        if change_log_empty && ddl_log_empty && (has_table_data || has_table_meta) {
            return self.persisted_state_since(since_lsn);
        }

        let min_first_lsn = match (change_first_lsn, ddl_first_lsn) {
            (Some(c), Some(d)) => Some(c.min(d)),
            (Some(c), None) => Some(c),
            (None, Some(d)) => Some(d),
            (None, None) => None,
        };

        // Oldest retained log entry starts after since_lsn + 1: the log has a
        // gap for the requested range, so fall back to a state scan.
        if min_first_lsn.is_some_and(|min_lsn| min_lsn.0 > since_lsn.0 + 1) {
            return self.persisted_state_since(since_lsn);
        }

        // Read both logs under the commit lock so they reflect one instant.
        let (ddl, change_entries) = self.with_commit_lock(|| {
            let ddl = self.ddl_log_since(since_lsn);
            let changes = self.change_log_since(since_lsn);
            (ddl, changes)
        });

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();

        for entry in change_entries {
            match entry {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    // Rows that no longer resolve (e.g. later deleted) are
                    // dropped; the matching RowDelete entry covers them.
                    if let Some((natural_key, values)) = self.row_change_values(&table, row_id) {
                        rows.push(RowChange {
                            table,
                            natural_key,
                            values,
                            deleted: false,
                            lsn,
                        });
                    }
                }
                ChangeLogEntry::RowDelete {
                    table,
                    natural_key,
                    lsn,
                    ..
                } => {
                    // Deletions carry a "__deleted" marker payload.
                    let mut values = HashMap::new();
                    values.insert("__deleted".to_string(), Value::Bool(true));
                    rows.push(RowChange {
                        table,
                        natural_key,
                        values,
                        deleted: true,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeInsert {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    let properties = self
                        .edge_properties(source, target, &edge_type, lsn)
                        .unwrap_or_default();
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeDelete {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    // Edge deletions also use the "__deleted" marker.
                    let mut properties = HashMap::new();
                    properties.insert("__deleted".to_string(), Value::Bool(true));
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::VectorInsert { row_id, lsn } => {
                    if let Some(vector) = self.vector_for_row_lsn(row_id, lsn) {
                        vectors.push(VectorChange {
                            row_id,
                            vector,
                            lsn,
                        });
                    }
                }
                // An empty vector encodes deletion on the wire.
                ChangeLogEntry::VectorDelete { row_id, lsn } => vectors.push(VectorChange {
                    row_id,
                    vector: Vec::new(),
                    lsn,
                }),
            }
        }

        // For each natural key, find the newest surviving insert so stale
        // deletes (superseded by a re-insert) can be filtered out below.
        // Keys use Debug formatting of the value so Value need not be Hash.
        let insert_max_lsn: HashMap<(String, String, String), Lsn> = {
            let mut map: HashMap<(String, String, String), Lsn> = HashMap::new();
            for r in rows.iter().filter(|r| !r.deleted) {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                let entry = map.entry(key).or_insert(Lsn(0));
                if r.lsn > *entry {
                    *entry = r.lsn;
                }
            }
            map
        };
        rows.retain(|r| {
            if r.deleted {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                // Keep the delete only if no newer insert for the same key
                // exists in this batch.
                match insert_max_lsn.get(&key) {
                    Some(&insert_lsn) => insert_lsn < r.lsn,
                    None => true,
                }
            } else {
                true
            }
        });

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
3461
    /// Returns the latest LSN issued by the transaction manager.
    pub fn current_lsn(&self) -> Lsn {
        self.tx_mgr.current_lsn()
    }
3466
    /// Returns the highest committed transaction id.
    pub fn committed_watermark(&self) -> TxId {
        self.tx_mgr.current_tx_max()
    }
3471
    /// Peeks at the next transaction id without consuming it.
    pub fn next_tx(&self) -> TxId {
        self.tx_mgr.peek_next_tx()
    }
3476
    /// Subscribes to commit events using the default channel capacity.
    pub fn subscribe(&self) -> Receiver<CommitEvent> {
        self.subscribe_with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY)
    }
3482
3483 pub fn subscribe_with_capacity(&self, capacity: usize) -> Receiver<CommitEvent> {
3485 let (tx, rx) = mpsc::sync_channel(capacity.max(1));
3486 self.subscriptions.lock().subscribers.push(tx);
3487 rx
3488 }
3489
3490 pub fn subscription_health(&self) -> SubscriptionMetrics {
3492 let subscriptions = self.subscriptions.lock();
3493 SubscriptionMetrics {
3494 active_channels: subscriptions.subscribers.len(),
3495 events_sent: subscriptions.events_sent,
3496 events_dropped: subscriptions.events_dropped,
3497 }
3498 }
3499
3500 pub fn apply_changes(
3502 &self,
3503 mut changes: ChangeSet,
3504 policies: &ConflictPolicies,
3505 ) -> Result<ApplyResult> {
3506 self.relational_store.bump_index_write_lock_count();
3511 self.plugin.on_sync_pull(&mut changes)?;
3512 self.check_disk_budget("sync_pull")?;
3513 self.preflight_sync_apply_memory(&changes, policies)?;
3514
3515 for row in &changes.rows {
3517 for v in row.values.values() {
3518 if let Value::TxId(incoming) = v
3519 && incoming.0 == u64::MAX
3520 {
3521 return Err(Error::TxIdOverflow {
3522 table: row.table.clone(),
3523 incoming: u64::MAX,
3524 });
3525 }
3526 }
3527 }
3528
3529 let mut tx = self.begin();
3530 let mut result = ApplyResult {
3531 applied_rows: 0,
3532 skipped_rows: 0,
3533 conflicts: Vec::new(),
3534 new_lsn: self.current_lsn(),
3535 };
3536 let vector_row_ids = changes.vectors.iter().map(|v| v.row_id).collect::<Vec<_>>();
3537 let mut vector_row_map: HashMap<RowId, RowId> = HashMap::new();
3538 let mut vector_row_idx = 0usize;
3539 let mut failed_row_ids: HashSet<RowId> = HashSet::new();
3540 let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
3541 let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();
3542
3543 for ddl in changes.ddl.clone() {
3544 match ddl {
3545 DdlChange::CreateTable {
3546 name,
3547 columns,
3548 constraints,
3549 } => {
3550 if self.table_meta(&name).is_some() {
3551 if let Some(local_meta) = self.table_meta(&name) {
3552 let local_cols: Vec<(String, String)> = local_meta
3553 .columns
3554 .iter()
3555 .map(|c| {
3556 let ty = match c.column_type {
3557 ColumnType::Integer => "INTEGER".to_string(),
3558 ColumnType::Real => "REAL".to_string(),
3559 ColumnType::Text => "TEXT".to_string(),
3560 ColumnType::Boolean => "BOOLEAN".to_string(),
3561 ColumnType::Json => "JSON".to_string(),
3562 ColumnType::Uuid => "UUID".to_string(),
3563 ColumnType::Vector(dim) => format!("VECTOR({dim})"),
3564 ColumnType::Timestamp => "TIMESTAMP".to_string(),
3565 ColumnType::TxId => "TXID".to_string(),
3566 };
3567 (c.name.clone(), ty)
3568 })
3569 .collect();
3570 let remote_cols: Vec<(String, String)> = columns
3571 .iter()
3572 .map(|(col_name, col_type)| {
3573 let base_type = col_type
3574 .split_whitespace()
3575 .next()
3576 .unwrap_or(col_type)
3577 .to_string();
3578 (col_name.clone(), base_type)
3579 })
3580 .collect();
3581 let mut local_sorted = local_cols.clone();
3582 local_sorted.sort();
3583 let mut remote_sorted = remote_cols.clone();
3584 remote_sorted.sort();
3585 if local_sorted != remote_sorted {
3586 result.conflicts.push(Conflict {
3587 natural_key: NaturalKey {
3588 column: "table".to_string(),
3589 value: Value::Text(name.clone()),
3590 },
3591 resolution: ConflictPolicy::ServerWins,
3592 reason: Some(format!(
3593 "schema mismatch: local columns {:?} differ from remote {:?}",
3594 local_cols, remote_cols
3595 )),
3596 });
3597 }
3598 }
3599 continue;
3600 }
3601 let mut sql = format!(
3602 "CREATE TABLE {} ({})",
3603 name,
3604 columns
3605 .iter()
3606 .map(|(col, ty)| format!("{col} {ty}"))
3607 .collect::<Vec<_>>()
3608 .join(", ")
3609 );
3610 if !constraints.is_empty() {
3611 sql.push(' ');
3612 sql.push_str(&constraints.join(" "));
3613 }
3614 self.execute_in_tx(tx, &sql, &HashMap::new())?;
3615 self.clear_statement_cache();
3616 table_meta_cache.remove(&name);
3617 visible_rows_cache.remove(&name);
3618 }
3619 DdlChange::DropTable { name } => {
3620 if self.table_meta(&name).is_some() {
3621 self.drop_table_aux_state(&name);
3622 self.relational_store().drop_table(&name);
3623 self.remove_persisted_table(&name)?;
3624 self.clear_statement_cache();
3625 }
3626 table_meta_cache.remove(&name);
3627 visible_rows_cache.remove(&name);
3628 }
3629 DdlChange::AlterTable {
3630 name,
3631 columns,
3632 constraints,
3633 } => {
3634 if self.table_meta(&name).is_none() {
3635 continue;
3636 }
3637 self.relational_store().table_meta.write().remove(&name);
3638 let mut sql = format!(
3639 "CREATE TABLE {} ({})",
3640 name,
3641 columns
3642 .iter()
3643 .map(|(col, ty)| format!("{col} {ty}"))
3644 .collect::<Vec<_>>()
3645 .join(", ")
3646 );
3647 if !constraints.is_empty() {
3648 sql.push(' ');
3649 sql.push_str(&constraints.join(" "));
3650 }
3651 self.execute_in_tx(tx, &sql, &HashMap::new())?;
3652 self.clear_statement_cache();
3653 table_meta_cache.remove(&name);
3654 visible_rows_cache.remove(&name);
3655 }
3656 DdlChange::CreateIndex {
3657 table,
3658 name,
3659 columns,
3660 } => {
3661 if self.table_meta(&table).is_none() {
3668 return Err(Error::TableNotFound(table.clone()));
3669 }
3670 let already = self
3671 .table_meta(&table)
3672 .map(|m| m.indexes.iter().any(|i| i.name == name))
3673 .unwrap_or(false);
3674 if !already {
3675 {
3676 let store = self.relational_store();
3677 let mut metas = store.table_meta.write();
3678 if let Some(m) = metas.get_mut(&table) {
3679 m.indexes.push(contextdb_core::IndexDecl {
3680 name: name.clone(),
3681 columns: columns.clone(),
3682 kind: contextdb_core::IndexKind::UserDeclared,
3683 });
3684 }
3685 }
3686 self.relational_store().create_index_storage(
3687 &table,
3688 &name,
3689 columns.clone(),
3690 );
3691 self.relational_store().rebuild_index(&table, &name);
3692 if let Some(table_meta) = self.table_meta(&table) {
3693 self.persist_table_meta(&table, &table_meta)?;
3694 }
3695 self.allocate_ddl_lsn(|lsn| {
3696 self.log_create_index_ddl(&table, &name, &columns, lsn)
3697 })?;
3698 self.clear_statement_cache();
3699 }
3700 table_meta_cache.remove(&table);
3701 }
3702 DdlChange::DropIndex { table, name } => {
3703 if self.table_meta(&table).is_some() {
3704 let exists = self
3705 .table_meta(&table)
3706 .map(|m| m.indexes.iter().any(|i| i.name == name))
3707 .unwrap_or(false);
3708 if exists {
3709 {
3710 let store = self.relational_store();
3711 let mut metas = store.table_meta.write();
3712 if let Some(m) = metas.get_mut(&table) {
3713 m.indexes.retain(|i| i.name != name);
3714 }
3715 }
3716 self.relational_store().drop_index_storage(&table, &name);
3717 if let Some(table_meta) = self.table_meta(&table) {
3718 self.persist_table_meta(&table, &table_meta)?;
3719 }
3720 self.allocate_ddl_lsn(|lsn| {
3721 self.log_drop_index_ddl(&table, &name, lsn)
3722 })?;
3723 self.clear_statement_cache();
3724 }
3725 }
3726 table_meta_cache.remove(&table);
3727 }
3728 }
3729 }
3730
3731 self.preflight_sync_apply_memory(&changes, policies)?;
3732
3733 for row in changes.rows {
3734 if row.values.is_empty() {
3735 result.skipped_rows += 1;
3736 self.commit_with_source(tx, CommitSource::SyncPull)?;
3737 tx = self.begin();
3738 continue;
3739 }
3740
3741 let policy = policies
3742 .per_table
3743 .get(&row.table)
3744 .copied()
3745 .unwrap_or(policies.default);
3746
3747 let existing = cached_point_lookup(
3748 self,
3749 &mut visible_rows_cache,
3750 &row.table,
3751 &row.natural_key.column,
3752 &row.natural_key.value,
3753 )?;
3754 let is_delete = row.deleted;
3755 let row_has_vector = cached_table_meta(self, &mut table_meta_cache, &row.table)
3756 .is_some_and(|meta| {
3757 meta.columns
3758 .iter()
3759 .any(|col| matches!(col.column_type, ColumnType::Vector(_)))
3760 });
3761
3762 if is_delete {
3763 if let Some(local) = existing {
3764 if row_has_vector
3765 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3766 {
3767 vector_row_map.insert(*remote_row_id, local.row_id);
3768 vector_row_idx += 1;
3769 }
3770 if let Err(err) = self.delete_row(tx, &row.table, local.row_id) {
3771 result.conflicts.push(Conflict {
3772 natural_key: row.natural_key.clone(),
3773 resolution: policy,
3774 reason: Some(format!("delete failed: {err}")),
3775 });
3776 result.skipped_rows += 1;
3777 } else {
3778 remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
3779 result.applied_rows += 1;
3780 }
3781 } else {
3782 result.skipped_rows += 1;
3783 }
3784 self.commit_with_source(tx, CommitSource::SyncPull)?;
3785 tx = self.begin();
3786 continue;
3787 }
3788
3789 let mut values = row.values.clone();
3790 values.remove("__deleted");
3791
3792 match (existing, policy) {
3793 (None, _) => {
3794 if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
3795 let mut constraint_error: Option<String> = None;
3796
3797 for col_def in &meta.columns {
3798 if !col_def.nullable
3799 && !col_def.primary_key
3800 && col_def.default.is_none()
3801 {
3802 match values.get(&col_def.name) {
3803 None | Some(Value::Null) => {
3804 constraint_error = Some(format!(
3805 "NOT NULL constraint violated: {}.{}",
3806 row.table, col_def.name
3807 ));
3808 break;
3809 }
3810 _ => {}
3811 }
3812 }
3813 }
3814
3815 let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
3816 if constraint_error.is_none() && has_unique {
3817 for col_def in &meta.columns {
3818 if col_def.unique
3819 && !col_def.primary_key
3820 && let Some(new_val) = values.get(&col_def.name)
3821 && *new_val != Value::Null
3822 && cached_visible_rows(
3823 self,
3824 &mut visible_rows_cache,
3825 &row.table,
3826 )?
3827 .iter()
3828 .any(|r| r.values.get(&col_def.name) == Some(new_val))
3829 {
3830 constraint_error = Some(format!(
3831 "UNIQUE constraint violated: {}.{}",
3832 row.table, col_def.name
3833 ));
3834 break;
3835 }
3836 }
3837 }
3838
3839 if let Some(err_msg) = constraint_error {
3840 result.skipped_rows += 1;
3841 if row_has_vector
3842 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3843 {
3844 failed_row_ids.insert(*remote_row_id);
3845 vector_row_idx += 1;
3846 }
3847 result.conflicts.push(Conflict {
3848 natural_key: row.natural_key.clone(),
3849 resolution: policy,
3850 reason: Some(err_msg),
3851 });
3852 self.commit_with_source(tx, CommitSource::SyncPull)?;
3853 tx = self.begin();
3854 continue;
3855 }
3856 }
3857
3858 let mut overflow: Option<Error> = None;
3860 for v in values.values() {
3861 if let Value::TxId(incoming) = v
3862 && let Err(err) = self.tx_mgr.advance_for_sync(&row.table, *incoming)
3863 {
3864 overflow = Some(err);
3865 break;
3866 }
3867 }
3868 if let Some(err) = overflow {
3869 return Err(err);
3870 }
3871
3872 match self.insert_row_for_sync(tx, &row.table, values.clone()) {
3873 Ok(new_row_id) => {
3874 record_cached_insert(
3875 &mut visible_rows_cache,
3876 &row.table,
3877 VersionedRow {
3878 row_id: new_row_id,
3879 values: values.clone(),
3880 created_tx: tx,
3881 deleted_tx: None,
3882 lsn: row.lsn,
3883 created_at: None,
3884 },
3885 );
3886 result.applied_rows += 1;
3887 if row_has_vector
3888 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3889 {
3890 vector_row_map.insert(*remote_row_id, new_row_id);
3891 vector_row_idx += 1;
3892 }
3893 }
3894 Err(err) => {
3895 if is_fatal_sync_apply_error(&err) {
3896 return Err(err);
3897 }
3898 result.skipped_rows += 1;
3899 if row_has_vector
3900 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3901 {
3902 failed_row_ids.insert(*remote_row_id);
3903 vector_row_idx += 1;
3904 }
3905 result.conflicts.push(Conflict {
3906 natural_key: row.natural_key.clone(),
3907 resolution: policy,
3908 reason: Some(format!("{err}")),
3909 });
3910 }
3911 }
3912 }
3913 (Some(local), ConflictPolicy::InsertIfNotExists) => {
3914 if row_has_vector
3915 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3916 {
3917 vector_row_map.insert(*remote_row_id, local.row_id);
3918 vector_row_idx += 1;
3919 }
3920 result.skipped_rows += 1;
3921 }
3922 (Some(_), ConflictPolicy::ServerWins) => {
3923 result.skipped_rows += 1;
3924 if row_has_vector
3925 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3926 {
3927 failed_row_ids.insert(*remote_row_id);
3928 vector_row_idx += 1;
3929 }
3930 result.conflicts.push(Conflict {
3931 natural_key: row.natural_key.clone(),
3932 resolution: ConflictPolicy::ServerWins,
3933 reason: Some("server_wins".to_string()),
3934 });
3935 }
3936 (Some(local), ConflictPolicy::LatestWins) => {
3937 let incoming_wins = if row.lsn == local.lsn {
3942 let mut winner = false;
3943 for (col, incoming_val) in values.iter() {
3944 if let (Value::TxId(incoming_tx), Some(Value::TxId(local_tx))) =
3945 (incoming_val, local.values.get(col))
3946 {
3947 if incoming_tx.0 > local_tx.0 {
3948 winner = true;
3949 break;
3950 } else if incoming_tx.0 < local_tx.0 {
3951 winner = false;
3952 break;
3953 }
3954 }
3955 }
3956 winner
3957 } else {
3958 row.lsn > local.lsn
3959 };
3960
3961 if !incoming_wins {
3962 result.skipped_rows += 1;
3963 if row_has_vector
3964 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3965 {
3966 failed_row_ids.insert(*remote_row_id);
3967 vector_row_idx += 1;
3968 }
3969 result.conflicts.push(Conflict {
3970 natural_key: row.natural_key.clone(),
3971 resolution: ConflictPolicy::LatestWins,
3972 reason: Some("local_lsn_newer_or_equal".to_string()),
3973 });
3974 } else {
3975 if let Some(meta) = self.table_meta(&row.table)
3977 && let Some(sm) = &meta.state_machine
3978 {
3979 let sm_col = sm.column.clone();
3980 let transitions = sm.transitions.clone();
3981 let incoming_state = values.get(&sm_col).and_then(|v| match v {
3982 Value::Text(s) => Some(s.clone()),
3983 _ => None,
3984 });
3985 let local_state = local.values.get(&sm_col).and_then(|v| match v {
3986 Value::Text(s) => Some(s.clone()),
3987 _ => None,
3988 });
3989
3990 if let (Some(incoming), Some(current)) = (incoming_state, local_state) {
3991 let valid = transitions
3993 .get(¤t)
3994 .is_some_and(|targets| targets.contains(&incoming));
3995 if !valid && incoming != current {
3996 result.skipped_rows += 1;
3997 if row_has_vector
3998 && let Some(remote_row_id) =
3999 vector_row_ids.get(vector_row_idx)
4000 {
4001 failed_row_ids.insert(*remote_row_id);
4002 vector_row_idx += 1;
4003 }
4004 result.conflicts.push(Conflict {
4005 natural_key: row.natural_key.clone(),
4006 resolution: ConflictPolicy::LatestWins,
4007 reason: Some(format!(
4008 "state_machine: invalid transition {} -> {} (current: {})",
4009 current, incoming, current
4010 )),
4011 });
4012 self.commit_with_source(tx, CommitSource::SyncPull)?;
4013 tx = self.begin();
4014 continue;
4015 }
4016 }
4017 }
4018
4019 let mut overflow: Option<Error> = None;
4021 for v in values.values() {
4022 if let Value::TxId(incoming) = v
4023 && let Err(err) =
4024 self.tx_mgr.advance_for_sync(&row.table, *incoming)
4025 {
4026 overflow = Some(err);
4027 break;
4028 }
4029 }
4030 if let Some(err) = overflow {
4031 return Err(err);
4032 }
4033
4034 match self.upsert_row_for_sync(
4035 tx,
4036 &row.table,
4037 &row.natural_key.column,
4038 values.clone(),
4039 ) {
4040 Ok(_) => {
4041 visible_rows_cache.remove(&row.table);
4042 result.applied_rows += 1;
4043 if row_has_vector
4044 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
4045 {
4046 if let Ok(Some(found)) = self.point_lookup_in_tx(
4047 tx,
4048 &row.table,
4049 &row.natural_key.column,
4050 &row.natural_key.value,
4051 self.snapshot(),
4052 ) {
4053 vector_row_map.insert(*remote_row_id, found.row_id);
4054 }
4055 vector_row_idx += 1;
4056 }
4057 }
4058 Err(err) => {
4059 if is_fatal_sync_apply_error(&err) {
4060 return Err(err);
4061 }
4062 result.skipped_rows += 1;
4063 if row_has_vector
4064 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
4065 {
4066 failed_row_ids.insert(*remote_row_id);
4067 vector_row_idx += 1;
4068 }
4069 result.conflicts.push(Conflict {
4070 natural_key: row.natural_key.clone(),
4071 resolution: ConflictPolicy::LatestWins,
4072 reason: Some(format!("state_machine_or_constraint: {err}")),
4073 });
4074 }
4075 }
4076 }
4077 }
4078 (Some(_), ConflictPolicy::EdgeWins) => {
4079 result.conflicts.push(Conflict {
4080 natural_key: row.natural_key.clone(),
4081 resolution: ConflictPolicy::EdgeWins,
4082 reason: Some("edge_wins".to_string()),
4083 });
4084 let mut overflow: Option<Error> = None;
4085 for v in values.values() {
4086 if let Value::TxId(incoming) = v
4087 && let Err(err) = self.tx_mgr.advance_for_sync(&row.table, *incoming)
4088 {
4089 overflow = Some(err);
4090 break;
4091 }
4092 }
4093 if let Some(err) = overflow {
4094 return Err(err);
4095 }
4096
4097 match self.upsert_row_for_sync(
4098 tx,
4099 &row.table,
4100 &row.natural_key.column,
4101 values.clone(),
4102 ) {
4103 Ok(_) => {
4104 visible_rows_cache.remove(&row.table);
4105 result.applied_rows += 1;
4106 if row_has_vector
4107 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
4108 {
4109 if let Ok(Some(found)) = self.point_lookup_in_tx(
4110 tx,
4111 &row.table,
4112 &row.natural_key.column,
4113 &row.natural_key.value,
4114 self.snapshot(),
4115 ) {
4116 vector_row_map.insert(*remote_row_id, found.row_id);
4117 }
4118 vector_row_idx += 1;
4119 }
4120 }
4121 Err(err) => {
4122 if is_fatal_sync_apply_error(&err) {
4123 return Err(err);
4124 }
4125 result.skipped_rows += 1;
4126 if row_has_vector
4127 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
4128 {
4129 failed_row_ids.insert(*remote_row_id);
4130 vector_row_idx += 1;
4131 }
4132 if let Some(last) = result.conflicts.last_mut() {
4133 last.reason = Some(format!("state_machine_or_constraint: {err}"));
4134 }
4135 }
4136 }
4137 }
4138 }
4139
4140 self.commit_with_source(tx, CommitSource::SyncPull)?;
4141 tx = self.begin();
4142 }
4143
4144 for edge in changes.edges {
4145 let is_delete = matches!(edge.properties.get("__deleted"), Some(Value::Bool(true)));
4146 if is_delete {
4147 let _ = self.delete_edge(tx, edge.source, edge.target, &edge.edge_type);
4148 } else {
4149 let _ = self.insert_edge(
4150 tx,
4151 edge.source,
4152 edge.target,
4153 edge.edge_type,
4154 edge.properties,
4155 );
4156 }
4157 }
4158
4159 for vector in changes.vectors {
4160 if failed_row_ids.contains(&vector.row_id) {
4161 continue; }
4163 let local_row_id = vector_row_map
4164 .get(&vector.row_id)
4165 .copied()
4166 .unwrap_or(vector.row_id);
4167 if vector.vector.is_empty() {
4168 let _ = self.vector.delete_vector(tx, local_row_id);
4169 } else {
4170 if self.has_live_vector(local_row_id, self.snapshot()) {
4171 let _ = self.delete_vector(tx, local_row_id);
4172 }
4173 if let Err(err) = self.insert_vector(tx, local_row_id, vector.vector)
4174 && is_fatal_sync_apply_error(&err)
4175 {
4176 return Err(err);
4177 }
4178 }
4179 }
4180
4181 self.commit_with_source(tx, CommitSource::SyncPull)?;
4182 result.new_lsn = self.current_lsn();
4183 Ok(result)
4184 }
4185
4186 fn row_change_values(
4187 &self,
4188 table: &str,
4189 row_id: RowId,
4190 ) -> Option<(NaturalKey, HashMap<String, Value>)> {
4191 let tables = self.relational_store.tables.read();
4192 let meta = self.relational_store.table_meta.read();
4193 let rows = tables.get(table)?;
4194 let row = rows.iter().find(|r| r.row_id == row_id)?;
4195 let key_col = meta
4196 .get(table)
4197 .and_then(|m| m.natural_key_column.clone())
4198 .or_else(|| {
4199 meta.get(table).and_then(|m| {
4200 m.columns
4201 .iter()
4202 .find(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
4203 .map(|_| "id".to_string())
4204 })
4205 })
4206 .or_else(|| {
4207 meta.get(table).and_then(|m| {
4208 m.columns
4209 .iter()
4210 .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
4211 .map(|c| c.name.clone())
4212 })
4213 })?;
4214
4215 let key_val = row.values.get(&key_col)?.clone();
4216 let values = row
4217 .values
4218 .iter()
4219 .map(|(k, v)| (k.clone(), v.clone()))
4220 .collect::<HashMap<_, _>>();
4221 Some((
4222 NaturalKey {
4223 column: key_col,
4224 value: key_val,
4225 },
4226 values,
4227 ))
4228 }
4229
4230 fn edge_properties(
4231 &self,
4232 source: NodeId,
4233 target: NodeId,
4234 edge_type: &str,
4235 lsn: Lsn,
4236 ) -> Option<HashMap<String, Value>> {
4237 self.graph_store
4238 .forward_adj
4239 .read()
4240 .get(&source)
4241 .and_then(|entries| {
4242 entries
4243 .iter()
4244 .find(|e| e.target == target && e.edge_type == edge_type && e.lsn == lsn)
4245 .map(|e| e.properties.clone())
4246 })
4247 }
4248
4249 fn vector_for_row_lsn(&self, row_id: RowId, lsn: Lsn) -> Option<Vec<f32>> {
4250 self.vector_store
4251 .vectors
4252 .read()
4253 .iter()
4254 .find(|v| v.row_id == row_id && v.lsn == lsn)
4255 .map(|v| v.vector.clone())
4256 }
4257}
4258
4259fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
4260 if let Some(pos) = qr.columns.iter().position(|c| c == "row_id") {
4261 qr.columns.remove(pos);
4262 for row in &mut qr.rows {
4263 if pos < row.len() {
4264 row.remove(pos);
4265 }
4266 }
4267 }
4268 qr
4269}
4270
4271fn cached_table_meta(
4272 db: &Database,
4273 cache: &mut HashMap<String, Option<TableMeta>>,
4274 table: &str,
4275) -> Option<TableMeta> {
4276 cache
4277 .entry(table.to_string())
4278 .or_insert_with(|| db.table_meta(table))
4279 .clone()
4280}
4281
4282fn cached_visible_rows<'a>(
4283 db: &Database,
4284 cache: &'a mut HashMap<String, Vec<VersionedRow>>,
4285 table: &str,
4286) -> Result<&'a mut Vec<VersionedRow>> {
4287 if !cache.contains_key(table) {
4288 let rows = db.scan(table, db.snapshot())?;
4289 cache.insert(table.to_string(), rows);
4290 }
4291 Ok(cache.get_mut(table).expect("cached visible rows"))
4292}
4293
4294fn cached_point_lookup(
4295 db: &Database,
4296 cache: &mut HashMap<String, Vec<VersionedRow>>,
4297 table: &str,
4298 col: &str,
4299 value: &Value,
4300) -> Result<Option<VersionedRow>> {
4301 let rows = cached_visible_rows(db, cache, table)?;
4302 Ok(rows
4303 .iter()
4304 .find(|r| r.values.get(col) == Some(value))
4305 .cloned())
4306}
4307
4308fn record_cached_insert(
4309 cache: &mut HashMap<String, Vec<VersionedRow>>,
4310 table: &str,
4311 row: VersionedRow,
4312) {
4313 if let Some(rows) = cache.get_mut(table) {
4314 rows.push(row);
4315 }
4316}
4317
4318fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
4319 if let Some(rows) = cache.get_mut(table) {
4320 rows.retain(|row| row.row_id != row_id);
4321 }
4322}
4323
4324fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
4325 match result {
4326 Ok(query_result) => QueryOutcome::Success {
4327 row_count: if query_result.rows.is_empty() {
4328 query_result.rows_affected as usize
4329 } else {
4330 query_result.rows.len()
4331 },
4332 },
4333 Err(error) => QueryOutcome::Error {
4334 error: error.to_string(),
4335 },
4336 }
4337}
4338
/// Eagerly builds the HNSW vector index once the store holds enough vectors
/// (>= 1000), charging the estimated index size to the memory accountant
/// first and backing out silently if the budget does not allow it.
fn maybe_prebuild_hnsw(vector_store: &VectorStore, accountant: &MemoryAccountant) {
    if vector_store.vector_count() >= 1000 {
        let entries = vector_store.all_entries();
        let dim = vector_store.dimension().unwrap_or(0);
        let estimated_bytes = estimate_hnsw_index_bytes(entries.len(), dim);
        // Reserve budget up front; if it fails, skip the prebuild entirely
        // (queries fall back to whatever non-indexed path exists).
        if accountant
            .try_allocate_for(
                estimated_bytes,
                "vector_index",
                "prebuild_hnsw",
                "Open the database with a larger MEMORY_LIMIT to enable HNSW indexing.",
            )
            .is_err()
        {
            return;
        }

        // Index construction is isolated behind catch_unwind so a panic in
        // HnswIndex::new degrades to "no index" instead of poisoning locks.
        let hnsw_opt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            HnswIndex::new(&entries, dim)
        }))
        .ok();
        if hnsw_opt.is_some() {
            vector_store.set_hnsw_bytes(estimated_bytes);
        } else {
            // Build panicked: give the reserved budget back.
            accountant.release(estimated_bytes);
        }
        // NOTE(review): if the OnceLock was already set, this `set` fails and
        // the freshly built index is dropped while `set_hnsw_bytes` was still
        // recorded — possible double-accounting; confirm callers never race
        // two prebuilds.
        let _ = vector_store.hnsw.set(RwLock::new(hnsw_opt));
    }
}
4368
4369fn estimate_row_bytes_for_meta(
4370 values: &HashMap<ColName, Value>,
4371 meta: &TableMeta,
4372 include_vectors: bool,
4373) -> usize {
4374 let mut bytes = 96usize;
4375 for column in &meta.columns {
4376 let Some(value) = values.get(&column.name) else {
4377 continue;
4378 };
4379 if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
4380 continue;
4381 }
4382 bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
4383 }
4384 bytes
4385}
4386
/// Estimated heap footprint of a stored vector: a small fixed header (24
/// bytes) plus four bytes per `f32` element.
fn estimate_vector_bytes(vector: &[f32]) -> usize {
    let payload = vector.len().saturating_mul(std::mem::size_of::<f32>());
    24 + payload
}
4390
4391fn estimate_edge_bytes(
4392 source: NodeId,
4393 target: NodeId,
4394 edge_type: &str,
4395 properties: &HashMap<String, Value>,
4396) -> usize {
4397 AdjEntry {
4398 source,
4399 target,
4400 edge_type: edge_type.to_string(),
4401 properties: properties.clone(),
4402 created_tx: TxId(0),
4403 deleted_tx: None,
4404 lsn: Lsn(0),
4405 }
4406 .estimated_bytes()
4407}
4408
/// Conservative size estimate for an HNSW index over `entry_count` vectors of
/// the given `dimension`: raw vector bytes times a 3x graph-overhead factor,
/// with saturating arithmetic so huge inputs cannot overflow.
fn estimate_hnsw_index_bytes(entry_count: usize, dimension: usize) -> usize {
    let raw_vector_bytes = entry_count
        .saturating_mul(dimension)
        .saturating_mul(std::mem::size_of::<f32>());
    raw_vector_bytes.saturating_mul(3)
}
4415
impl Drop for Database {
    /// Best-effort teardown: stop the background pruning thread, drop all
    /// subscription senders, and close persistence at most once.
    fn drop(&mut self) {
        // `swap` makes teardown idempotent: if `closed` was already set
        // (presumably by an explicit close path — confirm), do nothing.
        if self.closed.swap(true, Ordering::SeqCst) {
            return;
        }
        let runtime = self.pruning_runtime.get_mut();
        runtime.shutdown.store(true, Ordering::SeqCst);
        // Join so the pruning thread cannot touch the stores mid-teardown.
        if let Some(handle) = runtime.handle.take() {
            let _ = handle.join();
        }
        // Dropping the subscriber senders closes their channels.
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
    }
}
4432
/// Sleeps for roughly `interval`, but re-checks `shutdown` at least every
/// 50ms so a shutdown request interrupts the wait promptly.
fn sleep_with_shutdown(shutdown: &AtomicBool, interval: Duration) {
    let wake_at = Instant::now() + interval;
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return;
        }
        let remaining = wake_at.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return;
        }
        // Nap in short slices so the flag is observed within ~50ms.
        thread::sleep(remaining.min(Duration::from_millis(50)));
    }
}
4444
/// Background pruning pass: physically removes expired rows (per-table TTL /
/// expires-column rules, see `row_is_prunable`) together with their vectors
/// and graph edges, rewrites the persisted copies, and releases the freed
/// memory budget.
///
/// Returns the number of distinct pruned row ids (0 when nothing expired).
fn prune_expired_rows(
    relational_store: &Arc<RelationalStore>,
    graph_store: &Arc<GraphStore>,
    vector_store: &Arc<VectorStore>,
    accountant: &MemoryAccountant,
    persistence: Option<&Arc<RedbPersistence>>,
    sync_watermark: Lsn,
) -> u64 {
    let now = Wallclock::now();
    // Clone the metas up front so the meta lock is not held while the table
    // rows are mutated below.
    let metas = relational_store.table_meta.read().clone();
    let mut pruned_by_table: HashMap<String, Vec<RowId>> = HashMap::new();
    let mut pruned_node_ids = HashSet::new();
    let mut released_row_bytes = 0usize;

    // Phase 1: drop expired rows in place, recording ids and freed bytes.
    {
        let mut tables = relational_store.tables.write();
        for (table_name, rows) in tables.iter_mut() {
            let Some(meta) = metas.get(table_name) else {
                continue;
            };
            // Only tables with a TTL configured participate in pruning.
            if meta.default_ttl_seconds.is_none() {
                continue;
            }

            rows.retain(|row| {
                if !row_is_prunable(row, meta, now, sync_watermark) {
                    return true;
                }

                pruned_by_table
                    .entry(table_name.clone())
                    .or_default()
                    .push(row.row_id);
                released_row_bytes = released_row_bytes
                    .saturating_add(estimate_row_bytes_for_meta(&row.values, meta, false));
                // Rows carrying a UUID `id` may own graph nodes; remember the
                // id so the node's edges can be pruned in phase 3.
                if let Some(Value::Uuid(id)) = row.values.get("id") {
                    pruned_node_ids.insert(*id);
                }
                false
            });
        }
    }

    let pruned_row_ids: HashSet<RowId> = pruned_by_table
        .values()
        .flat_map(|rows| rows.iter().copied())
        .collect();
    if pruned_row_ids.is_empty() {
        return 0;
    }

    // Phase 2: remove the pruned rows' vectors, tallying freed bytes.
    let mut released_vector_bytes = 0usize;
    {
        let mut vectors = vector_store.vectors.write();
        vectors.retain(|entry| {
            if pruned_row_ids.contains(&entry.row_id) {
                released_vector_bytes =
                    released_vector_bytes.saturating_add(entry.estimated_bytes());
                false
            } else {
                true
            }
        });
    }
    // Invalidate any prebuilt HNSW index; it no longer matches the store.
    if let Some(hnsw) = vector_store.hnsw.get() {
        *hnsw.write() = None;
    }

    // Phase 3: drop edges touching pruned nodes. Only the forward direction
    // contributes to the byte tally (the reverse list mirrors the same edges).
    let mut released_edge_bytes = 0usize;
    {
        let mut forward = graph_store.forward_adj.write();
        for entries in forward.values_mut() {
            entries.retain(|entry| {
                if pruned_node_ids.contains(&entry.source)
                    || pruned_node_ids.contains(&entry.target)
                {
                    released_edge_bytes =
                        released_edge_bytes.saturating_add(entry.estimated_bytes());
                    false
                } else {
                    true
                }
            });
        }
        forward.retain(|_, entries| !entries.is_empty());
    }
    {
        let mut reverse = graph_store.reverse_adj.write();
        for entries in reverse.values_mut() {
            entries.retain(|entry| {
                !pruned_node_ids.contains(&entry.source) && !pruned_node_ids.contains(&entry.target)
            });
        }
        reverse.retain(|_, entries| !entries.is_empty());
    }

    // Phase 4: best-effort rewrite of the persisted copies (errors ignored;
    // the in-memory stores are already consistent).
    if let Some(persistence) = persistence {
        for table_name in pruned_by_table.keys() {
            let rows = relational_store
                .tables
                .read()
                .get(table_name)
                .cloned()
                .unwrap_or_default();
            let _ = persistence.rewrite_table_rows(table_name, &rows);
        }

        let vectors = vector_store.vectors.read().clone();
        let edges = graph_store
            .forward_adj
            .read()
            .values()
            .flat_map(|entries| entries.iter().cloned())
            .collect::<Vec<_>>();
        let _ = persistence.rewrite_vectors(&vectors);
        let _ = persistence.rewrite_graph_edges(&edges);
    }

    // Return all freed bytes to the accountant in a single call.
    accountant.release(
        released_row_bytes
            .saturating_add(released_vector_bytes)
            .saturating_add(released_edge_bytes),
    );

    pruned_row_ids.len() as u64
}
4571
4572fn row_is_prunable(
4573 row: &VersionedRow,
4574 meta: &TableMeta,
4575 now: Wallclock,
4576 sync_watermark: Lsn,
4577) -> bool {
4578 if meta.sync_safe && row.lsn >= sync_watermark {
4579 return false;
4580 }
4581
4582 let Some(default_ttl_seconds) = meta.default_ttl_seconds else {
4583 return false;
4584 };
4585
4586 if let Some(expires_column) = &meta.expires_column {
4587 match row.values.get(expires_column) {
4588 Some(Value::Timestamp(millis)) if *millis == i64::MAX => return false,
4589 Some(Value::Timestamp(millis)) if *millis < 0 => return true,
4590 Some(Value::Timestamp(millis)) => return (*millis as u64) <= now.0,
4591 Some(Value::Null) | None => {}
4592 Some(_) => {}
4593 }
4594 }
4595
4596 let ttl_millis = default_ttl_seconds.saturating_mul(1000);
4597 row.created_at
4598 .map(|created_at| now.0.saturating_sub(created_at.0) > ttl_millis)
4599 .unwrap_or(false)
4600}
4601
4602fn max_tx_across_all(
4603 relational: &RelationalStore,
4604 graph: &GraphStore,
4605 vector: &VectorStore,
4606) -> TxId {
4607 let relational_max = relational
4608 .tables
4609 .read()
4610 .values()
4611 .flat_map(|rows| rows.iter())
4612 .flat_map(|row| std::iter::once(row.created_tx).chain(row.deleted_tx))
4613 .max()
4614 .unwrap_or(TxId(0));
4615 let graph_max = graph
4616 .forward_adj
4617 .read()
4618 .values()
4619 .flat_map(|entries| entries.iter())
4620 .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
4621 .max()
4622 .unwrap_or(TxId(0));
4623 let vector_max = vector
4624 .vectors
4625 .read()
4626 .iter()
4627 .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
4628 .max()
4629 .unwrap_or(TxId(0));
4630
4631 relational_max.max(graph_max).max(vector_max)
4632}
4633
4634fn max_lsn_across_all(
4635 relational: &RelationalStore,
4636 graph: &GraphStore,
4637 vector: &VectorStore,
4638) -> Lsn {
4639 let relational_max = relational
4640 .tables
4641 .read()
4642 .values()
4643 .flat_map(|rows| rows.iter().map(|row| row.lsn))
4644 .max()
4645 .unwrap_or(Lsn(0));
4646 let graph_max = graph
4647 .forward_adj
4648 .read()
4649 .values()
4650 .flat_map(|entries| entries.iter().map(|entry| entry.lsn))
4651 .max()
4652 .unwrap_or(Lsn(0));
4653 let vector_max = vector
4654 .vectors
4655 .read()
4656 .iter()
4657 .map(|entry| entry.lsn)
4658 .max()
4659 .unwrap_or(Lsn(0));
4660
4661 relational_max.max(graph_max).max(vector_max)
4662}
4663
4664fn persisted_memory_limit(path: &Path) -> Result<Option<usize>> {
4665 if !path.exists() {
4666 return Ok(None);
4667 }
4668 let persistence = RedbPersistence::open(path)?;
4669 let limit = persistence.load_config_value::<usize>("memory_limit")?;
4670 persistence.close();
4671 Ok(limit)
4672}
4673
/// Classifies sync-apply errors that must abort the whole apply loop.
///
/// Budget exhaustion (memory or disk) is not a per-row problem: the next row
/// would fail the same way, so the caller bails out instead of recording a
/// conflict and continuing.
fn is_fatal_sync_apply_error(err: &Error) -> bool {
    matches!(
        err,
        Error::MemoryBudgetExceeded { .. } | Error::DiskBudgetExceeded { .. }
    )
}
4680
4681fn ddl_change_from_create_table(ct: &CreateTable) -> DdlChange {
4682 DdlChange::CreateTable {
4683 name: ct.name.clone(),
4684 columns: ct
4685 .columns
4686 .iter()
4687 .map(|col| {
4688 (
4689 col.name.clone(),
4690 sql_type_for_ast_column(col, &ct.propagation_rules),
4691 )
4692 })
4693 .collect(),
4694 constraints: create_table_constraints_from_ast(ct),
4695 }
4696}
4697
4698fn ddl_change_from_meta(name: &str, meta: &TableMeta) -> DdlChange {
4699 DdlChange::CreateTable {
4700 name: name.to_string(),
4701 columns: meta
4702 .columns
4703 .iter()
4704 .map(|col| {
4705 (
4706 col.name.clone(),
4707 sql_type_for_meta_column(col, &meta.propagation_rules),
4708 )
4709 })
4710 .collect(),
4711 constraints: create_table_constraints_from_meta(meta),
4712 }
4713}
4714
4715fn sql_type_for_ast(data_type: &DataType) -> String {
4716 match data_type {
4717 DataType::Uuid => "UUID".to_string(),
4718 DataType::Text => "TEXT".to_string(),
4719 DataType::Integer => "INTEGER".to_string(),
4720 DataType::Real => "REAL".to_string(),
4721 DataType::Boolean => "BOOLEAN".to_string(),
4722 DataType::Timestamp => "TIMESTAMP".to_string(),
4723 DataType::Json => "JSON".to_string(),
4724 DataType::Vector(dim) => format!("VECTOR({dim})"),
4725 DataType::TxId => "TXID".to_string(),
4726 }
4727}
4728
4729fn sql_type_for_ast_column(
4730 col: &contextdb_parser::ast::ColumnDef,
4731 _rules: &[contextdb_parser::ast::AstPropagationRule],
4732) -> String {
4733 let mut ty = sql_type_for_ast(&col.data_type);
4734 if let Some(reference) = &col.references {
4735 ty.push_str(&format!(
4736 " REFERENCES {}({})",
4737 reference.table, reference.column
4738 ));
4739 for rule in &reference.propagation_rules {
4740 if let contextdb_parser::ast::AstPropagationRule::FkState {
4741 trigger_state,
4742 target_state,
4743 max_depth,
4744 abort_on_failure,
4745 } = rule
4746 {
4747 ty.push_str(&format!(
4748 " ON STATE {} PROPAGATE SET {}",
4749 trigger_state, target_state
4750 ));
4751 if max_depth.unwrap_or(10) != 10 {
4752 ty.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
4753 }
4754 if *abort_on_failure {
4755 ty.push_str(" ABORT ON FAILURE");
4756 }
4757 }
4758 }
4759 }
4760 if col.primary_key {
4761 ty.push_str(" PRIMARY KEY");
4762 }
4763 if !col.nullable && !col.primary_key {
4764 ty.push_str(" NOT NULL");
4765 }
4766 if col.unique {
4767 ty.push_str(" UNIQUE");
4768 }
4769 if col.immutable {
4770 ty.push_str(" IMMUTABLE");
4771 }
4772 ty
4773}
4774
/// Renders one stored column definition back to its SQL fragment: base type,
/// an optional REFERENCES clause (from the column itself or a matching
/// foreign-key propagation rule), FK-state propagation clauses, and finally
/// the constraint keywords in declaration order.
fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[PropagationRule]) -> String {
    let mut ty = match col.column_type {
        ColumnType::Integer => "INTEGER".to_string(),
        ColumnType::Real => "REAL".to_string(),
        ColumnType::Text => "TEXT".to_string(),
        ColumnType::Boolean => "BOOLEAN".to_string(),
        ColumnType::Json => "JSON".to_string(),
        ColumnType::Uuid => "UUID".to_string(),
        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
        ColumnType::Timestamp => "TIMESTAMP".to_string(),
        ColumnType::TxId => "TXID".to_string(),
    };

    // Collect the foreign-key propagation rules that target this column.
    let fk_rules = rules
        .iter()
        .filter_map(|rule| match rule {
            PropagationRule::ForeignKey {
                fk_column,
                referenced_table,
                referenced_column,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } if fk_column == &col.name => Some((
                referenced_table,
                referenced_column,
                trigger_state,
                target_state,
                *max_depth,
                *abort_on_failure,
            )),
            _ => None,
        })
        .collect::<Vec<_>>();

    // Prefer the explicit reference on the column; otherwise derive the
    // REFERENCES target from the first matching FK rule.
    if let Some(reference) = &col.references {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            reference.table, reference.column
        ));
    } else if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            referenced_table, referenced_column
        ));
    }

    // Every FK rule contributes an ON STATE clause (depth 10 is the implicit
    // default and is only rendered when it deviates).
    if col.references.is_some() || !fk_rules.is_empty() {
        for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
            ty.push_str(&format!(
                " ON STATE {} PROPAGATE SET {}",
                trigger_state, target_state
            ));
            if max_depth != 10 {
                ty.push_str(&format!(" MAX DEPTH {max_depth}"));
            }
            if abort_on_failure {
                ty.push_str(" ABORT ON FAILURE");
            }
        }
    }
    if col.primary_key {
        ty.push_str(" PRIMARY KEY");
    }
    if !col.nullable && !col.primary_key {
        ty.push_str(" NOT NULL");
    }
    if col.unique {
        ty.push_str(" UNIQUE");
    }
    if col.expires {
        ty.push_str(" EXPIRES");
    }
    if col.immutable {
        ty.push_str(" IMMUTABLE");
    }

    ty
}
4855
4856fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
4857 let mut constraints = Vec::new();
4858
4859 if ct.immutable {
4860 constraints.push("IMMUTABLE".to_string());
4861 }
4862
4863 if let Some(sm) = &ct.state_machine {
4864 let transitions = sm
4865 .transitions
4866 .iter()
4867 .map(|(from, tos)| format!("{from} -> [{}]", tos.join(", ")))
4868 .collect::<Vec<_>>()
4869 .join(", ");
4870 constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
4871 }
4872
4873 if !ct.dag_edge_types.is_empty() {
4874 let edge_types = ct
4875 .dag_edge_types
4876 .iter()
4877 .map(|edge_type| format!("'{edge_type}'"))
4878 .collect::<Vec<_>>()
4879 .join(", ");
4880 constraints.push(format!("DAG({edge_types})"));
4881 }
4882
4883 if let Some(retain) = &ct.retain {
4884 let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(retain.duration_seconds));
4885 if retain.sync_safe {
4886 clause.push_str(" SYNC SAFE");
4887 }
4888 constraints.push(clause);
4889 }
4890
4891 for unique_constraint in &ct.unique_constraints {
4892 constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
4893 }
4894
4895 for rule in &ct.propagation_rules {
4896 match rule {
4897 contextdb_parser::ast::AstPropagationRule::EdgeState {
4898 edge_type,
4899 direction,
4900 trigger_state,
4901 target_state,
4902 max_depth,
4903 abort_on_failure,
4904 } => {
4905 let mut clause = format!(
4906 "PROPAGATE ON EDGE {} {} STATE {} SET {}",
4907 edge_type, direction, trigger_state, target_state
4908 );
4909 if max_depth.unwrap_or(10) != 10 {
4910 clause.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
4911 }
4912 if *abort_on_failure {
4913 clause.push_str(" ABORT ON FAILURE");
4914 }
4915 constraints.push(clause);
4916 }
4917 contextdb_parser::ast::AstPropagationRule::VectorExclusion { trigger_state } => {
4918 constraints.push(format!(
4919 "PROPAGATE ON STATE {} EXCLUDE VECTOR",
4920 trigger_state
4921 ));
4922 }
4923 contextdb_parser::ast::AstPropagationRule::FkState { .. } => {}
4924 }
4925 }
4926
4927 constraints
4928}
4929
4930fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
4931 let mut constraints = Vec::new();
4932
4933 if meta.immutable {
4934 constraints.push("IMMUTABLE".to_string());
4935 }
4936
4937 if let Some(sm) = &meta.state_machine {
4938 let states = sm
4939 .transitions
4940 .iter()
4941 .map(|(from, to)| format!("{from} -> [{}]", to.join(", ")))
4942 .collect::<Vec<_>>()
4943 .join(", ");
4944 constraints.push(format!("STATE MACHINE ({}: {})", sm.column, states));
4945 }
4946
4947 if !meta.dag_edge_types.is_empty() {
4948 let edge_types = meta
4949 .dag_edge_types
4950 .iter()
4951 .map(|edge_type| format!("'{edge_type}'"))
4952 .collect::<Vec<_>>()
4953 .join(", ");
4954 constraints.push(format!("DAG({edge_types})"));
4955 }
4956
4957 if let Some(ttl_seconds) = meta.default_ttl_seconds {
4958 let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(ttl_seconds));
4959 if meta.sync_safe {
4960 clause.push_str(" SYNC SAFE");
4961 }
4962 constraints.push(clause);
4963 }
4964
4965 for unique_constraint in &meta.unique_constraints {
4966 constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
4967 }
4968
4969 for rule in &meta.propagation_rules {
4970 match rule {
4971 PropagationRule::Edge {
4972 edge_type,
4973 direction,
4974 trigger_state,
4975 target_state,
4976 max_depth,
4977 abort_on_failure,
4978 } => {
4979 let dir = match direction {
4980 Direction::Incoming => "INCOMING",
4981 Direction::Outgoing => "OUTGOING",
4982 Direction::Both => "BOTH",
4983 };
4984 let mut clause = format!(
4985 "PROPAGATE ON EDGE {} {} STATE {} SET {}",
4986 edge_type, dir, trigger_state, target_state
4987 );
4988 if *max_depth != 10 {
4989 clause.push_str(&format!(" MAX DEPTH {max_depth}"));
4990 }
4991 if *abort_on_failure {
4992 clause.push_str(" ABORT ON FAILURE");
4993 }
4994 constraints.push(clause);
4995 }
4996 PropagationRule::VectorExclusion { trigger_state } => {
4997 constraints.push(format!(
4998 "PROPAGATE ON STATE {} EXCLUDE VECTOR",
4999 trigger_state
5000 ));
5001 }
5002 PropagationRule::ForeignKey { .. } => {}
5003 }
5004 }
5005
5006 constraints
5007}
5008
/// Converts a TTL in seconds back to SQL text using the largest duration
/// unit that divides it evenly: DAYS, then HOURS, then MINUTES, falling
/// back to SECONDS for anything else.
fn ttl_seconds_to_sql(seconds: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    match seconds {
        s if s % DAY == 0 => format!("{} DAYS", s / DAY),
        s if s % HOUR == 0 => format!("{} HOURS", s / HOUR),
        s if s % MINUTE == 0 => format!("{} MINUTES", s / MINUTE),
        s => format!("{s} SECONDS"),
    }
}