use crate::composite_store::{ChangeLogEntry, CompositeStore};
use crate::executor::execute_plan;
use crate::persistence::RedbPersistence;
use crate::persistent_store::PersistentCompositeStore;
use crate::plugin::{
    CommitEvent, CommitSource, CorePlugin, DatabasePlugin, PluginHealth, QueryOutcome,
    SubscriptionMetrics,
};
use crate::rank_formula::{FormulaEvalError, RankFormula};
use crate::schema_enforcer::validate_dml;
use crate::sync_types::{
    ApplyResult, ChangeSet, Conflict, ConflictPolicies, ConflictPolicy, DdlChange, EdgeChange,
    NaturalKey, RowChange, VectorChange,
};
use contextdb_core::*;
use contextdb_graph::{GraphStore, MemGraphExecutor};
use contextdb_parser::Statement;
use contextdb_parser::ast::{AlterAction, CreateTable, DataType};
use contextdb_planner::PhysicalPlan;
use contextdb_relational::{MemRelationalExecutor, RelationalStore};
use contextdb_tx::{TxManager, WriteSetApplicator};
use contextdb_vector::{HnswGraphStats, HnswIndex, MemVectorExecutor, VectorStore};
use parking_lot::{Mutex, RwLock};
use roaring::RoaringTreemap;
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
use std::sync::{Arc, OnceLock};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

type DynStore = Box<dyn WriteSetApplicator>;
const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 64;
const MAX_STATEMENT_CACHE_ENTRIES: usize = 1024;
const MIN_DISK_WRITE_HEADROOM_BYTES: u64 = 1024;

#[derive(Debug, Clone)]
pub struct IndexCandidate {
    pub name: String,
    pub rejected_reason: std::borrow::Cow<'static, str>,
}

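/// Execution trace attached to every [`QueryResult`], recording which physical
/// plan ran, which index (if any) was used, which predicates were pushed down,
/// and which index candidates the planner considered but rejected.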
#[derive(Debug, Clone, Default)]
pub struct QueryTrace {
    pub physical_plan: &'static str,
    pub index_used: Option<String>,
    pub predicates_pushed: smallvec::SmallVec<[std::borrow::Cow<'static, str>; 4]>,
    pub indexes_considered: smallvec::SmallVec<[IndexCandidate; 4]>,
    pub sort_elided: bool,
}

impl QueryTrace {
    pub fn scan() -> Self {
        Self {
            physical_plan: "Scan",
            ..Default::default()
        }
    }
}

#[derive(Debug, Clone)]
pub struct CascadeReport {
    pub dropped_indexes: Vec<String>,
}

#[derive(Debug, Clone)]
pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<Value>>,
    pub rows_affected: u64,
    pub trace: QueryTrace,
    pub cascade: Option<CascadeReport>,
}

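/// Parameters for a vector similarity search over one table's vector column.
///
/// Illustrative construction (hypothetical table/column names and dimension):
/// ```ignore
/// let q = SemanticQuery::new("docs", "embedding", vec![0.0_f32; 384], 10);
/// ```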
#[derive(Debug, Clone)]
pub struct SemanticQuery {
    pub table: String,
    pub vector_column: String,
    pub query: Vec<f32>,
    pub limit: usize,
    pub sort_key: Option<String>,
    pub min_similarity: Option<f32>,
    pub where_clause: Option<String>,
}

impl SemanticQuery {
    pub fn new(
        table: impl Into<String>,
        vector_column: impl Into<String>,
        query: Vec<f32>,
        limit: usize,
    ) -> Self {
        Self {
            table: table.into(),
            vector_column: vector_column.into(),
            query,
            limit,
            sort_key: None,
            min_similarity: None,
            where_clause: None,
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    pub row_id: RowId,
    pub values: HashMap<String, Value>,
    pub vector_score: f32,
    pub rank: f32,
}

#[derive(Debug, Clone)]
struct CachedStatement {
    stmt: Statement,
    plan: PhysicalPlan,
}

impl QueryResult {
    pub fn empty() -> Self {
        Self {
            columns: vec![],
            rows: vec![],
            rows_affected: 0,
            trace: QueryTrace::scan(),
            cascade: None,
        }
    }

    pub fn empty_with_affected(rows_affected: u64) -> Self {
        Self {
            columns: vec![],
            rows: vec![],
            rows_affected,
            trace: QueryTrace::scan(),
            cascade: None,
        }
    }
}

thread_local! {
    static SNAPSHOT_OVERRIDE: std::cell::RefCell<Option<SnapshotId>> =
        const { std::cell::RefCell::new(None) };
}

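/// The top-level database handle: one MVCC transaction manager fronting the
/// relational, graph, and vector stores, plus optional redb-backed
/// persistence, plugin hooks, memory/disk accounting, statement and rank
/// formula caches, and commit-event subscriptions.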
pub struct Database {
    tx_mgr: Arc<TxManager<DynStore>>,
    relational_store: Arc<RelationalStore>,
    graph_store: Arc<GraphStore>,
    vector_store: Arc<VectorStore>,
    change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
    persistence: Option<Arc<RedbPersistence>>,
    relational: MemRelationalExecutor<DynStore>,
    graph: MemGraphExecutor<DynStore>,
    vector: MemVectorExecutor<DynStore>,
    session_tx: Mutex<Option<TxId>>,
    instance_id: uuid::Uuid,
    plugin: Arc<dyn DatabasePlugin>,
    accountant: Arc<MemoryAccountant>,
    conflict_policies: RwLock<ConflictPolicies>,
    subscriptions: Mutex<SubscriptionState>,
    pruning_runtime: Mutex<PruningRuntime>,
    pruning_guard: Arc<Mutex<()>>,
    disk_limit: AtomicU64,
    disk_limit_startup_ceiling: AtomicU64,
    sync_watermark: Arc<AtomicLsn>,
    closed: AtomicBool,
    rows_examined: AtomicU64,
    statement_cache: RwLock<HashMap<String, Arc<CachedStatement>>>,
    rank_formula_cache: RwLock<HashMap<(String, String), Arc<RankFormula>>>,
    rank_policy_eval_count: AtomicU64,
    rank_policy_formula_parse_count: AtomicU64,
    corrupt_joined_values: RwLock<HashSet<(String, RowId, String)>>,
}

pub(crate) enum InsertRowResult {
    Inserted(RowId),
    NoOp,
}

#[derive(Debug, Default)]
pub(crate) struct IndexScanTxOverlay {
    pub deleted_row_ids: std::collections::HashSet<RowId>,
    pub matching_inserts: Vec<VersionedRow>,
}

enum RowConstraintCheck {
    Valid,
    DuplicateUniqueNoOp,
}

enum ConstraintProbe {
    NoIndex,
    NoMatch,
    Match(RowId),
}

impl std::fmt::Debug for Database {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Database")
            .field("instance_id", &self.instance_id)
            .finish_non_exhaustive()
    }
}

#[derive(Debug, Clone)]
struct PropagationQueueEntry {
    table: String,
    uuid: uuid::Uuid,
    target_state: String,
    depth: u32,
    abort_on_failure: bool,
}

#[derive(Debug, Clone, Copy)]
struct PropagationSource<'a> {
    table: &'a str,
    uuid: uuid::Uuid,
    state: &'a str,
    depth: u32,
}

#[derive(Debug, Clone, Copy)]
struct PropagationContext<'a> {
    tx: TxId,
    snapshot: SnapshotId,
    metas: &'a HashMap<String, TableMeta>,
}

#[derive(Debug)]
struct SubscriptionState {
    subscribers: Vec<SyncSender<CommitEvent>>,
    events_sent: u64,
    events_dropped: u64,
}

impl SubscriptionState {
    fn new() -> Self {
        Self {
            subscribers: Vec::new(),
            events_sent: 0,
            events_dropped: 0,
        }
    }
}

#[derive(Debug)]
struct PruningRuntime {
    shutdown: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl PruningRuntime {
    fn new() -> Self {
        Self {
            shutdown: Arc::new(AtomicBool::new(false)),
            handle: None,
        }
    }
}

impl Database {
    #[allow(clippy::too_many_arguments)]
    fn build_db(
        tx_mgr: Arc<TxManager<DynStore>>,
        relational: Arc<RelationalStore>,
        graph: Arc<GraphStore>,
        vector_store: Arc<VectorStore>,
        hnsw: Arc<OnceLock<parking_lot::RwLock<Option<HnswIndex>>>>,
        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
        ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
        persistence: Option<Arc<RedbPersistence>>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        disk_limit: Option<u64>,
        disk_limit_startup_ceiling: Option<u64>,
    ) -> Self {
        Self {
            tx_mgr: tx_mgr.clone(),
            relational_store: relational.clone(),
            graph_store: graph.clone(),
            vector_store: vector_store.clone(),
            change_log,
            ddl_log,
            persistence,
            relational: MemRelationalExecutor::new(relational, tx_mgr.clone()),
            graph: MemGraphExecutor::new(graph, tx_mgr.clone()),
            vector: MemVectorExecutor::new_with_accountant(
                vector_store,
                tx_mgr.clone(),
                hnsw,
                accountant.clone(),
            ),
            session_tx: Mutex::new(None),
            instance_id: uuid::Uuid::new_v4(),
            plugin,
            accountant,
            conflict_policies: RwLock::new(ConflictPolicies::uniform(ConflictPolicy::LatestWins)),
            subscriptions: Mutex::new(SubscriptionState::new()),
            pruning_runtime: Mutex::new(PruningRuntime::new()),
            pruning_guard: Arc::new(Mutex::new(())),
            disk_limit: AtomicU64::new(disk_limit.unwrap_or(0)),
            disk_limit_startup_ceiling: AtomicU64::new(disk_limit_startup_ceiling.unwrap_or(0)),
            sync_watermark: Arc::new(AtomicLsn::new(Lsn(0))),
            closed: AtomicBool::new(false),
            rows_examined: AtomicU64::new(0),
            statement_cache: RwLock::new(HashMap::new()),
            rank_formula_cache: RwLock::new(HashMap::new()),
            rank_policy_eval_count: AtomicU64::new(0),
            rank_policy_formula_parse_count: AtomicU64::new(0),
            corrupt_joined_values: RwLock::new(HashSet::new()),
        }
    }

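    /// Opens (or creates) a database file at `path` using the default
    /// [`CorePlugin`] and an unbounded memory accountant.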
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        Self::open_with_config(
            path,
            Arc::new(CorePlugin),
            Arc::new(MemoryAccountant::no_limit()),
        )
    }

    pub fn open_memory() -> Self {
        Self::open_memory_with_plugin_and_accountant(
            Arc::new(CorePlugin),
            Arc::new(MemoryAccountant::no_limit()),
        )
        .expect("failed to open in-memory database")
    }

    fn open_loaded(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        mut accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        let persistence = if path.exists() {
            Arc::new(RedbPersistence::open(path)?)
        } else {
            Arc::new(RedbPersistence::create(path)?)
        };
        if accountant.usage().limit.is_none()
            && let Some(limit) = persistence.load_config_value::<usize>("memory_limit")?
        {
            accountant = Arc::new(MemoryAccountant::with_budget(limit));
        }
        let persisted_disk_limit = persistence.load_config_value::<u64>("disk_limit")?;
        let startup_disk_ceiling = startup_disk_limit;
        let effective_disk_limit = match (persisted_disk_limit, startup_disk_limit) {
            (Some(persisted), Some(ceiling)) => Some(persisted.min(ceiling)),
            (Some(persisted), None) => Some(persisted),
            (None, Some(ceiling)) => Some(ceiling),
            (None, None) => None,
        };

        let all_meta = persistence.load_all_table_meta()?;

        let relational = Arc::new(RelationalStore::new());
        for (name, meta) in &all_meta {
            relational.create_table(name, meta.clone());
            for decl in &meta.indexes {
                relational.create_index_storage(name, &decl.name, decl.columns.clone());
            }
            for row in persistence.load_relational_table(name)? {
                relational.insert_loaded_row(name, row);
            }
        }

        let graph = Arc::new(GraphStore::new());
        for edge in persistence.load_forward_edges()? {
            graph.insert_loaded_edge(edge);
        }

        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        for (table_name, meta) in &all_meta {
            for column in &meta.columns {
                if let ColumnType::Vector(dimension) = column.column_type {
                    vector.register_index(
                        VectorIndexRef::new(table_name, column.name.clone()),
                        dimension,
                        column.quantization,
                    );
                }
            }
        }
        let loaded_vectors = persistence.load_vectors()?;
        for entry in &loaded_vectors {
            vector.insert_loaded_vector(entry.clone());
        }
        hydrate_relational_vector_values(&relational, &loaded_vectors);

        let max_row_id = relational.max_row_id();
        let max_tx = max_tx_across_all(&relational, &graph, &vector);
        let max_lsn = max_lsn_across_all(&relational, &graph, &vector);
        relational.set_next_row_id(RowId(max_row_id.0.saturating_add(1)));

        let change_log = Arc::new(RwLock::new(persistence.load_change_log()?));
        let ddl_log = Arc::new(RwLock::new(persistence.load_ddl_log()?));
        let composite = CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        );
        let persistent = PersistentCompositeStore::new(composite, persistence.clone());
        let store: DynStore = Box::new(persistent);
        let tx_mgr = Arc::new(TxManager::new_with_counters(
            store,
            TxId(max_tx.0.saturating_add(1)),
            Lsn(max_lsn.0.saturating_add(1)),
            max_tx,
        ));

        let db = Self::build_db(
            tx_mgr,
            relational,
            graph,
            vector,
            hnsw,
            change_log,
            ddl_log,
            Some(persistence),
            plugin,
            accountant,
            effective_disk_limit,
            startup_disk_ceiling,
        );

        for meta in all_meta.values() {
            if !meta.dag_edge_types.is_empty() {
                db.graph.register_dag_edge_types(&meta.dag_edge_types);
            }
        }
        db.rebuild_rank_formula_cache_from_meta(&all_meta)?;

        db.account_loaded_state()?;
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());

        Ok(db)
    }

    fn open_memory_internal(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        let relational = Arc::new(RelationalStore::new());
        let graph = Arc::new(GraphStore::new());
        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        let change_log = Arc::new(RwLock::new(Vec::new()));
        let ddl_log = Arc::new(RwLock::new(Vec::new()));
        let store: DynStore = Box::new(CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        ));
        let tx_mgr = Arc::new(TxManager::new(store));

        let db = Self::build_db(
            tx_mgr, relational, graph, vector, hnsw, change_log, ddl_log, None, plugin, accountant,
            None, None,
        );
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());
        Ok(db)
    }

    pub fn begin(&self) -> TxId {
        self.tx_mgr.begin()
    }

    pub fn commit(&self, tx: TxId) -> Result<()> {
        self.commit_with_source(tx, CommitSource::User)
    }

    pub fn rollback(&self, tx: TxId) -> Result<()> {
        let ws = self.tx_mgr.cloned_write_set(tx)?;
        self.release_insert_allocations(&ws);
        self.tx_mgr.rollback(tx)
    }

    pub fn snapshot(&self) -> SnapshotId {
        self.tx_mgr.snapshot()
    }

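    /// Parses and executes one SQL statement against the current session.
    /// `BEGIN`/`COMMIT`/`ROLLBACK` manage the session transaction; other
    /// statements run inside it, or autocommit when no session transaction is
    /// open. Plans for cache-eligible statements are reused on later calls.
    ///
    /// Minimal usage sketch (hypothetical DML elided; only the transaction
    /// statements shown are taken verbatim from the match below):
    /// ```ignore
    /// let params = std::collections::HashMap::new();
    /// db.execute("BEGIN", &params)?;
    /// // ... DML statements here join the open session transaction ...
    /// db.execute("COMMIT", &params)?;
    /// ```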
    pub fn execute(&self, sql: &str, params: &HashMap<String, Value>) -> Result<QueryResult> {
        if let Some(cached) = self.cached_statement(sql) {
            let active_tx = *self.session_tx.lock();
            return self.execute_statement_with_plan(
                &cached.stmt,
                sql,
                params,
                active_tx,
                Some(&cached.plan),
            );
        }

        let stmt = contextdb_parser::parse(sql)?;

        match &stmt {
            Statement::Begin => {
                let mut session = self.session_tx.lock();
                if session.is_none() {
                    *session = Some(self.begin());
                }
                return Ok(QueryResult::empty());
            }
            Statement::Commit => {
                let mut session = self.session_tx.lock();
                if let Some(tx) = session.take() {
                    self.commit_with_source(tx, CommitSource::User)?;
                }
                return Ok(QueryResult::empty());
            }
            Statement::Rollback => {
                let mut session = self.session_tx.lock();
                if let Some(tx) = *session {
                    self.rollback(tx)?;
                    *session = None;
                }
                return Ok(QueryResult::empty());
            }
            _ => {}
        }

        let active_tx = *self.session_tx.lock();
        self.execute_statement(&stmt, sql, params, active_tx)
    }

    fn cached_statement(&self, sql: &str) -> Option<Arc<CachedStatement>> {
        self.statement_cache.read().get(sql).cloned()
    }

    fn cache_statement_if_eligible(&self, sql: &str, stmt: &Statement, plan: &PhysicalPlan) {
        if !Self::is_statement_cache_eligible(stmt, plan) {
            return;
        }

        let mut cache = self.statement_cache.write();
        if cache.contains_key(sql) {
            return;
        }
        if cache.len() >= MAX_STATEMENT_CACHE_ENTRIES {
            return;
        }
        cache.insert(
            sql.to_string(),
            Arc::new(CachedStatement {
                stmt: stmt.clone(),
                plan: plan.clone(),
            }),
        );
    }

    fn is_statement_cache_eligible(stmt: &Statement, plan: &PhysicalPlan) -> bool {
        matches!((stmt, plan), (Statement::Insert(ins), PhysicalPlan::Insert(_))
            if !ins.table.eq_ignore_ascii_case("GRAPH")
                && !ins.table.eq_ignore_ascii_case("__edges"))
    }

    pub(crate) fn clear_statement_cache(&self) {
        self.statement_cache.write().clear();
    }

    #[doc(hidden)]
    pub fn __statement_cache_len(&self) -> usize {
        self.statement_cache.read().len()
    }

    fn execute_autocommit(
        &self,
        plan: &PhysicalPlan,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        self.__reset_rows_examined();
        match plan {
            PhysicalPlan::Insert(_) | PhysicalPlan::Delete(_) | PhysicalPlan::Update(_) => {
                let tx = self.begin();
                let result = execute_plan(self, plan, params, Some(tx));
                match result {
                    Ok(qr) => {
                        self.commit_with_source(tx, CommitSource::AutoCommit)?;
                        Ok(qr)
                    }
                    Err(e) => {
                        let _ = self.rollback(tx);
                        Err(e)
                    }
                }
            }
            _ => execute_plan(self, plan, params, None),
        }
    }

    pub fn explain(&self, sql: &str) -> Result<String> {
        let stmt = contextdb_parser::parse(sql)?;
        let plan = contextdb_planner::plan(&stmt)?;
        let vector_index = vector_index_from_plan(&plan);
        if let Some(index) = &vector_index
            && let Some(state) = self.vector_store.try_state(index)
            && state.entry_count() >= 1000
            && state.hnsw_len().is_none()
        {
            let query = vec![0.0_f32; state.dimension()];
            let _ = self.query_vector_strict(index.clone(), &query, 1, None, self.snapshot());
        }
        let mut output = plan.explain();
        let uses_hnsw = vector_index
            .as_ref()
            .is_some_and(|index| self.vector_store.has_hnsw_index_for(index));
        if uses_hnsw {
            output = output.replace("VectorSearch(", "HNSWSearch(");
            output = output.replace("VectorSearch {", "HNSWSearch {");
        } else {
            output = output.replace("VectorSearch(", "VectorSearch(strategy=BruteForce, ");
            output = output.replace("VectorSearch {", "VectorSearch { strategy: BruteForce,");
        }
        Ok(output)
    }

    pub fn execute_in_tx(
        &self,
        tx: TxId,
        sql: &str,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        let stmt = contextdb_parser::parse(sql)?;
        self.execute_statement(&stmt, sql, params, Some(tx))
    }

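    /// Shared commit path: validates staged foreign keys, gives the plugin a
    /// pre-commit veto, commits through the transaction manager to obtain an
    /// LSN, then stamps the write set and notifies the plugin and any
    /// subscribers. A pre-commit failure rolls the transaction back.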
    fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
        let mut ws = self.tx_mgr.cloned_write_set(tx)?;

        if !ws.is_empty()
            && let Err(err) = self.validate_foreign_keys_in_tx(tx)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        if !ws.is_empty()
            && let Err(err) = self.plugin.pre_commit(&ws, source)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        let lsn = self.tx_mgr.commit_with_lsn(tx)?;

        if !ws.is_empty() {
            self.release_delete_allocations(&ws);
            ws.stamp_lsn(lsn);
            self.plugin.post_commit(&ws, source);
            self.publish_commit_event_if_subscribers(&ws, source, lsn);
        }

        Ok(())
    }

    fn build_commit_event(
        ws: &contextdb_tx::WriteSet,
        source: CommitSource,
        lsn: Lsn,
    ) -> CommitEvent {
        let mut tables_changed: Vec<String> = ws
            .relational_inserts
            .iter()
            .map(|(table, _)| table.clone())
            .chain(
                ws.relational_deletes
                    .iter()
                    .map(|(table, _, _)| table.clone()),
            )
            .chain(
                ws.vector_inserts
                    .iter()
                    .map(|entry| entry.index.table.clone()),
            )
            .chain(
                ws.vector_deletes
                    .iter()
                    .map(|(index, _, _)| index.table.clone()),
            )
            .chain(
                ws.vector_moves
                    .iter()
                    .map(|(index, _, _, _)| index.table.clone()),
            )
            .collect::<HashSet<_>>()
            .into_iter()
            .collect();
        tables_changed.sort();

        CommitEvent {
            source,
            lsn,
            tables_changed,
            row_count: ws.relational_inserts.len()
                + ws.relational_deletes.len()
                + ws.adj_inserts.len()
                + ws.adj_deletes.len()
                + ws.vector_inserts.len()
                + ws.vector_deletes.len()
                + ws.vector_moves.len(),
        }
    }

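    /// Fans a commit event out to subscribers without blocking: a full channel
    /// counts the event as dropped but keeps the subscriber; a disconnected
    /// receiver is pruned from the list.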
    fn publish_commit_event_if_subscribers(
        &self,
        ws: &contextdb_tx::WriteSet,
        source: CommitSource,
        lsn: Lsn,
    ) {
        let mut subscriptions = self.subscriptions.lock();
        if subscriptions.subscribers.is_empty() {
            return;
        }
        let event = Self::build_commit_event(ws, source, lsn);
        let subscribers = std::mem::take(&mut subscriptions.subscribers);
        let mut live_subscribers = Vec::with_capacity(subscribers.len());

        for sender in subscribers {
            match sender.try_send(event.clone()) {
                Ok(()) => {
                    subscriptions.events_sent += 1;
                    live_subscribers.push(sender);
                }
                Err(TrySendError::Full(_)) => {
                    subscriptions.events_dropped += 1;
                    live_subscribers.push(sender);
                }
                Err(TrySendError::Disconnected(_)) => {}
            }
        }

        subscriptions.subscribers = live_subscribers;
    }

    fn stop_pruning_thread(&self) {
        let handle = {
            let mut runtime = self.pruning_runtime.lock();
            runtime.shutdown.store(true, Ordering::SeqCst);
            let handle = runtime.handle.take();
            runtime.shutdown = Arc::new(AtomicBool::new(false));
            handle
        };

        if let Some(handle) = handle {
            let _ = handle.join();
        }
    }

    fn execute_statement(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        self.execute_statement_with_plan(stmt, sql, params, tx, None)
    }

    fn execute_statement_with_plan(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
        cached_plan: Option<&PhysicalPlan>,
    ) -> Result<QueryResult> {
        self.plugin.on_query(sql)?;

        if let Some(change) = self.ddl_change_for_statement(stmt).as_ref() {
            self.plugin.on_ddl(change)?;
        }

        if let Statement::Insert(ins) = stmt
            && (ins.table.eq_ignore_ascii_case("GRAPH")
                || ins.table.eq_ignore_ascii_case("__edges"))
        {
            return self.execute_graph_insert(ins, params, tx);
        }

        let started = Instant::now();
        let result = (|| {
            if let Some(plan) = cached_plan {
                return self.run_planned_statement(stmt, plan, params, tx);
            }

            let (stmt, plan) = {
                let stmt = self.pre_resolve_cte_subqueries(stmt, params, tx)?;
                let plan = contextdb_planner::plan(&stmt)?;
                self.cache_statement_if_eligible(sql, &stmt, &plan);
                (stmt, plan)
            };
            self.run_planned_statement(&stmt, &plan, params, tx)
        })();
        let duration = started.elapsed();
        let outcome = query_outcome_from_result(&result);
        self.plugin.post_query(sql, duration, &outcome);
        result.map(strip_internal_row_id)
    }

    fn run_planned_statement(
        &self,
        stmt: &Statement,
        plan: &PhysicalPlan,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        validate_dml(plan, self, params)?;
        let result = match tx {
            Some(tx) => {
                self.__reset_rows_examined();
                execute_plan(self, plan, params, Some(tx))
            }
            None => self.execute_autocommit(plan, params),
        };
        if result.is_ok()
            && let Statement::CreateTable(ct) = stmt
            && !ct.dag_edge_types.is_empty()
        {
            self.graph.register_dag_edge_types(&ct.dag_edge_types);
        }
        result
    }

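    /// Handles `INSERT` statements addressed to the reserved `GRAPH` /
    /// `__edges` pseudo-table by turning each value row into an edge insert.
    /// The statement must supply `source_id` and `target_id` (UUIDs, or text
    /// parseable as UUIDs) and an `edge_type` (text) column; the insert
    /// autocommits when no transaction is supplied.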
    fn execute_graph_insert(
        &self,
        ins: &contextdb_parser::ast::Insert,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        use crate::executor::resolve_expr;

        let col_index = |name: &str| {
            ins.columns
                .iter()
                .position(|c| c.eq_ignore_ascii_case(name))
        };
        let source_idx = col_index("source_id")
            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires source_id column".into()))?;
        let target_idx = col_index("target_id")
            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires target_id column".into()))?;
        let edge_type_idx = col_index("edge_type")
            .ok_or_else(|| Error::PlanError("GRAPH INSERT requires edge_type column".into()))?;

        let auto_commit = tx.is_none();
        let tx = tx.unwrap_or_else(|| self.begin());
        let mut count = 0u64;
        for row_exprs in &ins.values {
            let source = resolve_expr(&row_exprs[source_idx], params)?;
            let target = resolve_expr(&row_exprs[target_idx], params)?;
            let edge_type = resolve_expr(&row_exprs[edge_type_idx], params)?;

            let source_uuid = match &source {
                Value::Uuid(u) => *u,
                Value::Text(t) => uuid::Uuid::parse_str(t)
                    .map_err(|e| Error::PlanError(format!("invalid source_id uuid: {e}")))?,
                _ => return Err(Error::PlanError("source_id must be UUID".into())),
            };
            let target_uuid = match &target {
                Value::Uuid(u) => *u,
                Value::Text(t) => uuid::Uuid::parse_str(t)
                    .map_err(|e| Error::PlanError(format!("invalid target_id uuid: {e}")))?,
                _ => return Err(Error::PlanError("target_id must be UUID".into())),
            };
            let edge_type_str = match &edge_type {
                Value::Text(t) => t.clone(),
                _ => return Err(Error::PlanError("edge_type must be TEXT".into())),
            };

            self.insert_edge(
                tx,
                source_uuid,
                target_uuid,
                edge_type_str,
                Default::default(),
            )?;
            count += 1;
        }

        if auto_commit {
            self.commit_with_source(tx, CommitSource::AutoCommit)?;
        }

        Ok(QueryResult::empty_with_affected(count))
    }

    fn ddl_change_for_statement(&self, stmt: &Statement) -> Option<DdlChange> {
        match stmt {
            Statement::CreateTable(ct) => Some(ddl_change_from_create_table(ct)),
            Statement::DropTable(dt) => Some(DdlChange::DropTable {
                name: dt.name.clone(),
            }),
            Statement::AlterTable(at) => {
                let mut meta = self.table_meta(&at.table).unwrap_or_default();
                match &at.action {
                    AlterAction::AddColumn(col) => {
                        meta.columns.push(contextdb_core::ColumnDef {
                            name: col.name.clone(),
                            column_type: crate::executor::map_column_type(&col.data_type),
                            nullable: col.nullable,
                            primary_key: col.primary_key,
                            unique: col.unique,
                            default: col
                                .default
                                .as_ref()
                                .map(crate::executor::stored_default_expr),
                            references: col.references.as_ref().map(|reference| {
                                contextdb_core::ForeignKeyReference {
                                    table: reference.table.clone(),
                                    column: reference.column.clone(),
                                }
                            }),
                            expires: col.expires,
                            immutable: col.immutable,
                            quantization: match col.quantization {
                                contextdb_parser::ast::VectorQuantization::F32 => {
                                    contextdb_core::VectorQuantization::F32
                                }
                                contextdb_parser::ast::VectorQuantization::SQ8 => {
                                    contextdb_core::VectorQuantization::SQ8
                                }
                                contextdb_parser::ast::VectorQuantization::SQ4 => {
                                    contextdb_core::VectorQuantization::SQ4
                                }
                            },
                            rank_policy: col
                                .rank_policy
                                .as_deref()
                                .map(crate::executor::map_rank_policy),
                        });
                        if col.expires {
                            meta.expires_column = Some(col.name.clone());
                        }
                    }
                    AlterAction::DropColumn {
                        column: name,
                        cascade: _,
                    } => {
                        meta.columns.retain(|c| c.name != *name);
                        if meta.expires_column.as_deref() == Some(name.as_str()) {
                            meta.expires_column = None;
                        }
                    }
                    AlterAction::RenameColumn { from, to } => {
                        if let Some(c) = meta.columns.iter_mut().find(|c| c.name == *from) {
                            c.name = to.clone();
                        }
                        if meta.expires_column.as_deref() == Some(from.as_str()) {
                            meta.expires_column = Some(to.clone());
                        }
                    }
                    AlterAction::SetRetain {
                        duration_seconds,
                        sync_safe,
                    } => {
                        meta.default_ttl_seconds = Some(*duration_seconds);
                        meta.sync_safe = *sync_safe;
                    }
                    AlterAction::DropRetain => {
                        meta.default_ttl_seconds = None;
                        meta.sync_safe = false;
                    }
                    AlterAction::SetSyncConflictPolicy(_) | AlterAction::DropSyncConflictPolicy => {}
                }
                Some(DdlChange::AlterTable {
                    name: at.table.clone(),
                    columns: meta
                        .columns
                        .iter()
                        .map(|c| {
                            (
                                c.name.clone(),
                                sql_type_for_meta_column(c, &meta.propagation_rules),
                            )
                        })
                        .collect(),
                    constraints: create_table_constraints_from_meta(&meta),
                })
            }
            _ => None,
        }
    }

    fn pre_resolve_cte_subqueries(
        &self,
        stmt: &Statement,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<Statement> {
        if let Statement::Select(sel) = stmt
            && !sel.ctes.is_empty()
            && sel.body.where_clause.is_some()
        {
            use crate::executor::resolve_in_subqueries_with_ctes;
            let resolved_where = sel
                .body
                .where_clause
                .as_ref()
                .map(|expr| resolve_in_subqueries_with_ctes(self, expr, params, tx, &sel.ctes))
                .transpose()?;
            let mut new_body = sel.body.clone();
            new_body.where_clause = resolved_where;
            Ok(Statement::Select(contextdb_parser::ast::SelectStatement {
                ctes: sel.ctes.clone(),
                body: new_body,
            }))
        } else {
            Ok(stmt.clone())
        }
    }

    pub fn insert_row(
        &self,
        tx: TxId,
        table: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<RowId> {
        let values =
            self.coerce_row_for_insert(table, values, Some(self.committed_watermark()), Some(tx))?;
        self.validate_row_constraints(tx, table, &values, None)?;
        self.relational.insert(tx, table, values)
    }

    pub(crate) fn insert_row_replacing(
        &self,
        tx: TxId,
        table: &str,
        values: HashMap<ColName, Value>,
        old_row_id: RowId,
    ) -> Result<RowId> {
        let values =
            self.coerce_row_for_insert(table, values, Some(self.committed_watermark()), Some(tx))?;
        self.validate_row_constraints(tx, table, &values, Some(old_row_id))?;
        self.relational.insert(tx, table, values)
    }

    pub(crate) fn insert_row_for_sync(
        &self,
        tx: TxId,
        table: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<RowId> {
        let values = self.coerce_row_for_insert(table, values, None, None)?;
        self.validate_row_constraints(tx, table, &values, None)?;
        self.relational.insert(tx, table, values)
    }

    pub(crate) fn upsert_row_for_sync(
        &self,
        tx: TxId,
        table: &str,
        conflict_col: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<UpsertResult> {
        let values = self.coerce_row_for_insert(table, values, None, None)?;
        self.upsert_row(tx, table, conflict_col, values)
    }

    fn coerce_row_for_insert(
        &self,
        table: &str,
        values: HashMap<ColName, Value>,
        current_tx_max: Option<TxId>,
        active_tx: Option<TxId>,
    ) -> Result<HashMap<ColName, Value>> {
        let meta = self.table_meta(table);
        let mut out: HashMap<ColName, Value> = HashMap::with_capacity(values.len());
        for (col, v) in values {
            let is_vector_bypass = matches!(&v, Value::Vector(_))
                && meta
                    .as_ref()
                    .and_then(|m| m.columns.iter().find(|c| c.name == col))
                    .map(|c| matches!(c.column_type, contextdb_core::ColumnType::Vector(_)))
                    .unwrap_or(false);

            let coerced = if is_vector_bypass {
                v
            } else {
                crate::executor::coerce_into_column(
                    self,
                    table,
                    &col,
                    v,
                    current_tx_max,
                    active_tx,
                )?
            };
            out.insert(col, coerced);
        }
        Ok(out)
    }

    pub(crate) fn insert_row_with_unique_noop(
        &self,
        tx: TxId,
        table: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<InsertRowResult> {
        match self.check_row_constraints(tx, table, &values, None, true)? {
            RowConstraintCheck::Valid => self
                .relational
                .insert(tx, table, values)
                .map(InsertRowResult::Inserted),
            RowConstraintCheck::DuplicateUniqueNoOp => Ok(InsertRowResult::NoOp),
        }
    }

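    /// Inserts or updates a row keyed on `conflict_col`. Rejects changes to
    /// immutable columns of an existing row, enforces unique/PK constraints,
    /// and, when an update sets a state-machine column, triggers state
    /// propagation for the row's UUID.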
    pub fn upsert_row(
        &self,
        tx: TxId,
        table: &str,
        conflict_col: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<UpsertResult> {
        let snapshot = self.snapshot_for_read();
        let existing_row = values
            .get(conflict_col)
            .map(|conflict_value| {
                self.point_lookup_in_tx(tx, table, conflict_col, conflict_value, snapshot)
            })
            .transpose()?
            .flatten();
        let existing_row_id = existing_row.as_ref().map(|row| row.row_id);
        if let (Some(existing), Some(meta)) = (existing_row.as_ref(), self.table_meta(table)) {
            for col_def in meta.columns.iter().filter(|c| c.immutable) {
                let Some(incoming) = values.get(&col_def.name) else {
                    continue;
                };
                let existing_value = existing.values.get(&col_def.name);
                if existing_value != Some(incoming) {
                    return Err(Error::ImmutableColumn {
                        table: table.to_string(),
                        column: col_def.name.clone(),
                    });
                }
            }
        }
        self.validate_row_constraints(tx, table, &values, existing_row_id)?;

        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
        let meta = self.table_meta(table);
        let new_state = meta
            .as_ref()
            .and_then(|m| m.state_machine.as_ref())
            .and_then(|sm| values.get(&sm.column))
            .and_then(Value::as_text)
            .map(std::borrow::ToOwned::to_owned);

        let result = self
            .relational
            .upsert(tx, table, conflict_col, values, snapshot)?;

        if let (Some(uuid), Some(state), Some(_meta)) =
            (row_uuid, new_state.as_deref(), meta.as_ref())
            && matches!(result, UpsertResult::Updated)
        {
            self.propagate_state_change_if_needed(tx, table, Some(uuid), Some(state))?;
        }

        Ok(result)
    }

    fn validate_row_constraints(
        &self,
        tx: TxId,
        table: &str,
        values: &HashMap<ColName, Value>,
        skip_row_id: Option<RowId>,
    ) -> Result<()> {
        match self.check_row_constraints(tx, table, values, skip_row_id, false)? {
            RowConstraintCheck::Valid => Ok(()),
            RowConstraintCheck::DuplicateUniqueNoOp => {
                unreachable!("strict constraint validation cannot return no-op")
            }
        }
    }

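    /// Core uniqueness check for primary-key, single-column unique, and
    /// composite unique constraints. Prefers an index probe and falls back to
    /// a lazily cached visible-row scan when no covering index exists. With
    /// `allow_duplicate_unique_noop`, a duplicate that resolves to exactly one
    /// existing row reports [`RowConstraintCheck::DuplicateUniqueNoOp`]
    /// instead of erroring.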
    fn check_row_constraints(
        &self,
        tx: TxId,
        table: &str,
        values: &HashMap<ColName, Value>,
        skip_row_id: Option<RowId>,
        allow_duplicate_unique_noop: bool,
    ) -> Result<RowConstraintCheck> {
        let metas = self.relational_store.table_meta.read();
        let meta = metas
            .get(table)
            .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
        let snapshot = self.snapshot();

        let mut visible_rows_cache: Option<Vec<VersionedRow>> = None;

        for column in meta.columns.iter().filter(|column| column.primary_key) {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            match self.probe_column_for_constraint(
                tx,
                table,
                &column.name,
                value,
                snapshot,
                skip_row_id,
            )? {
                ConstraintProbe::Match(_) => {
                    return Err(Error::UniqueViolation {
                        table: table.to_string(),
                        column: column.name.clone(),
                    });
                }
                ConstraintProbe::NoMatch => {}
                ConstraintProbe::NoIndex => {
                    if visible_rows_cache.is_none() {
                        visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                            Some(tx),
                            table,
                            snapshot,
                            &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                        )?);
                    }
                    let rows = visible_rows_cache.as_deref().unwrap();
                    if rows
                        .iter()
                        .any(|existing| existing.values.get(&column.name) == Some(value))
                    {
                        return Err(Error::UniqueViolation {
                            table: table.to_string(),
                            column: column.name.clone(),
                        });
                    }
                }
            }
        }

        let mut duplicate_unique_row_id = None;

        for column in meta
            .columns
            .iter()
            .filter(|column| column.unique && !column.primary_key)
        {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            let matching_row_ids: Vec<RowId> = match self.probe_column_for_constraint(
                tx,
                table,
                &column.name,
                value,
                snapshot,
                skip_row_id,
            )? {
                ConstraintProbe::Match(rid) => vec![rid],
                ConstraintProbe::NoMatch => Vec::new(),
                ConstraintProbe::NoIndex => {
                    if visible_rows_cache.is_none() {
                        visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                            Some(tx),
                            table,
                            snapshot,
                            &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                        )?);
                    }
                    let rows = visible_rows_cache.as_deref().unwrap();
                    rows.iter()
                        .filter(|existing| existing.values.get(&column.name) == Some(value))
                        .map(|existing| existing.row_id)
                        .collect()
                }
            };
            self.merge_unique_conflict(
                table,
                &column.name,
                &matching_row_ids,
                allow_duplicate_unique_noop,
                &mut duplicate_unique_row_id,
            )?;
        }

        for unique_constraint in &meta.unique_constraints {
            let mut candidate_values = Vec::with_capacity(unique_constraint.len());
            let mut has_null = false;

            for column_name in unique_constraint {
                match values.get(column_name) {
                    Some(Value::Null) | None => {
                        has_null = true;
                        break;
                    }
                    Some(value) => candidate_values.push(value.clone()),
                }
            }

            if has_null {
                continue;
            }

            let matching_row_ids: Vec<RowId> = if let Some(rid) = self.probe_composite_unique(
                tx,
                table,
                unique_constraint,
                &candidate_values,
                snapshot,
                skip_row_id,
            )? {
                vec![rid]
            } else if self.index_covers_composite(table, unique_constraint) {
                Vec::new()
            } else {
                if visible_rows_cache.is_none() {
                    visible_rows_cache = Some(self.relational.scan_filter_with_tx(
                        Some(tx),
                        table,
                        snapshot,
                        &|row| skip_row_id.is_none_or(|row_id| row.row_id != row_id),
                    )?);
                }
                let rows = visible_rows_cache.as_deref().unwrap();
                rows.iter()
                    .filter(|existing| {
                        unique_constraint.iter().zip(candidate_values.iter()).all(
                            |(column_name, value)| existing.values.get(column_name) == Some(value),
                        )
                    })
                    .map(|existing| existing.row_id)
                    .collect()
            };
            let column_label = unique_constraint.first().map(|s| s.as_str()).unwrap_or("");
            self.merge_unique_conflict(
                table,
                column_label,
                &matching_row_ids,
                allow_duplicate_unique_noop,
                &mut duplicate_unique_row_id,
            )?;
        }

        if duplicate_unique_row_id.is_some() {
            Ok(RowConstraintCheck::DuplicateUniqueNoOp)
        } else {
            Ok(RowConstraintCheck::Valid)
        }
    }

    fn merge_unique_conflict(
        &self,
        table: &str,
        column: &str,
        matching_row_ids: &[RowId],
        allow_duplicate_unique_noop: bool,
        duplicate_unique_row_id: &mut Option<RowId>,
    ) -> Result<()> {
        if matching_row_ids.is_empty() {
            return Ok(());
        }

        if !allow_duplicate_unique_noop || matching_row_ids.len() != 1 {
            return Err(Error::UniqueViolation {
                table: table.to_string(),
                column: column.to_string(),
            });
        }

        let matched_row_id = matching_row_ids[0];

        if let Some(existing_row_id) = duplicate_unique_row_id {
            if *existing_row_id != matched_row_id {
                return Err(Error::UniqueViolation {
                    table: table.to_string(),
                    column: column.to_string(),
                });
            }
        } else {
            *duplicate_unique_row_id = Some(matched_row_id);
        }

        Ok(())
    }

    fn index_covers_column(&self, table: &str, column: &str) -> bool {
        let indexes = self.relational_store.indexes.read();
        indexes
            .iter()
            .any(|((t, _), idx)| t == table && idx.columns.len() == 1 && idx.columns[0].0 == column)
    }

    fn index_covers_composite(&self, table: &str, cols: &[String]) -> bool {
        let indexes = self.relational_store.indexes.read();
        indexes.iter().any(|((t, _), idx)| {
            t == table
                && idx.columns.len() >= cols.len()
                && idx
                    .columns
                    .iter()
                    .zip(cols.iter())
                    .all(|((c, _), want)| c == want)
        })
    }

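    /// Probes a single column's index for a visible row holding `value`,
    /// layering this transaction's staged inserts and deletes over the
    /// committed index entries. Returns `NoIndex` when no usable index exists
    /// so the caller can fall back to a table scan.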
    fn probe_column_for_constraint(
        &self,
        tx: TxId,
        table: &str,
        column: &str,
        value: &Value,
        snapshot: SnapshotId,
        skip_row_id: Option<RowId>,
    ) -> Result<ConstraintProbe> {
        use contextdb_core::{DirectedValue, TotalOrdAsc};
        let (tx_staged_deletes, staged_overlap) = self.tx_mgr.with_write_set(tx, |ws| {
            let deletes = if ws.relational_deletes.is_empty() {
                std::collections::HashSet::new()
            } else {
                ws.relational_deletes
                    .iter()
                    .filter(|(t, _, _)| t == table)
                    .map(|(_, row_id, _)| *row_id)
                    .collect()
            };
            let overlap = ws.relational_inserts.iter().find_map(|(t, row)| {
                if t != table {
                    return None;
                }
                if let Some(sid) = skip_row_id
                    && row.row_id == sid
                {
                    return None;
                }
                if row.values.get(column) == Some(value) {
                    Some(row.row_id)
                } else {
                    None
                }
            });
            (deletes, overlap)
        })?;
        let indexes = self.relational_store.indexes.read();
        let table_key = table.to_string();
        let pk_key = (table_key.clone(), format!("__pk_{column}"));
        let unique_key = (table_key, format!("__unique_{column}"));
        let storage = indexes
            .get(&pk_key)
            .or_else(|| indexes.get(&unique_key))
            .or_else(|| {
                indexes.iter().find_map(|((t, _), idx)| {
                    if t == table && idx.columns.len() == 1 && idx.columns[0].0 == column {
                        Some(idx)
                    } else {
                        None
                    }
                })
            });
        let Some(storage) = storage else {
            return Ok(ConstraintProbe::NoIndex);
        };
        let key = vec![DirectedValue::Asc(TotalOrdAsc(value.clone()))];
        if let Some(entries) = storage.tree.get(&key) {
            for entry in entries {
                if let Some(sid) = skip_row_id
                    && entry.row_id == sid
                {
                    continue;
                }
                if tx_staged_deletes.contains(&entry.row_id) {
                    continue;
                }
                if entry.visible_at(snapshot) {
                    return Ok(ConstraintProbe::Match(entry.row_id));
                }
            }
        }
        drop(indexes);
        Ok(match staged_overlap {
            Some(row_id) => ConstraintProbe::Match(row_id),
            None => ConstraintProbe::NoMatch,
        })
    }

    fn probe_composite_unique(
        &self,
        tx: TxId,
        table: &str,
        cols: &[String],
        values: &[Value],
        snapshot: SnapshotId,
        skip_row_id: Option<RowId>,
    ) -> Result<Option<RowId>> {
        use contextdb_core::{DirectedValue, TotalOrdAsc};
        if cols.is_empty() || values.is_empty() || cols.len() != values.len() {
            return Ok(None);
        }
        let tx_staged_deletes: std::collections::HashSet<RowId> =
            self.tx_mgr.with_write_set(tx, |ws| {
                ws.relational_deletes
                    .iter()
                    .filter(|(t, _, _)| t == table)
                    .map(|(_, row_id, _)| *row_id)
                    .collect()
            })?;
        let indexes = self.relational_store.indexes.read();
        let storage_entry = indexes.iter().find(|((t, _), idx)| {
            t == table
                && idx.columns.len() >= cols.len()
                && idx
                    .columns
                    .iter()
                    .zip(cols.iter())
                    .all(|((c, _), w)| c == w)
        });
        let (_, storage) = match storage_entry {
            Some(e) => e,
            None => return Ok(None),
        };
        for (key, entries) in storage.tree.iter() {
            if key.len() < cols.len() {
                continue;
            }
            let prefix_match = values.iter().enumerate().all(|(i, v)| match &key[i] {
                DirectedValue::Asc(TotalOrdAsc(k)) => k == v,
                DirectedValue::Desc(contextdb_core::TotalOrdDesc(k)) => k == v,
            });
            if !prefix_match {
                continue;
            }
            for entry in entries {
                if let Some(sid) = skip_row_id
                    && entry.row_id == sid
                {
                    continue;
                }
                if tx_staged_deletes.contains(&entry.row_id) {
                    continue;
                }
                if entry.visible_at(snapshot) {
                    return Ok(Some(entry.row_id));
                }
            }
        }
        drop(indexes);
        let overlap = self.tx_mgr.with_write_set(tx, |ws| {
            for (t, row) in &ws.relational_inserts {
                if t != table {
                    continue;
                }
                if let Some(sid) = skip_row_id
                    && row.row_id == sid
                {
                    continue;
                }
                let matches = cols
                    .iter()
                    .zip(values.iter())
                    .all(|(c, v)| row.values.get(c) == Some(v));
                if matches {
                    return Some(row.row_id);
                }
            }
            None
        })?;
        Ok(overlap)
    }

    fn validate_foreign_keys_in_tx(&self, tx: TxId) -> Result<()> {
        let snapshot = self.snapshot();
        let relational_inserts = self
            .tx_mgr
            .with_write_set(tx, |ws| ws.relational_inserts.clone())?;

        for (table, row) in relational_inserts {
            let meta = self
                .table_meta(&table)
                .ok_or_else(|| Error::TableNotFound(table.clone()))?;
            for column in &meta.columns {
                let Some(reference) = &column.references else {
                    continue;
                };
                let Some(value) = row.values.get(&column.name) else {
                    continue;
                };
                if *value == Value::Null {
                    continue;
                }
                if self
                    .point_lookup_in_tx(tx, &reference.table, &reference.column, value, snapshot)?
                    .is_none()
                {
                    return Err(Error::ForeignKeyViolation {
                        table: table.clone(),
                        column: column.name.clone(),
                        ref_table: reference.table.clone(),
                    });
                }
            }
        }

        Ok(())
    }

    pub(crate) fn propagate_state_change_if_needed(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: Option<uuid::Uuid>,
        new_state: Option<&str>,
    ) -> Result<()> {
        if let (Some(uuid), Some(state)) = (row_uuid, new_state) {
            let already_propagating = self
                .tx_mgr
                .with_write_set(tx, |ws| ws.propagation_in_progress)?;
            if !already_propagating {
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = true)?;
                let propagate_result = self.propagate(tx, table, uuid, state);
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = false)?;
                propagate_result?;
            }
        }

        Ok(())
    }

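    /// Breadth-first state propagation: starting from the changed row, walks
    /// foreign-key and edge propagation rules (bounded by each rule's
    /// `max_depth`), applies the target state via upsert, and records vector
    /// exclusions. Invalid transitions are skipped with a warning unless the
    /// rule is marked abort-on-failure, in which case the first violation is
    /// returned after the queue drains.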
    fn propagate(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: uuid::Uuid,
        new_state: &str,
    ) -> Result<()> {
        let snapshot = self.snapshot_for_read();
        let metas = self.relational_store().table_meta.read().clone();
        let mut queue: VecDeque<PropagationQueueEntry> = VecDeque::new();
        let mut visited: HashSet<(String, uuid::Uuid)> = HashSet::new();
        let mut abort_violation: Option<Error> = None;
        let ctx = PropagationContext {
            tx,
            snapshot,
            metas: &metas,
        };
        let root = PropagationSource {
            table,
            uuid: row_uuid,
            state: new_state,
            depth: 0,
        };

        self.enqueue_fk_children(&ctx, &mut queue, root);
        self.enqueue_edge_children(&ctx, &mut queue, root)?;
        self.apply_vector_exclusions(&ctx, root)?;

        while let Some(entry) = queue.pop_front() {
            if !visited.insert((entry.table.clone(), entry.uuid)) {
                continue;
            }

            let Some(meta) = metas.get(&entry.table) else {
                continue;
            };

            let Some(state_machine) = &meta.state_machine else {
                let msg = format!(
                    "warning: propagation target table {} has no state machine",
                    entry.table
                );
                eprintln!("{msg}");
                if entry.abort_on_failure && abort_violation.is_none() {
                    abort_violation = Some(Error::PropagationAborted {
                        table: entry.table.clone(),
                        column: String::new(),
                        from: String::new(),
                        to: entry.target_state.clone(),
                    });
                }
                continue;
            };

            let state_column = state_machine.column.clone();
            let Some(existing) = self.relational.point_lookup_with_tx(
                Some(tx),
                &entry.table,
                "id",
                &Value::Uuid(entry.uuid),
                snapshot,
            )?
            else {
                continue;
            };

            let from_state = existing
                .values
                .get(&state_column)
                .and_then(Value::as_text)
                .unwrap_or("")
                .to_string();

            let mut next_values = existing.values.clone();
            next_values.insert(
                state_column.clone(),
                Value::Text(entry.target_state.clone()),
            );

            let upsert_outcome =
                self.relational
                    .upsert(tx, &entry.table, "id", next_values, snapshot);

            let reached_state = match upsert_outcome {
                Ok(UpsertResult::Updated) => entry.target_state.as_str(),
                Ok(UpsertResult::NoOp) | Ok(UpsertResult::Inserted) => continue,
                Err(Error::InvalidStateTransition(_)) => {
                    eprintln!(
                        "warning: skipped invalid propagated transition {}.{} {} -> {}",
                        entry.table, state_column, from_state, entry.target_state
                    );
                    if entry.abort_on_failure && abort_violation.is_none() {
                        abort_violation = Some(Error::PropagationAborted {
                            table: entry.table.clone(),
                            column: state_column.clone(),
                            from: from_state,
                            to: entry.target_state.clone(),
                        });
                    }
                    continue;
                }
                Err(err) => return Err(err),
            };

            self.enqueue_edge_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;
            self.apply_vector_exclusions(
                &ctx,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;

            self.enqueue_fk_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            );
        }

        if let Some(err) = abort_violation {
            return Err(err);
        }

        Ok(())
    }

    fn enqueue_fk_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) {
        for (owner_table, owner_meta) in ctx.metas {
            for rule in &owner_meta.propagation_rules {
                let PropagationRule::ForeignKey {
                    fk_column,
                    referenced_table,
                    trigger_state,
                    target_state,
                    max_depth,
                    abort_on_failure,
                    ..
                } = rule
                else {
                    continue;
                };

                if referenced_table != source.table || trigger_state != source.state {
                    continue;
                }

                if source.depth >= *max_depth {
                    continue;
                }

                let rows = match self.relational.scan_filter_with_tx(
                    Some(ctx.tx),
                    owner_table,
                    ctx.snapshot,
                    &|row| row.values.get(fk_column) == Some(&Value::Uuid(source.uuid)),
                ) {
                    Ok(rows) => rows,
                    Err(err) => {
                        eprintln!(
                            "warning: propagation scan failed for {owner_table}.{fk_column}: {err}"
                        );
                        continue;
                    }
                };

                for row in rows {
                    if let Some(id) = row.values.get("id").and_then(Value::as_uuid).copied() {
                        queue.push_back(PropagationQueueEntry {
                            table: owner_table.clone(),
                            uuid: id,
                            target_state: target_state.clone(),
                            depth: source.depth + 1,
                            abort_on_failure: *abort_on_failure,
                        });
                    }
                }
            }
        }
    }

    fn enqueue_edge_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } = rule
            else {
                continue;
            };

            if trigger_state != source.state || source.depth >= *max_depth {
                continue;
            }

            let bfs = self.query_bfs(
                source.uuid,
                Some(std::slice::from_ref(edge_type)),
                *direction,
                1,
                ctx.snapshot,
            )?;

            for node in bfs.nodes {
                if self
                    .relational
                    .point_lookup_with_tx(
                        Some(ctx.tx),
                        source.table,
                        "id",
                        &Value::Uuid(node.id),
                        ctx.snapshot,
                    )?
                    .is_some()
                {
                    queue.push_back(PropagationQueueEntry {
                        table: source.table.to_string(),
                        uuid: node.id,
                        target_state: target_state.clone(),
                        depth: source.depth + 1,
                        abort_on_failure: *abort_on_failure,
                    });
                }
            }
        }

        Ok(())
    }

    fn apply_vector_exclusions(
        &self,
        ctx: &PropagationContext<'_>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::VectorExclusion { trigger_state } = rule else {
                continue;
            };
            if trigger_state != source.state {
                continue;
            }
            for row_id in self.logical_row_ids_for_uuid(ctx.tx, source.table, source.uuid) {
                let index = self
                    .table_meta(source.table)
                    .and_then(|meta| {
                        meta.columns
                            .iter()
                            .find(|column| {
                                matches!(column.column_type, contextdb_core::ColumnType::Vector(_))
                            })
                            .map(|column| VectorIndexRef::new(source.table, column.name.clone()))
                    })
                    .unwrap_or_default();
                self.delete_vector(ctx.tx, index, row_id)?;
            }
        }

        Ok(())
    }

    pub fn delete_row(&self, tx: TxId, table: &str, row_id: RowId) -> Result<()> {
        self.relational.delete(tx, table, row_id)
    }

    pub fn scan(&self, table: &str, snapshot: SnapshotId) -> Result<Vec<VersionedRow>> {
        self.relational.scan(table, snapshot)
    }

    pub(crate) fn scan_in_tx(
        &self,
        tx: TxId,
        table: &str,
        snapshot: SnapshotId,
    ) -> Result<Vec<VersionedRow>> {
        self.relational.scan_with_tx(Some(tx), table, snapshot)
    }

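    /// Builds the transaction-local overlay for an index scan: row ids staged
    /// for deletion plus staged inserts whose column value satisfies the index
    /// predicate `shape`, so uncommitted writes are reflected in index-driven
    /// reads.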
    pub(crate) fn index_scan_tx_overlay(
        &self,
        tx: TxId,
        table: &str,
        column: &str,
        shape: &crate::executor::IndexPredicateShape,
    ) -> Result<IndexScanTxOverlay> {
        use crate::executor::{IndexPredicateShape, range_includes};
        let mut overlay = IndexScanTxOverlay::default();
        self.tx_mgr.with_write_set(tx, |ws| {
            for (t, row_id, _) in &ws.relational_deletes {
                if t == table {
                    overlay.deleted_row_ids.insert(*row_id);
                }
            }
            for (t, row) in &ws.relational_inserts {
                if t != table {
                    continue;
                }
                let v = row.values.get(column).cloned().unwrap_or(Value::Null);
                let include = match shape {
                    IndexPredicateShape::Equality(target) => v == *target,
                    IndexPredicateShape::NotEqual(target) => v != *target,
                    IndexPredicateShape::InList(list) => list.contains(&v),
                    IndexPredicateShape::Range { lower, upper } => range_includes(&v, lower, upper),
                    IndexPredicateShape::IsNull => v == Value::Null,
                    IndexPredicateShape::IsNotNull => v != Value::Null,
                };
                if include {
                    overlay.matching_inserts.push(row.clone());
                }
            }
        })?;
        Ok(overlay)
    }

    pub fn scan_filter(
        &self,
        table: &str,
        snapshot: SnapshotId,
        predicate: &dyn Fn(&VersionedRow) -> bool,
    ) -> Result<Vec<VersionedRow>> {
        self.relational.scan_filter(table, snapshot, predicate)
    }

    pub fn point_lookup(
        &self,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational.point_lookup(table, col, value, snapshot)
    }

    pub(crate) fn point_lookup_in_tx(
        &self,
        tx: TxId,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational
            .point_lookup_with_tx(Some(tx), table, col, value, snapshot)
    }

    pub(crate) fn logical_row_ids_for_uuid(
        &self,
        tx: TxId,
        table: &str,
        uuid: uuid::Uuid,
    ) -> Vec<RowId> {
        let mut row_ids = HashSet::new();

        if let Some(rows) = self.relational_store.tables.read().get(table) {
            for row in rows {
                if row.values.get("id") == Some(&Value::Uuid(uuid)) {
                    row_ids.insert(row.row_id);
                }
            }
        }

        let _ = self.tx_mgr.with_write_set(tx, |ws| {
            for (insert_table, row) in &ws.relational_inserts {
                if insert_table == table && row.values.get("id") == Some(&Value::Uuid(uuid)) {
                    row_ids.insert(row.row_id);
                }
            }
        });

        row_ids.into_iter().collect()
    }

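    /// Inserts a graph edge, charging its estimated size to the memory
    /// accountant up front and releasing the reservation if the edge turns out
    /// to be a duplicate or the insert fails. Returns whether a new edge was
    /// actually inserted.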
    pub fn insert_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: EdgeType,
        properties: HashMap<String, Value>,
    ) -> Result<bool> {
        let bytes = estimate_edge_bytes(source, target, &edge_type, &properties);
        self.accountant.try_allocate_for(
            bytes,
            "graph_insert",
            "insert_edge",
            "Reduce edge fan-out or raise MEMORY_LIMIT before inserting more graph edges.",
        )?;

        match self
            .graph
            .insert_edge(tx, source, target, edge_type, properties)
        {
            Ok(inserted) => {
                if !inserted {
                    self.accountant.release(bytes);
                }
                Ok(inserted)
            }
            Err(err) => {
                self.accountant.release(bytes);
                Err(err)
            }
        }
    }

    pub fn delete_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: &str,
    ) -> Result<()> {
        self.graph.delete_edge(tx, source, target, edge_type)
    }

    pub fn query_bfs(
        &self,
        start: NodeId,
        edge_types: Option<&[EdgeType]>,
        direction: Direction,
        max_depth: u32,
        snapshot: SnapshotId,
    ) -> Result<TraversalResult> {
        self.graph
            .bfs(start, edge_types, direction, 1, max_depth, snapshot)
    }

    pub fn edge_count(
        &self,
        source: NodeId,
        edge_type: &str,
        snapshot: SnapshotId,
    ) -> Result<usize> {
        Ok(self.graph.edge_count(source, edge_type, snapshot))
    }

    pub fn get_edge_properties(
        &self,
        source: NodeId,
        target: NodeId,
        edge_type: &str,
        snapshot: SnapshotId,
    ) -> Result<Option<HashMap<String, Value>>> {
        let props = self
            .graph_store
            .forward_adj
            .read()
            .get(&source)
            .and_then(|entries| {
                entries
                    .iter()
                    .rev()
                    .find(|entry| {
                        entry.target == target
                            && entry.edge_type == edge_type
                            && entry.visible_at(snapshot)
                    })
                    .map(|entry| entry.properties.clone())
            });
        Ok(props)
    }

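    /// Inserts a vector for `row_id` into `index` within transaction `tx`.
    /// The dimension is checked twice: first against any insert still pending
    /// in this transaction's write set, then against the index itself, so a
    /// mismatch surfaces as a direct-insert dimension error either way.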
    pub fn insert_vector(
        &self,
        tx: TxId,
        index: VectorIndexRef,
        row_id: RowId,
        vector: Vec<f32>,
    ) -> Result<()> {
        self.vector_store.state(&index)?;
        if let Some(expected) = self.pending_vector_dimension(tx, &index)?
            && expected != vector.len()
        {
            return Err(self.direct_vector_dimension_error(&index, expected, vector.len()));
        }
        self.insert_vector_strict(tx, index.clone(), row_id, vector)
            .map_err(|err| match err {
                Error::VectorIndexDimensionMismatch {
                    expected, actual, ..
                } => self.direct_vector_dimension_error(&index, expected, actual),
                other => other,
            })
    }

    pub(crate) fn insert_vector_strict(
        &self,
        tx: TxId,
        index: VectorIndexRef,
        row_id: RowId,
        vector: Vec<f32>,
    ) -> Result<()> {
        self.vector_store.validate_vector(&index, vector.len())?;
        let bytes = self.vector_insert_accounted_bytes(&index, vector.len());
        self.accountant.try_allocate_for(
            bytes,
            "insert",
            &format!("vector_insert@{}.{}", index.table, index.column),
            "Reduce vector dimensionality, insert fewer rows, or raise MEMORY_LIMIT.",
        )?;
        self.vector
            .insert_vector(tx, index, row_id, vector)
            .inspect_err(|_| self.accountant.release(bytes))
    }

    pub fn delete_vector(&self, tx: TxId, index: VectorIndexRef, row_id: RowId) -> Result<()> {
        self.vector.delete_vector(tx, index, row_id)
    }

    pub(crate) fn move_vector(
        &self,
        tx: TxId,
        index: VectorIndexRef,
        old_row_id: RowId,
        new_row_id: RowId,
    ) -> Result<()> {
        self.vector_store.state(&index)?;
        self.tx_mgr.with_write_set(tx, |ws| {
            ws.vector_moves.push((index, old_row_id, new_row_id, tx));
        })?;
        Ok(())
    }

    pub fn query_vector(
        &self,
        index: VectorIndexRef,
        query: &[f32],
        k: usize,
        candidates: Option<&RoaringTreemap>,
        snapshot: SnapshotId,
    ) -> Result<Vec<(RowId, f32)>> {
        if self.vector_store.try_state(&index).is_none() {
            return Err(Error::UnknownVectorIndex { index });
        }
        self.vector.search(index, query, k, candidates, snapshot)
    }

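    /// Runs a semantic (vector similarity) search, optionally narrowed by a
    /// `WHERE` clause and a minimum similarity, and re-ranked by the column's
    /// rank policy when `sort_key` is set.
    ///
    /// A minimal sketch of the intended call shape; the `docs` table,
    /// `embedding` column, and query vector below are illustrative, not part
    /// of this crate:
    ///
    /// ```ignore
    /// let results = db.semantic_search(
    ///     SemanticQuery::new("docs", "embedding", vec![0.1, 0.2, 0.3], 10),
    /// )?;
    /// for hit in results {
    ///     println!("{} scored {}", hit.row_id, hit.vector_score);
    /// }
    /// ```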
    pub fn semantic_search(&self, query: SemanticQuery) -> Result<Vec<SearchResult>> {
        self.semantic_search_with_candidates(query, None)
    }

    pub(crate) fn semantic_search_with_candidates(
        &self,
        query: SemanticQuery,
        candidates: Option<RoaringTreemap>,
    ) -> Result<Vec<SearchResult>> {
        let index = VectorIndexRef::new(query.table.clone(), query.vector_column.clone());
        let snapshot = self.snapshot_for_read();
        let meta = self
            .table_meta(&query.table)
            .ok_or_else(|| Error::TableNotFound(query.table.clone()))?;
        let vector_column = meta
            .columns
            .iter()
            .find(|column| column.name == query.vector_column)
            .ok_or_else(|| Error::UnknownVectorIndex {
                index: index.clone(),
            })?;

        let mut candidate_bitmap = candidates;
        if let Some(where_clause) = &query.where_clause {
            let where_bitmap =
                self.semantic_where_candidate_bitmap(&query.table, where_clause, snapshot)?;
            candidate_bitmap = Some(match candidate_bitmap {
                Some(mut existing) => {
                    existing &= where_bitmap;
                    existing
                }
                None => where_bitmap,
            });
        }

        let Some(sort_key) = query.sort_key.as_deref() else {
            let raw_k = if query.min_similarity.is_some() || candidate_bitmap.is_some() {
                self.vector_entry_count(&index).max(query.limit)
            } else {
                query.limit
            };
            let mut rows = self.query_vector_strict(
                index.clone(),
                &query.query,
                raw_k,
                candidate_bitmap.as_ref(),
                snapshot,
            )?;
            if let Some(min_similarity) = query.min_similarity {
                rows.retain(|(_, score)| *score >= min_similarity);
                rows.truncate(query.limit);
            }
            return rows
                .into_iter()
                .map(|(row_id, vector_score)| {
                    let anchor = self.find_row_by_id_at(&query.table, row_id, snapshot)?;
                    let values = self.search_result_values(&index, row_id, snapshot, anchor.values);
                    Ok(SearchResult {
                        row_id,
                        values,
                        vector_score,
                        rank: vector_score,
                    })
                })
                .collect();
        };

        let Some(policy) = vector_column.rank_policy.as_ref() else {
            return Err(Error::RankPolicyNotFound {
                index: rank_index_name(&query.table, &query.vector_column),
                sort_key: sort_key.to_string(),
            });
        };
        if policy.sort_key != sort_key {
            return Err(Error::RankPolicyNotFound {
                index: rank_index_name(&query.table, &query.vector_column),
                sort_key: sort_key.to_string(),
            });
        }
        let formula = self.rank_formula(&query.table, &query.vector_column)?;
        let entry_count = self.vector_entry_count(&index);
        let internal_k = self.rank_policy_candidate_k(entry_count, query.limit);
        let mut raw = self.query_vector_strict(
            index.clone(),
            &query.query,
            internal_k,
            candidate_bitmap.as_ref(),
            snapshot,
        )?;
        if let Some(min_similarity) = query.min_similarity {
            raw.retain(|(_, score)| *score >= min_similarity);
        }

        let mut ranked = Vec::with_capacity(raw.len());
        for (row_id, vector_score) in raw {
            let anchor = self.find_row_by_id_at(&query.table, row_id, snapshot)?;
            let joined = self.joined_row_for_rank_policy(policy, &anchor, snapshot)?;
            self.rank_policy_eval_count.fetch_add(1, Ordering::SeqCst);
            let eval = formula.eval_with_resolver(vector_score, |column| {
                self.resolve_rank_formula_column(policy, &anchor, joined.as_ref(), column)
            });
            let rank = match eval {
                Ok(Some(rank)) => rank,
                Ok(None) => f32::NAN,
                Err(err) => {
                    let error_row_id =
                        if matches!(err, FormulaEvalError::CorruptJoinedColumn { .. }) {
                            joined.as_ref().map(|row| row.row_id).unwrap_or(row_id)
                        } else {
                            row_id
                        };
                    self.warn_rank_eval_error(
                        &query.table,
                        &query.vector_column,
                        error_row_id,
                        &err,
                    );
                    continue;
                }
            };
            let values = self.search_result_values(
                &index,
                row_id,
                snapshot,
                merged_rank_values(&anchor, joined.as_ref()),
            );
            ranked.push(SearchResult {
                row_id,
                values,
                vector_score,
                rank,
            });
        }
        ranked.sort_by(compare_ranked_results);
        ranked.truncate(query.limit);
        Ok(ranked)
    }

    #[doc(hidden)]
    pub fn __rank_policy_eval_count(&self) -> u64 {
        self.rank_policy_eval_count.load(Ordering::SeqCst)
    }

    #[doc(hidden)]
    pub fn __reset_rank_policy_eval_count(&self) {
        self.rank_policy_eval_count.store(0, Ordering::SeqCst);
    }

    #[doc(hidden)]
    pub fn __rank_policy_formula_parse_count(&self) -> u64 {
        self.rank_policy_formula_parse_count.load(Ordering::SeqCst)
    }

    #[doc(hidden)]
    pub fn __inject_raw_joined_row_value_for_test(
        &self,
        table: &str,
        row_id: RowId,
        column: &str,
        _raw_bytes: Vec<u8>,
    ) -> Result<()> {
        self.corrupt_joined_values
            .write()
            .insert((table.to_string(), row_id, column.to_string()));
        Ok(())
    }

    pub(crate) fn query_vector_strict(
        &self,
        index: VectorIndexRef,
        query: &[f32],
        k: usize,
        candidates: Option<&RoaringTreemap>,
        snapshot: SnapshotId,
    ) -> Result<Vec<(RowId, f32)>> {
        self.vector_store.validate_vector(&index, query.len())?;
        self.vector.search(index, query, k, candidates, snapshot)
    }

    pub(crate) fn register_rank_formula(
        &self,
        table: &str,
        column: &str,
        formula: Arc<RankFormula>,
    ) {
        let mut cache = self.rank_formula_cache.write();
        cache.insert((table.to_string(), column.to_string()), formula);
        self.rank_policy_formula_parse_count
            .store(cache.len() as u64, Ordering::SeqCst);
    }

    pub(crate) fn remove_rank_formula(&self, table: &str, column: &str) {
        let mut cache = self.rank_formula_cache.write();
        cache.remove(&(table.to_string(), column.to_string()));
        self.rank_policy_formula_parse_count
            .store(cache.len() as u64, Ordering::SeqCst);
    }

    pub(crate) fn remove_rank_formulas_for_table(&self, table: &str) {
        let mut cache = self.rank_formula_cache.write();
        cache.retain(|(policy_table, _), _| policy_table != table);
        self.rank_policy_formula_parse_count
            .store(cache.len() as u64, Ordering::SeqCst);
    }

    fn rebuild_rank_formula_cache_from_meta(
        &self,
        metas: &HashMap<String, TableMeta>,
    ) -> Result<()> {
        let mut cache = self.rank_formula_cache.write();
        cache.clear();
        for (table, meta) in metas {
            for column in &meta.columns {
                if let Some(policy) = &column.rank_policy {
                    let formula = RankFormula::compile_for_index(
                        &rank_index_name(table, &column.name),
                        &policy.formula,
                    )?;
                    cache.insert((table.clone(), column.name.clone()), Arc::new(formula));
                }
            }
        }
        self.rank_policy_formula_parse_count
            .store(cache.len() as u64, Ordering::SeqCst);
        Ok(())
    }

    fn rank_formula(&self, table: &str, column: &str) -> Result<Arc<RankFormula>> {
        self.rank_formula_cache
            .read()
            .get(&(table.to_string(), column.to_string()))
            .cloned()
            .ok_or_else(|| {
                Error::Other(format!(
                    "rank policy formula cache missing for {}",
                    rank_index_name(table, column)
                ))
            })
    }

    fn vector_entry_count(&self, index: &VectorIndexRef) -> usize {
        self.vector_store
            .try_state(index)
            .map(|state| state.entry_count())
            .unwrap_or(0)
    }

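    /// Picks how many raw vector candidates to fetch before rank-policy
    /// re-ranking: everything for small indexes (under 1000 entries),
    /// otherwise roughly `30 * limit`, raised to at least 1500 and capped at
    /// `entry_count - 1`, but never fewer than `limit` itself.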
    fn rank_policy_candidate_k(&self, entry_count: usize, limit: usize) -> usize {
        if entry_count == 0 || limit == 0 {
            return limit;
        }
        if entry_count < 1000 {
            return entry_count;
        }
        entry_count
            .saturating_sub(1)
            .min(limit.saturating_mul(30).max(1500))
            .max(limit)
    }

    fn semantic_where_candidate_bitmap(
        &self,
        table: &str,
        where_clause: &str,
        snapshot: SnapshotId,
    ) -> Result<RoaringTreemap> {
        let sql = format!("SELECT * FROM {table} WHERE {where_clause}");
        let stmt = contextdb_parser::parse(&sql)?;
        let expr = match stmt {
            Statement::Select(select) => select
                .body
                .where_clause
                .ok_or_else(|| Error::ParseError("semantic WHERE missing expression".into()))?,
            _ => return Err(Error::ParseError("semantic WHERE parse failed".into())),
        };
        let mut bitmap = RoaringTreemap::new();
        for row in self.scan(table, snapshot)? {
            if crate::executor::row_matches(&row, &expr, &HashMap::new())? {
                bitmap.insert(row.row_id.0);
            }
        }
        Ok(bitmap)
    }

    fn find_row_by_id_at(
        &self,
        table: &str,
        row_id: RowId,
        snapshot: SnapshotId,
    ) -> Result<VersionedRow> {
        self.relational_store
            .row_by_id(table, row_id, snapshot)
            .ok_or_else(|| Error::NotFound(format!("row {row_id} in table {table}")))
    }

    fn joined_row_for_rank_policy(
        &self,
        policy: &RankPolicy,
        anchor: &VersionedRow,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        if policy.anchor_column.is_empty() {
            return Err(Error::Other(format!(
                "rank policy on index {}.{} has no resolved anchor join column",
                policy.joined_table, policy.joined_column
            )));
        }
        let join_value = anchor
            .values
            .get(&policy.anchor_column)
            .cloned()
            .unwrap_or(Value::Null);
        if join_value == Value::Null {
            return Ok(None);
        }

        let indexes = self.relational_store.indexes.read();
        let storage = indexes
            .get(&(policy.joined_table.clone(), policy.protected_index.clone()))
            .ok_or_else(|| {
                Error::Other(format!(
                    "rank policy protected index `{}` missing on table `{}`",
                    policy.protected_index, policy.joined_table
                ))
            })?;
        let Some((first_column, direction)) = storage.columns.first() else {
            return Err(Error::Other(format!(
                "rank policy protected index `{}` on `{}` has no columns",
                policy.protected_index, policy.joined_table
            )));
        };
        if first_column != &policy.joined_column {
            return Err(Error::Other(format!(
                "rank policy protected index `{}` on `{}` no longer leads with `{}`",
                policy.protected_index, policy.joined_table, policy.joined_column
            )));
        }

        let key_component = match direction {
            SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(join_value.clone())),
            SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(join_value.clone())),
        };
        let mut best_row_id: Option<RowId> = None;
        let mut consider = |entries: &[contextdb_relational::IndexEntry]| {
            for entry in entries {
                if entry.visible_at(snapshot)
                    && best_row_id.is_none_or(|current| current < entry.row_id)
                {
                    best_row_id = Some(entry.row_id);
                }
            }
        };

        if storage.columns.len() == 1 {
            if let Some(entries) = storage.tree.get(&vec![key_component.clone()]) {
                consider(entries);
            }
        } else {
            for (key, entries) in storage.tree.range(vec![key_component.clone()]..) {
                if key.first() != Some(&key_component) {
                    break;
                }
                consider(entries);
            }
        }
        drop(indexes);

        let Some(row_id) = best_row_id else {
            return Ok(None);
        };
        let Some(row) = self
            .relational_store
            .row_by_id(&policy.joined_table, row_id, snapshot)
        else {
            return Ok(None);
        };
        let Some(value) = row.values.get(&policy.joined_column) else {
            return Ok(None);
        };
        if !values_equal_for_rank_join(value, &join_value) {
            return Ok(None);
        }
        Ok(Some(row))
    }

    fn resolve_rank_formula_column(
        &self,
        policy: &RankPolicy,
        anchor: &VersionedRow,
        joined: Option<&VersionedRow>,
        column: &str,
    ) -> std::result::Result<Option<f32>, FormulaEvalError> {
        if let Some(value) = anchor.values.get(column) {
            return rank_value_to_number(value, column);
        }
        let Some(joined) = joined else {
            return Ok(None);
        };
        if self.corrupt_joined_values.read().contains(&(
            policy.joined_table.clone(),
            joined.row_id,
            column.to_string(),
        )) {
            return Err(FormulaEvalError::CorruptJoinedColumn {
                column: column.to_string(),
            });
        }
        let value = joined.values.get(column).unwrap_or(&Value::Null);
        rank_value_to_number(value, column)
    }

    fn warn_rank_eval_error(
        &self,
        table: &str,
        column: &str,
        row_id: RowId,
        err: &FormulaEvalError,
    ) {
        let mut reason = err.reason();
        if reason.len() > 256 {
            reason.truncate(253);
            reason.push_str("...");
        }
        tracing::warn!(
            name: "rank_policy_eval_error",
            target: "rank_policy_eval_error",
            index = %rank_index_name(table, column),
            row_id = row_id.0,
            reason = %reason,
            "rank_policy_eval_error"
        );
    }

    fn search_result_values(
        &self,
        index: &VectorIndexRef,
        row_id: RowId,
        snapshot: SnapshotId,
        mut values: HashMap<String, Value>,
    ) -> HashMap<String, Value> {
        if let Some(entry) = self.vector_store_live_entry_for_row(index, row_id, snapshot) {
            values.insert(index.column.clone(), Value::Vector(entry.vector));
        }
        values
    }

    fn pending_vector_dimension(&self, tx: TxId, index: &VectorIndexRef) -> Result<Option<usize>> {
        Ok(self
            .tx_mgr
            .cloned_write_set(tx)?
            .vector_inserts
            .iter()
            .rev()
            .find(|entry| entry.index == *index && entry.deleted_tx.is_none())
            .map(|entry| entry.vector.len()))
    }

    fn direct_vector_dimension_error(
        &self,
        index: &VectorIndexRef,
        expected: usize,
        actual: usize,
    ) -> Error {
        Error::VectorIndexDimensionMismatch {
            index: index.clone(),
            expected,
            actual,
        }
    }

    pub(crate) fn vector_insert_accounted_bytes(
        &self,
        index: &VectorIndexRef,
        dimension: usize,
    ) -> usize {
        self.vector_store
            .try_state(index)
            .map(|state| state.quantization().storage_bytes(dimension))
            .unwrap_or_else(|| 24 + dimension.saturating_mul(std::mem::size_of::<f32>()))
    }

    #[doc(hidden)]
    pub fn __debug_vector_hnsw_len(&self, index: VectorIndexRef) -> Option<usize> {
        self.vector_store
            .try_state(&index)
            .and_then(|state| state.hnsw_len())
    }

    #[doc(hidden)]
    pub fn __debug_vector_hnsw_stats(&self, index: VectorIndexRef) -> Option<HnswGraphStats> {
        self.vector_store
            .try_state(&index)
            .and_then(|state| state.hnsw_stats())
    }

    #[doc(hidden)]
    pub fn __debug_vector_storage_bytes_per_entry(
        &self,
        index: VectorIndexRef,
    ) -> Result<Vec<usize>> {
        self.vector_store.storage_bytes_per_entry(&index)
    }

    pub fn has_live_vector(&self, row_id: RowId, snapshot: SnapshotId) -> bool {
        !self
            .vector_store
            .live_entries_for_row(row_id, snapshot)
            .is_empty()
    }

    pub fn live_vector_entry(&self, row_id: RowId, snapshot: SnapshotId) -> Option<VectorEntry> {
        self.vector_store
            .live_entries_for_row(row_id, snapshot)
            .into_iter()
            .next()
    }

    pub(crate) fn vector_store_live_entry_for_row(
        &self,
        index: &VectorIndexRef,
        row_id: RowId,
        snapshot: SnapshotId,
    ) -> Option<VectorEntry> {
        self.vector_store
            .live_entry_for_row(index, row_id, snapshot)
    }

    pub(crate) fn drop_table_aux_state(&self, table: &str) {
        let snapshot = self.snapshot_for_read();
        let rows = self.scan(table, snapshot).unwrap_or_default();
        let edge_keys: HashSet<(NodeId, EdgeType, NodeId)> = rows
            .iter()
            .filter_map(|row| {
                match (
                    row.values.get("source_id").and_then(Value::as_uuid),
                    row.values.get("target_id").and_then(Value::as_uuid),
                    row.values.get("edge_type").and_then(Value::as_text),
                ) {
                    (Some(source), Some(target), Some(edge_type)) => {
                        Some((*source, edge_type.to_string(), *target))
                    }
                    _ => None,
                }
            })
            .collect();

        if !edge_keys.is_empty() {
            {
                let mut forward = self.graph_store.forward_adj.write();
                for entries in forward.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                forward.retain(|_, entries| !entries.is_empty());
            }
            {
                let mut reverse = self.graph_store.reverse_adj.write();
                for entries in reverse.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                reverse.retain(|_, entries| !entries.is_empty());
            }
        }
    }

    pub fn table_names(&self) -> Vec<String> {
        self.relational_store.table_names()
    }

    pub fn table_meta(&self, table: &str) -> Option<TableMeta> {
        self.relational_store.table_meta(table)
    }

    #[doc(hidden)]
    pub fn execute_at_snapshot(
        &self,
        sql: &str,
        params: &HashMap<String, Value>,
        snapshot: SnapshotId,
    ) -> Result<QueryResult> {
        SNAPSHOT_OVERRIDE.with(|cell| {
            let prior = cell.replace(Some(snapshot));
            let r = self.execute(sql, params);
            cell.replace(prior);
            r
        })
    }

    pub(crate) fn snapshot_for_read(&self) -> SnapshotId {
        SNAPSHOT_OVERRIDE.with(|cell| cell.borrow().unwrap_or_else(|| self.snapshot()))
    }

    #[doc(hidden)]
    pub fn change_log_rows_since(&self, since: Lsn) -> Result<Vec<RowChange>> {
        let entries = self.change_log_since(since);
        let tables = self.relational_store.tables.read();
        let mut out = Vec::new();
        for e in entries {
            match e {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    let Some(rows) = tables.get(&table) else {
                        continue;
                    };
                    let Some(row) = rows.iter().find(|r| r.row_id == row_id) else {
                        continue;
                    };
                    let natural_key = row
                        .values
                        .get("id")
                        .cloned()
                        .map(|value| NaturalKey {
                            column: "id".to_string(),
                            value,
                        })
                        .unwrap_or_else(|| NaturalKey {
                            column: "id".to_string(),
                            value: Value::Int64(row_id.0 as i64),
                        });
                    out.push(RowChange {
                        table,
                        natural_key,
                        values: row.values.clone(),
                        deleted: row.deleted_tx.is_some(),
                        lsn,
                    });
                }
                ChangeLogEntry::RowDelete {
                    table,
                    row_id: _,
                    natural_key,
                    lsn,
                } => {
                    out.push(RowChange {
                        table,
                        natural_key,
                        values: HashMap::new(),
                        deleted: true,
                        lsn,
                    });
                }
                _ => {}
            }
        }
        Ok(out)
    }

    #[doc(hidden)]
    pub fn __rows_examined(&self) -> u64 {
        self.rows_examined.load(Ordering::SeqCst)
    }

    #[doc(hidden)]
    pub fn __reset_rows_examined(&self) {
        self.rows_examined.store(0, Ordering::SeqCst);
    }

    #[doc(hidden)]
    pub fn __bump_rows_examined(&self, delta: u64) {
        self.rows_examined.fetch_add(delta, Ordering::SeqCst);
    }

    #[doc(hidden)]
    pub fn __index_write_lock_count(&self) -> u64 {
        self.relational_store.index_write_lock_count()
    }

    #[doc(hidden)]
    pub fn __introspect_indexes_total_entries(&self) -> u64 {
        self.relational_store.introspect_indexes_total_entries()
    }

    #[doc(hidden)]
    pub fn __probe_constraint_check(
        &self,
        table: &str,
        column: &str,
        value: Value,
    ) -> Result<QueryResult> {
        let covered = self.index_covers_column(table, column)
            || self
                .relational_store
                .indexes
                .read()
                .iter()
                .any(|((t, _), idx)| {
                    t == table && idx.columns.first().is_some_and(|(c, _)| c == column)
                });
        let trace = if covered {
            QueryTrace {
                physical_plan: "IndexScan",
                index_used: None,
                predicates_pushed: Default::default(),
                indexes_considered: Default::default(),
                sort_elided: false,
            }
        } else {
            QueryTrace::scan()
        };
        let _ = value;
        Ok(QueryResult {
            columns: vec![],
            rows: vec![],
            rows_affected: 0,
            trace,
            cascade: None,
        })
    }

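    /// Runs one synchronous pruning pass over expired relational, graph, and
    /// vector state, serialized against the background pruning thread via the
    /// shared pruning guard. Returns the count reported by
    /// `prune_expired_rows`.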
    pub fn run_pruning_cycle(&self) -> u64 {
        let _guard = self.pruning_guard.lock();
        prune_expired_rows(
            &self.relational_store,
            &self.graph_store,
            &self.vector_store,
            self.accountant(),
            self.persistence.as_ref(),
            self.sync_watermark(),
        )
    }

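    /// Restarts the background pruning thread with a new interval. Any
    /// previous thread is stopped first; the new thread loops, pruning under
    /// the shared guard and then sleeping until either the interval elapses
    /// or shutdown is signalled.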
    pub fn set_pruning_interval(&self, interval: Duration) {
        self.stop_pruning_thread();

        let shutdown = Arc::new(AtomicBool::new(false));
        let relational = self.relational_store.clone();
        let graph = self.graph_store.clone();
        let vector = self.vector_store.clone();
        let accountant = self.accountant.clone();
        let persistence = self.persistence.clone();
        let sync_watermark = self.sync_watermark.clone();
        let pruning_guard = self.pruning_guard.clone();
        let thread_shutdown = shutdown.clone();

        let handle = thread::spawn(move || {
            while !thread_shutdown.load(Ordering::SeqCst) {
                {
                    let _guard = pruning_guard.lock();
                    let _ = prune_expired_rows(
                        &relational,
                        &graph,
                        &vector,
                        accountant.as_ref(),
                        persistence.as_ref(),
                        sync_watermark.load(Ordering::SeqCst),
                    );
                }
                sleep_with_shutdown(&thread_shutdown, interval);
            }
        });

        let mut runtime = self.pruning_runtime.lock();
        runtime.shutdown = shutdown;
        runtime.handle = Some(handle);
    }

    pub fn sync_watermark(&self) -> Lsn {
        self.sync_watermark.load(Ordering::SeqCst)
    }

    pub fn set_sync_watermark(&self, watermark: Lsn) {
        self.sync_watermark.store(watermark, Ordering::SeqCst);
    }

    pub fn instance_id(&self) -> uuid::Uuid {
        self.instance_id
    }

    pub fn open_memory_with_plugin_and_accountant(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_memory_internal(plugin, accountant)
    }

    pub fn open_memory_with_plugin(plugin: Arc<dyn DatabasePlugin>) -> Result<Self> {
        let db = Self::open_memory_with_plugin_and_accountant(
            plugin,
            Arc::new(MemoryAccountant::no_limit()),
        )?;
        db.plugin.on_open()?;
        Ok(db)
    }

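    /// Closes the database idempotently: rolls back any open session
    /// transaction, stops the pruning thread, drops all subscription
    /// channels, closes persistence, and finally notifies the plugin.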
    pub fn close(&self) -> Result<()> {
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        let tx = self.session_tx.lock().take();
        if let Some(tx) = tx {
            self.rollback(tx)?;
        }
        self.stop_pruning_thread();
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
        self.plugin.on_close()
    }

    pub fn open_with_plugin(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
    ) -> Result<Self> {
        let db = Self::open_loaded(path, plugin, Arc::new(MemoryAccountant::no_limit()), None)?;
        db.plugin.on_open()?;
        Ok(db)
    }

    pub fn open_with_config(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_with_config_and_disk_limit(path, plugin, accountant, None)
    }

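    /// Opens a database at `path` with an explicit accountant and an optional
    /// startup disk-limit ceiling. The special path `:memory:` opens an
    /// in-memory instance instead. A memory limit persisted in the file takes
    /// effect only when the supplied accountant has neither a limit nor a
    /// startup ceiling of its own.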
    pub fn open_with_config_and_disk_limit(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        if path.as_os_str() == ":memory:" {
            return Self::open_memory_with_plugin_and_accountant(plugin, accountant);
        }
        let accountant = if let Some(limit) = persisted_memory_limit(path)? {
            let usage = accountant.usage();
            if usage.limit.is_none() && usage.startup_ceiling.is_none() {
                Arc::new(MemoryAccountant::with_budget(limit))
            } else {
                accountant
            }
        } else {
            accountant
        };
        let db = Self::open_loaded(path, plugin, accountant, startup_disk_limit)?;
        db.plugin.on_open()?;
        Ok(db)
    }

    pub fn open_memory_with_accountant(accountant: Arc<MemoryAccountant>) -> Self {
        Self::open_memory_internal(Arc::new(CorePlugin), accountant)
            .expect("failed to open in-memory database with accountant")
    }

    pub fn accountant(&self) -> &MemoryAccountant {
        &self.accountant
    }

    pub(crate) fn register_vector_index_for_column(&self, table: &str, column: &ColumnDef) {
        if let ColumnType::Vector(dimension) = column.column_type {
            self.vector_store.register_index(
                VectorIndexRef::new(table, column.name.clone()),
                dimension,
                column.quantization,
            );
        }
    }

    pub(crate) fn deregister_vector_index(&self, table: &str, column: &str) {
        self.vector_store
            .deregister_index(&VectorIndexRef::new(table, column), self.accountant());
    }

    pub(crate) fn rename_vector_index(&self, table: &str, from: &str, to: &str) -> Result<()> {
        self.vector_store.rename_index(
            &VectorIndexRef::new(table, from),
            VectorIndexRef::new(table, to),
        )
    }

    pub(crate) fn vector_store_deregister_table(&self, table: &str) {
        self.vector_store.deregister_table(table, self.accountant());
    }

    pub(crate) fn vector_index_infos(&self) -> Vec<contextdb_vector::store::VectorIndexInfo> {
        self.vector_store.index_infos()
    }

    fn account_loaded_state(&self) -> Result<()> {
        let metadata_bytes = self
            .relational_store
            .table_meta
            .read()
            .values()
            .fold(0usize, |acc, meta| {
                acc.saturating_add(meta.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            metadata_bytes,
            "open",
            "load_table_metadata",
            "Open the database with a larger MEMORY_LIMIT or reduce stored schema metadata.",
        )?;

        let row_bytes =
            self.relational_store
                .tables
                .read()
                .iter()
                .fold(0usize, |acc, (table, rows)| {
                    let meta = self.table_meta(table);
                    acc.saturating_add(rows.iter().fold(0usize, |inner, row| {
                        inner.saturating_add(meta.as_ref().map_or_else(
                            || row.estimated_bytes(),
                            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
                        ))
                    }))
                });
        self.accountant.try_allocate_for(
            row_bytes,
            "open",
            "load_rows",
            "Open the database with a larger MEMORY_LIMIT or prune retained rows first.",
        )?;

        let edge_bytes = self
            .graph_store
            .forward_adj
            .read()
            .values()
            .flatten()
            .filter(|edge| edge.deleted_tx.is_none())
            .fold(0usize, |acc, edge| {
                acc.saturating_add(edge.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            edge_bytes,
            "open",
            "load_edges",
            "Open the database with a larger MEMORY_LIMIT or reduce graph edge volume.",
        )?;

        let vector_bytes = self
            .vector_store
            .index_infos()
            .into_iter()
            .fold(0usize, |acc, info| acc.saturating_add(info.bytes));
        self.accountant.try_allocate_for(
            vector_bytes,
            "open",
            "load_vectors",
            "Open the database with a larger MEMORY_LIMIT or reduce stored vector data.",
        )?;

        Ok(())
    }

    fn release_insert_allocations(&self, ws: &contextdb_tx::WriteSet) {
        for (table, row) in &ws.relational_inserts {
            let bytes = self
                .table_meta(table)
                .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                .unwrap_or_else(|| row.estimated_bytes());
            self.accountant.release(bytes);
        }

        for edge in &ws.adj_inserts {
            self.accountant.release(edge.estimated_bytes());
        }

        for entry in &ws.vector_inserts {
            self.accountant
                .release(self.vector_insert_accounted_bytes(&entry.index, entry.vector.len()));
        }
    }

    fn release_delete_allocations(&self, ws: &contextdb_tx::WriteSet) {
        for (table, row_id, _) in &ws.relational_deletes {
            if let Some(row) = self.find_row_by_id(table, *row_id) {
                let bytes = self
                    .table_meta(table)
                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                    .unwrap_or_else(|| row.estimated_bytes());
                self.accountant.release(bytes);
            }
        }

        for (source, edge_type, target, _) in &ws.adj_deletes {
            if let Some(edge) = self.find_edge(source, target, edge_type) {
                self.accountant.release(edge.estimated_bytes());
            }
        }

        for (index, row_id, _) in &ws.vector_deletes {
            if let Some(vector) = self.find_vector_by_index_and_row(index, *row_id) {
                self.accountant
                    .release(self.vector_insert_accounted_bytes(index, vector.vector.len()));
            }
        }

        if !ws.vector_deletes.is_empty() {
            self.vector_store.clear_hnsw(self.accountant());
        }
    }

    fn find_row_by_id(&self, table: &str, row_id: RowId) -> Option<VersionedRow> {
        self.relational_store
            .tables
            .read()
            .get(table)
            .and_then(|rows| rows.iter().find(|row| row.row_id == row_id))
            .cloned()
    }

    fn find_vector_by_index_and_row(
        &self,
        index: &VectorIndexRef,
        row_id: RowId,
    ) -> Option<VectorEntry> {
        self.vector_store
            .try_state(index)
            .and_then(|state| state.find_by_row_id(index, row_id))
    }

    fn find_edge(&self, source: &NodeId, target: &NodeId, edge_type: &str) -> Option<AdjEntry> {
        self.graph_store
            .forward_adj
            .read()
            .get(source)
            .and_then(|entries| {
                entries
                    .iter()
                    .find(|entry| entry.target == *target && entry.edge_type == edge_type)
                    .cloned()
            })
    }

    pub(crate) fn write_set_checkpoint(
        &self,
        tx: TxId,
    ) -> Result<(usize, usize, usize, usize, usize, usize)> {
        self.tx_mgr.with_write_set(tx, |ws| {
            (
                ws.relational_inserts.len(),
                ws.relational_deletes.len(),
                ws.adj_inserts.len(),
                ws.vector_inserts.len(),
                ws.vector_deletes.len(),
                ws.vector_moves.len(),
            )
        })
    }

    pub(crate) fn restore_write_set_checkpoint(
        &self,
        tx: TxId,
        checkpoint: (usize, usize, usize, usize, usize, usize),
    ) -> Result<()> {
        self.tx_mgr.with_write_set(tx, |ws| {
            ws.relational_inserts.truncate(checkpoint.0);
            ws.relational_deletes.truncate(checkpoint.1);
            ws.adj_inserts.truncate(checkpoint.2);
            ws.vector_inserts.truncate(checkpoint.3);
            ws.vector_deletes.truncate(checkpoint.4);
            ws.vector_moves.truncate(checkpoint.5);
        })
    }

    pub fn conflict_policies(&self) -> ConflictPolicies {
        self.conflict_policies.read().clone()
    }

    pub fn set_default_conflict_policy(&self, policy: ConflictPolicy) {
        self.conflict_policies.write().default = policy;
    }

    pub fn set_table_conflict_policy(&self, table: &str, policy: ConflictPolicy) {
        self.conflict_policies
            .write()
            .per_table
            .insert(table.to_string(), policy);
    }

    pub fn drop_table_conflict_policy(&self, table: &str) {
        self.conflict_policies.write().per_table.remove(table);
    }

    pub fn plugin(&self) -> &dyn DatabasePlugin {
        self.plugin.as_ref()
    }

    pub fn plugin_health(&self) -> PluginHealth {
        self.plugin.health()
    }

    pub fn plugin_describe(&self) -> serde_json::Value {
        self.plugin.describe()
    }

    pub(crate) fn graph(&self) -> &MemGraphExecutor<DynStore> {
        &self.graph
    }

    pub(crate) fn relational_store(&self) -> &Arc<RelationalStore> {
        &self.relational_store
    }

    pub(crate) fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
    where
        F: FnOnce(Lsn) -> R,
    {
        self.tx_mgr.allocate_ddl_lsn(f)
    }

    pub(crate) fn with_commit_lock<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        self.tx_mgr.with_commit_lock(f)
    }

    pub(crate) fn log_create_table_ddl(
        &self,
        name: &str,
        meta: &TableMeta,
        lsn: Lsn,
    ) -> Result<()> {
        let change = ddl_change_from_meta(name, meta);
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            persistence.append_ddl_log(lsn, &change)?;
        }
        Ok(())
    }

    pub(crate) fn log_drop_table_ddl(&self, name: &str, lsn: Lsn) -> Result<()> {
        let change = DdlChange::DropTable {
            name: name.to_string(),
        };
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            persistence.append_ddl_log(lsn, &change)?;
        }
        Ok(())
    }

    pub(crate) fn log_alter_table_ddl(&self, name: &str, meta: &TableMeta, lsn: Lsn) -> Result<()> {
        let change = DdlChange::AlterTable {
            name: name.to_string(),
            columns: meta
                .columns
                .iter()
                .map(|c| {
                    (
                        c.name.clone(),
                        sql_type_for_meta_column(c, &meta.propagation_rules),
                    )
                })
                .collect(),
            constraints: create_table_constraints_from_meta(meta),
        };
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            persistence.append_ddl_log(lsn, &change)?;
        }
        Ok(())
    }

    pub(crate) fn log_create_index_ddl(
        &self,
        table: &str,
        name: &str,
        columns: &[(String, contextdb_core::SortDirection)],
        lsn: Lsn,
    ) -> Result<()> {
        let change = DdlChange::CreateIndex {
            table: table.to_string(),
            name: name.to_string(),
            columns: columns.to_vec(),
        };
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            persistence.append_ddl_log(lsn, &change)?;
        }
        Ok(())
    }

    pub(crate) fn log_drop_index_ddl(&self, table: &str, name: &str, lsn: Lsn) -> Result<()> {
        let change = DdlChange::DropIndex {
            table: table.to_string(),
            name: name.to_string(),
        };
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            persistence.append_ddl_log(lsn, &change)?;
        }
        Ok(())
    }

    pub(crate) fn persist_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence.flush_table_meta(name, meta)?;
        }
        Ok(())
    }

    pub(crate) fn persist_memory_limit(&self, limit: Option<usize>) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            match limit {
                Some(limit) => persistence.flush_config_value("memory_limit", &limit)?,
                None => persistence.remove_config_value("memory_limit")?,
            }
        }
        Ok(())
    }

    pub fn set_disk_limit(&self, limit: Option<u64>) -> Result<()> {
        if self.persistence.is_none() {
            self.disk_limit.store(0, Ordering::SeqCst);
            return Ok(());
        }

        let ceiling = self.disk_limit_startup_ceiling();
        if let Some(ceiling) = ceiling {
            match limit {
                Some(bytes) if bytes > ceiling => {
                    return Err(Error::Other(format!(
                        "disk limit {bytes} exceeds startup ceiling {ceiling}"
                    )));
                }
                None => {
                    return Err(Error::Other(
                        "cannot remove disk limit when a startup ceiling is set".to_string(),
                    ));
                }
                _ => {}
            }
        }

        self.disk_limit.store(limit.unwrap_or(0), Ordering::SeqCst);
        Ok(())
    }

    pub fn disk_limit(&self) -> Option<u64> {
        match self.disk_limit.load(Ordering::SeqCst) {
            0 => None,
            bytes => Some(bytes),
        }
    }

    pub fn disk_limit_startup_ceiling(&self) -> Option<u64> {
        match self.disk_limit_startup_ceiling.load(Ordering::SeqCst) {
            0 => None,
            bytes => Some(bytes),
        }
    }

    pub fn disk_file_size(&self) -> Option<u64> {
        self.persistence
            .as_ref()
            .map(|persistence| std::fs::metadata(persistence.path()).map(|meta| meta.len()))
            .transpose()
            .ok()
            .flatten()
    }

    pub(crate) fn persist_disk_limit(&self, limit: Option<u64>) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            match limit {
                Some(limit) => persistence.flush_config_value("disk_limit", &limit)?,
                None => persistence.remove_config_value("disk_limit")?,
            }
        }
        Ok(())
    }

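    /// Fails with `DiskBudgetExceeded` when the backing file, plus the fixed
    /// write headroom (`MIN_DISK_WRITE_HEADROOM_BYTES`), would overrun the
    /// configured disk limit. A missing limit or an unreadable file size
    /// passes the check.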
    pub fn check_disk_budget(&self, operation: &str) -> Result<()> {
        let Some(limit) = self.disk_limit() else {
            return Ok(());
        };
        let Some(current_bytes) = self.disk_file_size() else {
            return Ok(());
        };
        if current_bytes.saturating_add(MIN_DISK_WRITE_HEADROOM_BYTES) <= limit {
            return Ok(());
        }
        Err(Error::DiskBudgetExceeded {
            operation: operation.to_string(),
            current_bytes,
            budget_limit_bytes: limit,
            hint: "Reduce retained file-backed data or raise DISK_LIMIT before writing more data."
                .to_string(),
        })
    }

    pub fn persisted_sync_watermarks(&self, tenant_id: &str) -> Result<(Lsn, Lsn)> {
        let Some(persistence) = &self.persistence else {
            return Ok((Lsn(0), Lsn(0)));
        };
        let push = persistence
            .load_config_value::<u64>(&format!("sync_push_watermark:{tenant_id}"))?
            .map(Lsn)
            .unwrap_or(Lsn(0));
        let pull = persistence
            .load_config_value::<u64>(&format!("sync_pull_watermark:{tenant_id}"))?
            .map(Lsn)
            .unwrap_or(Lsn(0));
        Ok((push, pull))
    }

    pub fn persist_sync_push_watermark(&self, tenant_id: &str, watermark: Lsn) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence
                .flush_config_value(&format!("sync_push_watermark:{tenant_id}"), &watermark.0)?;
        }
        Ok(())
    }

    pub fn persist_sync_pull_watermark(&self, tenant_id: &str, watermark: Lsn) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence
                .flush_config_value(&format!("sync_pull_watermark:{tenant_id}"), &watermark.0)?;
        }
        Ok(())
    }

    pub(crate) fn persist_table_rows(&self, name: &str) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            let tables = self.relational_store.tables.read();
            if let Some(rows) = tables.get(name) {
                persistence.rewrite_table_rows(name, rows)?;
            }
        }
        Ok(())
    }

    pub(crate) fn persist_vectors(&self) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            let vectors = self.vector_store.all_entries();
            persistence.rewrite_vectors(&vectors)?;
        }
        Ok(())
    }

    pub(crate) fn remove_persisted_table(&self, name: &str) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence.remove_table_meta(name)?;
            persistence.remove_table_data(name)?;
        }
        Ok(())
    }

    pub fn change_log_since(&self, since_lsn: Lsn) -> Vec<ChangeLogEntry> {
        let log = self.change_log.read();
        let start = log.partition_point(|e| e.lsn() <= since_lsn);
        log[start..].to_vec()
    }

    pub fn ddl_log_since(&self, since_lsn: Lsn) -> Vec<DdlChange> {
        let ddl = self.ddl_log.read();
        let start = ddl.partition_point(|(lsn, _)| *lsn <= since_lsn);
        ddl[start..].iter().map(|(_, c)| c.clone()).collect()
    }

    #[allow(dead_code)]
    fn full_state_snapshot(&self) -> ChangeSet {
        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let mut ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        ddl.extend(full_snapshot_ddl(&meta_guard));

        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(m) => m,
                None => continue,
            };
            let key_col = meta
                .natural_key_column
                .clone()
                .or_else(|| {
                    if meta
                        .columns
                        .iter()
                        .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                    {
                        Some("id".to_string())
                    } else {
                        None
                    }
                })
                .or_else(|| {
                    meta.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
                .unwrap_or_default();
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|r| r.deleted_tx.is_none()) {
                let key_val = match row.values.get(&key_col) {
                    Some(v) => v.clone(),
                    None => continue,
                };
                live_row_ids.insert(row.row_id);
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }

        drop(tables_guard);
        drop(meta_guard);

        let fwd = self.graph_store.forward_adj.read();
        for (_source, entries) in fwd.iter() {
            for entry in entries.iter().filter(|e| e.deleted_tx.is_none()) {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        for entry in self
            .vector_store
            .all_entries()
            .into_iter()
            .filter(|v| v.deleted_tx.is_none())
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                index: entry.index.clone(),
                row_id: entry.row_id,
                vector: entry.vector,
                lsn: entry.lsn,
            });
        }

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }

    fn persisted_state_since(&self, since_lsn: Lsn) -> ChangeSet {
        if since_lsn == Lsn(0) {
            return self.full_state_snapshot();
        }

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(meta) => meta,
                None => continue,
            };
            let key_col = meta
                .natural_key_column
                .clone()
                .or_else(|| {
                    if meta
                        .columns
                        .iter()
                        .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                    {
                        Some("id".to_string())
                    } else {
                        None
                    }
                })
                .or_else(|| {
                    meta.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
                .unwrap_or_default();
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|row| row.deleted_tx.is_none()) {
                live_row_ids.insert(row.row_id);
                if row.lsn <= since_lsn {
                    continue;
                }
                let key_val = match row.values.get(&key_col) {
                    Some(value) => value.clone(),
                    None => continue,
                };
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }
        drop(tables_guard);
        drop(meta_guard);

        let fwd = self.graph_store.forward_adj.read();
        for entries in fwd.values() {
            for entry in entries
                .iter()
                .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
            {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        for entry in self
            .vector_store
            .all_entries()
            .into_iter()
            .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                index: entry.index.clone(),
                row_id: entry.row_id,
                vector: entry.vector,
                lsn: entry.lsn,
            });
        }

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }

    fn preflight_sync_apply_memory(
        &self,
        changes: &ChangeSet,
        policies: &ConflictPolicies,
    ) -> Result<()> {
        let usage = self.accountant.usage();
        let Some(limit) = usage.limit else {
            return Ok(());
        };
        let available = usage.available.unwrap_or(limit);
        let mut required = 0usize;

        for row in &changes.rows {
            if row.deleted || row.values.is_empty() {
                continue;
            }

            let policy = policies
                .per_table
                .get(&row.table)
                .copied()
                .unwrap_or(policies.default);
            let existing = self.point_lookup(
                &row.table,
                &row.natural_key.column,
                &row.natural_key.value,
                self.snapshot(),
            )?;

            if existing.is_some()
                && matches!(
                    policy,
                    ConflictPolicy::InsertIfNotExists | ConflictPolicy::ServerWins
                )
            {
                continue;
            }

            required = required.saturating_add(
                self.table_meta(&row.table)
                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                    .unwrap_or_else(|| estimate_row_value_bytes(&row.values)),
            );
        }

        for edge in &changes.edges {
            required = required.saturating_add(
                96 + edge.edge_type.len().saturating_mul(16)
                    + estimate_row_value_bytes(&edge.properties),
            );
        }

        for vector in &changes.vectors {
            if vector.vector.is_empty() {
                continue;
            }
            required = required.saturating_add(
                24 + vector
                    .vector
                    .len()
                    .saturating_mul(std::mem::size_of::<f32>()),
            );
        }

        if required > available {
            return Err(Error::MemoryBudgetExceeded {
                subsystem: "sync".to_string(),
                operation: "apply_changes".to_string(),
                requested_bytes: required,
                available_bytes: available,
                budget_limit_bytes: limit,
                hint:
                    "Reduce sync batch size, split the push, or raise MEMORY_LIMIT on the server."
                        .to_string(),
            });
        }

        Ok(())
    }

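    /// Returns everything committed after `since_lsn` as a `ChangeSet`.
    /// Served from the in-memory change and DDL logs when they cover the
    /// requested range; when both logs are empty but state exists, or when
    /// the logs start after `since_lsn + 1` (e.g. they were truncated across
    /// a restart), falls back to a snapshot-style diff of persisted state.
    /// Delete entries superseded by a later insert of the same key are
    /// filtered out before returning.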
    pub fn changes_since(&self, since_lsn: Lsn) -> ChangeSet {
        if since_lsn > self.current_lsn() {
            return ChangeSet::default();
        }

        let log = self.change_log.read();
        let change_first_lsn = log.first().map(|e| e.lsn());
        let change_log_empty = log.is_empty();
        drop(log);

        let ddl = self.ddl_log.read();
        let ddl_first_lsn = ddl.first().map(|(lsn, _)| *lsn);
        let ddl_log_empty = ddl.is_empty();
        drop(ddl);

        let has_table_data = !self
            .relational_store
            .tables
            .read()
            .values()
            .all(|rows| rows.is_empty());
        let has_table_meta = !self.relational_store.table_meta.read().is_empty();

        if change_log_empty && ddl_log_empty && (has_table_data || has_table_meta) {
            return self.persisted_state_since(since_lsn);
        }

        let min_first_lsn = match (change_first_lsn, ddl_first_lsn) {
            (Some(c), Some(d)) => Some(c.min(d)),
            (Some(c), None) => Some(c),
            (None, Some(d)) => Some(d),
            (None, None) => None,
        };

        if min_first_lsn.is_some_and(|min_lsn| min_lsn.0 > since_lsn.0 + 1) {
            return self.persisted_state_since(since_lsn);
        }

        let (ddl, change_entries) = self.with_commit_lock(|| {
            let ddl = self.ddl_log_since(since_lsn);
            let changes = self.change_log_since(since_lsn);
            (ddl, changes)
        });

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();

        for entry in change_entries {
            match entry {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    if let Some((natural_key, values)) = self.row_change_values(&table, row_id) {
                        rows.push(RowChange {
                            table,
                            natural_key,
                            values,
                            deleted: false,
                            lsn,
                        });
                    }
                }
                ChangeLogEntry::RowDelete {
                    table,
                    natural_key,
                    lsn,
                    ..
                } => {
                    let mut values = HashMap::new();
                    values.insert("__deleted".to_string(), Value::Bool(true));
                    rows.push(RowChange {
                        table,
                        natural_key,
                        values,
                        deleted: true,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeInsert {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    let properties = self
                        .edge_properties(source, target, &edge_type, lsn)
                        .unwrap_or_default();
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeDelete {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    let mut properties = HashMap::new();
                    properties.insert("__deleted".to_string(), Value::Bool(true));
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::VectorInsert { index, row_id, lsn } => {
                    if let Some(vector) = self.vector_for_row_lsn(&index, row_id, lsn) {
                        vectors.push(VectorChange {
                            index,
                            row_id,
                            vector,
                            lsn,
                        });
                    }
                }
                ChangeLogEntry::VectorDelete { index, row_id, lsn } => vectors.push(VectorChange {
                    index,
                    row_id,
                    vector: Vec::new(),
                    lsn,
                }),
            }
        }

        let insert_max_lsn: HashMap<(String, String, String), Lsn> = {
            let mut map: HashMap<(String, String, String), Lsn> = HashMap::new();
            for r in rows.iter().filter(|r| !r.deleted) {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                let entry = map.entry(key).or_insert(Lsn(0));
                if r.lsn > *entry {
                    *entry = r.lsn;
                }
            }
            map
        };
        rows.retain(|r| {
            if r.deleted {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                match insert_max_lsn.get(&key) {
                    Some(&insert_lsn) => insert_lsn < r.lsn,
                    None => true,
                }
            } else {
                true
            }
        });

        let vector_reinserts: HashSet<(VectorIndexRef, Lsn)> = vectors
            .iter()
            .filter(|v| !v.vector.is_empty())
            .map(|v| (v.index.clone(), v.lsn))
            .collect();
        vectors.retain(|v| {
            !v.vector.is_empty() || !vector_reinserts.contains(&(v.index.clone(), v.lsn))
        });

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }

    pub fn current_lsn(&self) -> Lsn {
        self.tx_mgr.current_lsn()
    }

    pub fn committed_watermark(&self) -> TxId {
        self.tx_mgr.current_tx_max()
    }

    pub fn next_tx(&self) -> TxId {
        self.tx_mgr.peek_next_tx()
    }

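    /// Subscribes to commit events on a bounded channel with the default
    /// capacity. Undeliverable events are tracked via the drop counter
    /// reported by `subscription_health()`.
    ///
    /// A minimal consumption sketch; the `db` handle and receiver loop are
    /// illustrative:
    ///
    /// ```ignore
    /// let rx = db.subscribe();
    /// std::thread::spawn(move || {
    ///     while let Ok(event) = rx.recv() {
    ///         // react to the committed change
    ///         let _ = event;
    ///     }
    /// });
    /// ```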
    pub fn subscribe(&self) -> Receiver<CommitEvent> {
        self.subscribe_with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY)
    }

    pub fn subscribe_with_capacity(&self, capacity: usize) -> Receiver<CommitEvent> {
        let (tx, rx) = mpsc::sync_channel(capacity.max(1));
        self.subscriptions.lock().subscribers.push(tx);
        rx
    }

    pub fn subscription_health(&self) -> SubscriptionMetrics {
        let subscriptions = self.subscriptions.lock();
        SubscriptionMetrics {
            active_channels: subscriptions.subscribers.len(),
            events_sent: subscriptions.events_sent,
            events_dropped: subscriptions.events_dropped,
        }
    }

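    /// Applies a remote `ChangeSet` (a sync pull) under the configured
    /// conflict policies. The plugin may rewrite the changes first; disk and
    /// memory budgets are preflighted before any row is touched, and incoming
    /// `TxId` values at `u64::MAX` are rejected to guard against overflow.
    /// DDL changes are replayed ahead of row data, with `CREATE TABLE` schema
    /// mismatches recorded as conflicts rather than applied.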
    pub fn apply_changes(
        &self,
        mut changes: ChangeSet,
        policies: &ConflictPolicies,
    ) -> Result<ApplyResult> {
        self.relational_store.bump_index_write_lock_count();
        // Let the plugin inspect or rewrite the pulled changes before any
        // budgeting or application happens.
        self.plugin.on_sync_pull(&mut changes)?;
        self.check_disk_budget("sync_pull")?;
        self.preflight_sync_apply_memory(&changes, policies)?;

        // Reject the sentinel max TXID before touching any state.
        for row in &changes.rows {
            for v in row.values.values() {
                if let Value::TxId(incoming) = v
                    && incoming.0 == u64::MAX
                {
                    return Err(Error::TxIdOverflow {
                        table: row.table.clone(),
                        incoming: u64::MAX,
                    });
                }
            }
        }

        let mut tx = self.begin();
        let batch_row_commits = changes.rows.len() > 128;
        let mut result = ApplyResult {
            applied_rows: 0,
            skipped_rows: 0,
            conflicts: Vec::new(),
            new_lsn: self.current_lsn(),
        };
        let vector_row_ids = changes.vectors.iter().map(|v| v.row_id).collect::<Vec<_>>();
        let mut vector_row_map: HashMap<RowId, RowId> = HashMap::new();
        let mut vector_row_idx = 0usize;
        let mut failed_row_ids: HashSet<RowId> = HashSet::new();
        let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
        let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();

        for ddl in changes.ddl.clone() {
            match ddl {
                DdlChange::CreateTable {
                    name,
                    columns,
                    constraints,
                } => {
                    if let Some(local_meta) = self.table_meta(&name) {
                        let local_cols: Vec<(String, String)> = local_meta
                            .columns
                            .iter()
                            .map(|c| {
                                (
                                    c.name.clone(),
                                    normalize_schema_type(&sql_type_for_meta_column(
                                        c,
                                        &local_meta.propagation_rules,
                                    )),
                                )
                            })
                            .collect();
                        let remote_cols: Vec<(String, String)> = columns
                            .iter()
                            .map(|(col_name, col_type)| {
                                (col_name.clone(), normalize_schema_type(col_type))
                            })
                            .collect();
                        let mut local_sorted = local_cols.clone();
                        local_sorted.sort();
                        let mut remote_sorted = remote_cols.clone();
                        remote_sorted.sort();
                        if local_sorted != remote_sorted {
                            result.conflicts.push(Conflict {
                                natural_key: NaturalKey {
                                    column: "table".to_string(),
                                    value: Value::Text(name.clone()),
                                },
                                resolution: ConflictPolicy::ServerWins,
                                reason: Some(format!(
                                    "schema mismatch: local columns {:?} differ from remote {:?}",
                                    local_cols, remote_cols
                                )),
                            });
                        }
                        continue;
                    }
                    let mut sql = format!(
                        "CREATE TABLE {} ({})",
                        name,
                        columns
                            .iter()
                            .map(|(col, ty)| format!("{col} {ty}"))
                            .collect::<Vec<_>>()
                            .join(", ")
                    );
                    if !constraints.is_empty() {
                        sql.push(' ');
                        sql.push_str(&constraints.join(" "));
                    }
                    self.execute_in_tx(tx, &sql, &HashMap::new())?;
                    self.clear_statement_cache();
                    table_meta_cache.remove(&name);
                    visible_rows_cache.remove(&name);
                }
                DdlChange::DropTable { name } => {
                    if self.table_meta(&name).is_some() {
                        if let Some(block) =
                            crate::executor::rank_policy_drop_table_blocker(self, &name)
                        {
                            return Err(block);
                        }
                        self.drop_table_aux_state(&name);
                        self.remove_rank_formulas_for_table(&name);
                        self.relational_store().drop_table(&name);
                        self.remove_persisted_table(&name)?;
                        self.clear_statement_cache();
                    }
                    table_meta_cache.remove(&name);
                    visible_rows_cache.remove(&name);
                }
                DdlChange::AlterTable {
                    name,
                    columns,
                    constraints,
                } => {
                    if self.table_meta(&name).is_none() {
                        continue;
                    }
                    let existing = self.table_meta(&name).unwrap_or_default();
                    let existing_cols: HashSet<String> =
                        existing.columns.iter().map(|c| c.name.clone()).collect();
                    for (col, ty) in columns {
                        if existing_cols.contains(&col) {
                            continue;
                        }
                        let sql = format!("ALTER TABLE {} ADD COLUMN {} {}", name, col, ty);
                        self.execute_in_tx(tx, &sql, &HashMap::new())?;
                    }
                    let _ = constraints;
                    self.clear_statement_cache();
                    table_meta_cache.remove(&name);
                    visible_rows_cache.remove(&name);
                }
                DdlChange::CreateIndex {
                    table,
                    name,
                    columns,
                } => {
                    // An index on a table we never received cannot be reconciled.
                    if self.table_meta(&table).is_none() {
                        return Err(Error::TableNotFound(table.clone()));
                    }
                    let already = self
                        .table_meta(&table)
                        .map(|m| m.indexes.iter().any(|i| i.name == name))
                        .unwrap_or(false);
                    if !already {
                        {
                            let store = self.relational_store();
                            let mut metas = store.table_meta.write();
                            if let Some(m) = metas.get_mut(&table) {
                                m.indexes.push(contextdb_core::IndexDecl {
                                    name: name.clone(),
                                    columns: columns.clone(),
                                    kind: contextdb_core::IndexKind::UserDeclared,
                                });
                            }
                        }
                        self.relational_store().create_index_storage(
                            &table,
                            &name,
                            columns.clone(),
                        );
                        self.relational_store().rebuild_index(&table, &name);
                        if let Some(table_meta) = self.table_meta(&table) {
                            self.persist_table_meta(&table, &table_meta)?;
                        }
                        self.allocate_ddl_lsn(|lsn| {
                            self.log_create_index_ddl(&table, &name, &columns, lsn)
                        })?;
                        self.clear_statement_cache();
                    }
                    table_meta_cache.remove(&table);
                }
                DdlChange::DropIndex { table, name } => {
                    if self.table_meta(&table).is_some() {
                        let exists = self
                            .table_meta(&table)
                            .map(|m| m.indexes.iter().any(|i| i.name == name))
                            .unwrap_or(false);
                        if exists {
                            if let Some(block) =
                                crate::executor::rank_policy_drop_index_blocker(self, &table, &name)
                            {
                                return Err(block);
                            }
                            {
                                let store = self.relational_store();
                                let mut metas = store.table_meta.write();
                                if let Some(m) = metas.get_mut(&table) {
                                    m.indexes.retain(|i| i.name != name);
                                }
                            }
                            self.relational_store().drop_index_storage(&table, &name);
                            if let Some(table_meta) = self.table_meta(&table) {
                                self.persist_table_meta(&table, &table_meta)?;
                            }
                            self.allocate_ddl_lsn(|lsn| {
                                self.log_drop_index_ddl(&table, &name, lsn)
                            })?;
                            self.clear_statement_cache();
                        }
                    }
                    table_meta_cache.remove(&table);
                }
            }
        }

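        // DDL may have created tables or added columns, so refresh the
        // memory estimate before row data is applied.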
        self.preflight_sync_apply_memory(&changes, policies)?;

        for row in changes.rows {
            if row.values.is_empty() {
                result.skipped_rows += 1;
                if !batch_row_commits {
                    self.commit_with_source(tx, CommitSource::SyncPull)?;
                    tx = self.begin();
                }
                continue;
            }

            let policy = policies
                .per_table
                .get(&row.table)
                .copied()
                .unwrap_or(policies.default);

            let existing = cached_point_lookup(
                self,
                &mut visible_rows_cache,
                &row.table,
                &row.natural_key.column,
                &row.natural_key.value,
            )?;
            let is_delete = row.deleted;
            let row_has_vector = cached_table_meta(self, &mut table_meta_cache, &row.table)
                .is_some_and(|meta| {
                    meta.columns
                        .iter()
                        .any(|col| matches!(col.column_type, ColumnType::Vector(_)))
                });

            if is_delete {
                if let Some(local) = existing {
                    if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                        consume_vector_row_group(
                            &vector_row_ids,
                            &mut vector_row_idx,
                            local.row_id,
                            &mut vector_row_map,
                        );
                    }
                    if let Err(err) = self.delete_row(tx, &row.table, local.row_id) {
                        result.conflicts.push(Conflict {
                            natural_key: row.natural_key.clone(),
                            resolution: policy,
                            reason: Some(format!("delete failed: {err}")),
                        });
                        result.skipped_rows += 1;
                    } else {
                        remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
                        result.applied_rows += 1;
                    }
                } else {
                    result.skipped_rows += 1;
                }
                if !batch_row_commits {
                    self.commit_with_source(tx, CommitSource::SyncPull)?;
                    tx = self.begin();
                }
                continue;
            }

            let mut values = row.values.clone();
            values.remove("__deleted");

            match (existing, policy) {
                (None, _) => {
                    if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
                        let mut constraint_error: Option<String> = None;

                        for col_def in &meta.columns {
                            if !col_def.nullable
                                && !col_def.primary_key
                                && col_def.default.is_none()
                            {
                                match values.get(&col_def.name) {
                                    None | Some(Value::Null) => {
                                        constraint_error = Some(format!(
                                            "NOT NULL constraint violated: {}.{}",
                                            row.table, col_def.name
                                        ));
                                        break;
                                    }
                                    _ => {}
                                }
                            }
                        }

                        let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
                        if constraint_error.is_none() && has_unique {
                            for col_def in &meta.columns {
                                if col_def.unique
                                    && !col_def.primary_key
                                    && let Some(new_val) = values.get(&col_def.name)
                                    && *new_val != Value::Null
                                    && cached_visible_rows(
                                        self,
                                        &mut visible_rows_cache,
                                        &row.table,
                                    )?
                                    .iter()
                                    .any(|r| r.values.get(&col_def.name) == Some(new_val))
                                {
                                    constraint_error = Some(format!(
                                        "UNIQUE constraint violated: {}.{}",
                                        row.table, col_def.name
                                    ));
                                    break;
                                }
                            }
                        }

                        if let Some(err_msg) = constraint_error {
                            result.skipped_rows += 1;
                            if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                                consume_failed_vector_row_group(
                                    &vector_row_ids,
                                    &mut vector_row_idx,
                                    &mut failed_row_ids,
                                );
                            }
                            result.conflicts.push(Conflict {
                                natural_key: row.natural_key.clone(),
                                resolution: policy,
                                reason: Some(err_msg),
                            });
                            if !batch_row_commits {
                                self.commit_with_source(tx, CommitSource::SyncPull)?;
                                tx = self.begin();
                            }
                            continue;
                        }
                    }

                    // Advance the local tx counter past any imported TXID values.
                    let mut overflow: Option<Error> = None;
                    for v in values.values() {
                        if let Value::TxId(incoming) = v
                            && let Err(err) = self.tx_mgr.advance_for_sync(&row.table, *incoming)
                        {
                            overflow = Some(err);
                            break;
                        }
                    }
                    if let Some(err) = overflow {
                        return Err(err);
                    }

                    match self.insert_row_for_sync(tx, &row.table, values.clone()) {
                        Ok(new_row_id) => {
                            record_cached_insert(
                                &mut visible_rows_cache,
                                &row.table,
                                VersionedRow {
                                    row_id: new_row_id,
                                    values: values.clone(),
                                    created_tx: tx,
                                    deleted_tx: None,
                                    lsn: row.lsn,
                                    created_at: None,
                                },
                            );
                            result.applied_rows += 1;
                            if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                                consume_vector_row_group(
                                    &vector_row_ids,
                                    &mut vector_row_idx,
                                    new_row_id,
                                    &mut vector_row_map,
                                );
                            }
                        }
                        Err(err) => {
                            if is_fatal_sync_apply_error(&err) {
                                return Err(err);
                            }
                            result.skipped_rows += 1;
                            if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                                consume_failed_vector_row_group(
                                    &vector_row_ids,
                                    &mut vector_row_idx,
                                    &mut failed_row_ids,
                                );
                            }
                            result.conflicts.push(Conflict {
                                natural_key: row.natural_key.clone(),
                                resolution: policy,
                                reason: Some(format!("{err}")),
                            });
                        }
                    }
                }
                (Some(local), ConflictPolicy::InsertIfNotExists) => {
                    if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                        consume_vector_row_group(
                            &vector_row_ids,
                            &mut vector_row_idx,
                            local.row_id,
                            &mut vector_row_map,
                        );
                    }
                    result.skipped_rows += 1;
                }
                (Some(_), ConflictPolicy::ServerWins) => {
                    result.skipped_rows += 1;
                    if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                        consume_failed_vector_row_group(
                            &vector_row_ids,
                            &mut vector_row_idx,
                            &mut failed_row_ids,
                        );
                    }
                    result.conflicts.push(Conflict {
                        natural_key: row.natural_key.clone(),
                        resolution: ConflictPolicy::ServerWins,
                        reason: Some("server_wins".to_string()),
                    });
                }
                (Some(local), ConflictPolicy::LatestWins) => {
                    // On an LSN tie, fall back to comparing TXID-typed columns:
                    // the side with the higher transaction id wins.
                    let incoming_wins = if row.lsn == local.lsn {
                        let mut winner = false;
                        for (col, incoming_val) in values.iter() {
                            if let (Value::TxId(incoming_tx), Some(Value::TxId(local_tx))) =
                                (incoming_val, local.values.get(col))
                            {
                                if incoming_tx.0 > local_tx.0 {
                                    winner = true;
                                    break;
                                } else if incoming_tx.0 < local_tx.0 {
                                    winner = false;
                                    break;
                                }
                            }
                        }
                        winner
                    } else {
                        row.lsn > local.lsn
                    };

                    if !incoming_wins {
                        result.skipped_rows += 1;
                        if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                            consume_failed_vector_row_group(
                                &vector_row_ids,
                                &mut vector_row_idx,
                                &mut failed_row_ids,
                            );
                        }
                        result.conflicts.push(Conflict {
                            natural_key: row.natural_key.clone(),
                            resolution: ConflictPolicy::LatestWins,
                            reason: Some("local_lsn_newer_or_equal".to_string()),
                        });
                    } else {
                        if let Some(meta) = self.table_meta(&row.table)
                            && let Some(sm) = &meta.state_machine
                        {
                            let sm_col = sm.column.clone();
                            let transitions = sm.transitions.clone();
                            let incoming_state = values.get(&sm_col).and_then(|v| match v {
                                Value::Text(s) => Some(s.clone()),
                                _ => None,
                            });
                            let local_state = local.values.get(&sm_col).and_then(|v| match v {
                                Value::Text(s) => Some(s.clone()),
                                _ => None,
                            });

                            if let (Some(incoming), Some(current)) = (incoming_state, local_state) {
                                // Staying in the same state is always allowed;
                                // otherwise the transition must be declared.
                                let valid = transitions
                                    .get(&current)
                                    .is_some_and(|targets| targets.contains(&incoming));
                                if !valid && incoming != current {
                                    result.skipped_rows += 1;
                                    if row_has_vector
                                        && vector_row_ids.get(vector_row_idx).is_some()
                                    {
                                        consume_failed_vector_row_group(
                                            &vector_row_ids,
                                            &mut vector_row_idx,
                                            &mut failed_row_ids,
                                        );
                                    }
                                    result.conflicts.push(Conflict {
                                        natural_key: row.natural_key.clone(),
                                        resolution: ConflictPolicy::LatestWins,
                                        reason: Some(format!(
                                            "state_machine: invalid transition {} -> {} (current: {})",
                                            current, incoming, current
                                        )),
                                    });
                                    if !batch_row_commits {
                                        self.commit_with_source(tx, CommitSource::SyncPull)?;
                                        tx = self.begin();
                                    }
                                    continue;
                                }
                            }
                        }

                        let mut overflow: Option<Error> = None;
                        for v in values.values() {
                            if let Value::TxId(incoming) = v
                                && let Err(err) =
                                    self.tx_mgr.advance_for_sync(&row.table, *incoming)
                            {
                                overflow = Some(err);
                                break;
                            }
                        }
                        if let Some(err) = overflow {
                            return Err(err);
                        }

                        match self.upsert_row_for_sync(
                            tx,
                            &row.table,
                            &row.natural_key.column,
                            values.clone(),
                        ) {
                            Ok(_) => {
                                visible_rows_cache.remove(&row.table);
                                result.applied_rows += 1;
                                if row_has_vector
                                    && vector_row_ids.get(vector_row_idx).is_some()
                                    && let Ok(Some(found)) = self.point_lookup_in_tx(
                                        tx,
                                        &row.table,
                                        &row.natural_key.column,
                                        &row.natural_key.value,
                                        self.snapshot(),
                                    )
                                {
                                    consume_vector_row_group(
                                        &vector_row_ids,
                                        &mut vector_row_idx,
                                        found.row_id,
                                        &mut vector_row_map,
                                    );
                                }
                            }
                            Err(err) => {
                                if is_fatal_sync_apply_error(&err) {
                                    return Err(err);
                                }
                                result.skipped_rows += 1;
                                if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                                    consume_failed_vector_row_group(
                                        &vector_row_ids,
                                        &mut vector_row_idx,
                                        &mut failed_row_ids,
                                    );
                                }
                                result.conflicts.push(Conflict {
                                    natural_key: row.natural_key.clone(),
                                    resolution: ConflictPolicy::LatestWins,
                                    reason: Some(format!("state_machine_or_constraint: {err}")),
                                });
                            }
                        }
                    }
                }
                (Some(_), ConflictPolicy::EdgeWins) => {
                    result.conflicts.push(Conflict {
                        natural_key: row.natural_key.clone(),
                        resolution: ConflictPolicy::EdgeWins,
                        reason: Some("edge_wins".to_string()),
                    });
                    let mut overflow: Option<Error> = None;
                    for v in values.values() {
                        if let Value::TxId(incoming) = v
                            && let Err(err) = self.tx_mgr.advance_for_sync(&row.table, *incoming)
                        {
                            overflow = Some(err);
                            break;
                        }
                    }
                    if let Some(err) = overflow {
                        return Err(err);
                    }

                    match self.upsert_row_for_sync(
                        tx,
                        &row.table,
                        &row.natural_key.column,
                        values.clone(),
                    ) {
                        Ok(_) => {
                            visible_rows_cache.remove(&row.table);
                            result.applied_rows += 1;
                            if row_has_vector
                                && vector_row_ids.get(vector_row_idx).is_some()
                                && let Ok(Some(found)) = self.point_lookup_in_tx(
                                    tx,
                                    &row.table,
                                    &row.natural_key.column,
                                    &row.natural_key.value,
                                    self.snapshot(),
                                )
                            {
                                consume_vector_row_group(
                                    &vector_row_ids,
                                    &mut vector_row_idx,
                                    found.row_id,
                                    &mut vector_row_map,
                                );
                            }
                        }
                        Err(err) => {
                            if is_fatal_sync_apply_error(&err) {
                                return Err(err);
                            }
                            result.skipped_rows += 1;
                            if row_has_vector && vector_row_ids.get(vector_row_idx).is_some() {
                                consume_failed_vector_row_group(
                                    &vector_row_ids,
                                    &mut vector_row_idx,
                                    &mut failed_row_ids,
                                );
                            }
                            if let Some(last) = result.conflicts.last_mut() {
                                last.reason = Some(format!("state_machine_or_constraint: {err}"));
                            }
                        }
                    }
                }
            }

            if !batch_row_commits {
                self.commit_with_source(tx, CommitSource::SyncPull)?;
                tx = self.begin();
            }
        }

        if batch_row_commits {
            self.commit_with_source(tx, CommitSource::SyncPull)?;
            tx = self.begin();
        }
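
        // Edges apply unconditionally after rows: a `__deleted: true`
        // property removes the edge, anything else (re)inserts it. Failures
        // are ignored (e.g. an endpoint that never applied locally).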
4883
4884 for edge in changes.edges {
4885 let is_delete = matches!(edge.properties.get("__deleted"), Some(Value::Bool(true)));
4886 if is_delete {
4887 let _ = self.delete_edge(tx, edge.source, edge.target, &edge.edge_type);
4888 } else {
4889 let _ = self.insert_edge(
4890 tx,
4891 edge.source,
4892 edge.target,
4893 edge.edge_type,
4894 edge.properties,
4895 );
4896 }
4897 }
4898
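        // Vectors arrive grouped by remote row id. `vector_row_map` rewrites
        // remote row ids to the locally assigned ones, and vectors belonging
        // to rows that failed to apply are skipped via `failed_row_ids`.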
        for vector in changes.vectors {
            if failed_row_ids.contains(&vector.row_id) {
                continue;
            }
            let local_row_id = vector_row_map
                .get(&vector.row_id)
                .copied()
                .unwrap_or(vector.row_id);
            if vector.vector.is_empty() {
                self.vector
                    .delete_vector(tx, vector.index.clone(), local_row_id)?;
            } else {
                if self.has_live_vector(local_row_id, self.snapshot()) {
                    let _ = self.delete_vector(tx, vector.index.clone(), local_row_id);
                }
                self.insert_vector_strict(tx, vector.index.clone(), local_row_id, vector.vector)?;
            }
        }

        self.commit_with_source(tx, CommitSource::SyncPull)?;
        result.new_lsn = self.current_lsn();
        Ok(result)
    }
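
    // Example (sketch): pulling remote changes with a per-table override.
    // Assumes a `db: Database` and a `changes: ChangeSet` from a sync peer;
    // the `ConflictPolicies` literal assumes `default` and `per_table` are
    // its only fields, matching how they are read above.
    //
    //     let mut per_table = HashMap::new();
    //     per_table.insert("documents".to_string(), ConflictPolicy::ServerWins);
    //     let policies = ConflictPolicies {
    //         default: ConflictPolicy::LatestWins,
    //         per_table,
    //     };
    //     let result = db.apply_changes(changes, &policies)?;
    //     println!("applied {}, skipped {}", result.applied_rows, result.skipped_rows);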

    fn row_change_values(
        &self,
        table: &str,
        row_id: RowId,
    ) -> Option<(NaturalKey, HashMap<String, Value>)> {
        let tables = self.relational_store.tables.read();
        let meta = self.relational_store.table_meta.read();
        let rows = tables.get(table)?;
        let row = rows.iter().find(|r| r.row_id == row_id)?;
        let key_col = meta
            .get(table)
            .and_then(|m| m.natural_key_column.clone())
            .or_else(|| {
                meta.get(table).and_then(|m| {
                    m.columns
                        .iter()
                        .find(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                        .map(|_| "id".to_string())
                })
            })
            .or_else(|| {
                meta.get(table).and_then(|m| {
                    m.columns
                        .iter()
                        .find(|c| c.primary_key && c.column_type == ColumnType::Uuid)
                        .map(|c| c.name.clone())
                })
            })?;

        let key_val = row.values.get(&key_col)?.clone();
        let values = row
            .values
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<HashMap<_, _>>();
        Some((
            NaturalKey {
                column: key_col,
                value: key_val,
            },
            values,
        ))
    }

    fn edge_properties(
        &self,
        source: NodeId,
        target: NodeId,
        edge_type: &str,
        lsn: Lsn,
    ) -> Option<HashMap<String, Value>> {
        self.graph_store
            .forward_adj
            .read()
            .get(&source)
            .and_then(|entries| {
                entries
                    .iter()
                    .find(|e| e.target == target && e.edge_type == edge_type && e.lsn == lsn)
                    .map(|e| e.properties.clone())
            })
    }

    fn vector_for_row_lsn(
        &self,
        index: &VectorIndexRef,
        row_id: RowId,
        lsn: Lsn,
    ) -> Option<Vec<f32>> {
        self.vector_store.vector_for_row_lsn(index, row_id, lsn)
    }
}

fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
    if let Some(pos) = qr.columns.iter().position(|c| c == "row_id") {
        qr.columns.remove(pos);
        for row in &mut qr.rows {
            if pos < row.len() {
                row.remove(pos);
            }
        }
    }
    qr
}

fn cached_table_meta(
    db: &Database,
    cache: &mut HashMap<String, Option<TableMeta>>,
    table: &str,
) -> Option<TableMeta> {
    cache
        .entry(table.to_string())
        .or_insert_with(|| db.table_meta(table))
        .clone()
}

pub(crate) fn rank_index_name(table: &str, column: &str) -> String {
    format!("{table}.{column}")
}

fn rank_value_to_number(
    value: &Value,
    column: &str,
) -> std::result::Result<Option<f32>, FormulaEvalError> {
    match value {
        Value::Null => Ok(None),
        Value::Float64(value) => Ok(Some(*value as f32)),
        Value::Int64(value) => Ok(Some(*value as f32)),
        Value::Bool(value) => Ok(Some(if *value { 1.0 } else { 0.0 })),
        Value::Text(_) => Err(FormulaEvalError::UnsupportedType {
            column: column.to_string(),
            actual: "TEXT",
        }),
        Value::Json(_) => Err(FormulaEvalError::UnsupportedType {
            column: column.to_string(),
            actual: "JSON",
        }),
        Value::Uuid(_) => Err(FormulaEvalError::UnsupportedType {
            column: column.to_string(),
            actual: "UUID",
        }),
        Value::Vector(_) => Err(FormulaEvalError::UnsupportedType {
            column: column.to_string(),
            actual: "VECTOR",
        }),
        Value::Timestamp(_) => Err(FormulaEvalError::UnsupportedType {
            column: column.to_string(),
            actual: "TIMESTAMP",
        }),
        Value::TxId(_) => Err(FormulaEvalError::UnsupportedType {
            column: column.to_string(),
            actual: "TXID",
        }),
    }
}

fn merged_rank_values(
    anchor: &VersionedRow,
    joined: Option<&VersionedRow>,
) -> HashMap<String, Value> {
    let mut values = anchor.values.clone();
    if let Some(joined) = joined {
        for (key, value) in &joined.values {
            values.entry(key.clone()).or_insert_with(|| value.clone());
        }
    }
    values
}

fn values_equal_for_rank_join(left: &Value, right: &Value) -> bool {
    // SQL semantics: NULL never equals anything, including another NULL.
    if matches!((left, right), (Value::Null, _) | (_, Value::Null)) {
        return false;
    }
    left == right
}

fn compare_ranked_results(left: &SearchResult, right: &SearchResult) -> std::cmp::Ordering {
    rank_float_desc(left.rank, right.rank)
        .then_with(|| rank_float_desc(left.vector_score, right.vector_score))
        .then_with(|| right.row_id.cmp(&left.row_id))
}

fn rank_float_desc(left: f32, right: f32) -> std::cmp::Ordering {
    match (left.is_nan(), right.is_nan()) {
        (true, true) => std::cmp::Ordering::Equal,
        (true, false) => std::cmp::Ordering::Greater,
        (false, true) => std::cmp::Ordering::Less,
        (false, false) => right.total_cmp(&left),
    }
}
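
// Ordering sketch: sorting with `rank_float_desc` puts values in descending
// order with NaN last, e.g. [1.0, NAN, 2.0] sorts to [2.0, 1.0, NAN]. On a
// full tie, `compare_ranked_results` prefers the higher row id.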

fn cached_visible_rows<'a>(
    db: &Database,
    cache: &'a mut HashMap<String, Vec<VersionedRow>>,
    table: &str,
) -> Result<&'a mut Vec<VersionedRow>> {
    if !cache.contains_key(table) {
        let rows = db.scan(table, db.snapshot())?;
        cache.insert(table.to_string(), rows);
    }
    Ok(cache.get_mut(table).expect("cached visible rows"))
}

fn cached_point_lookup(
    db: &Database,
    cache: &mut HashMap<String, Vec<VersionedRow>>,
    table: &str,
    col: &str,
    value: &Value,
) -> Result<Option<VersionedRow>> {
    let rows = cached_visible_rows(db, cache, table)?;
    Ok(rows
        .iter()
        .find(|r| r.values.get(col) == Some(value))
        .cloned())
}

fn record_cached_insert(
    cache: &mut HashMap<String, Vec<VersionedRow>>,
    table: &str,
    row: VersionedRow,
) {
    if let Some(rows) = cache.get_mut(table) {
        rows.push(row);
    }
}

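// `remote_row_ids` preserves the order of `ChangeSet::vectors`, so
// consecutive equal ids form one group per row. Both consumers below
// advance the shared cursor past the entire group at once, keeping the row
// and vector streams in lockstep.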
fn consume_vector_row_group(
    remote_row_ids: &[RowId],
    cursor: &mut usize,
    local_row_id: RowId,
    map: &mut HashMap<RowId, RowId>,
) {
    let Some(remote_row_id) = remote_row_ids.get(*cursor).copied() else {
        return;
    };
    while remote_row_ids.get(*cursor).copied() == Some(remote_row_id) {
        map.insert(remote_row_id, local_row_id);
        *cursor += 1;
    }
}

fn consume_failed_vector_row_group(
    remote_row_ids: &[RowId],
    cursor: &mut usize,
    failed: &mut HashSet<RowId>,
) {
    let Some(remote_row_id) = remote_row_ids.get(*cursor).copied() else {
        return;
    };
    while remote_row_ids.get(*cursor).copied() == Some(remote_row_id) {
        failed.insert(remote_row_id);
        *cursor += 1;
    }
}

fn vector_index_from_plan(plan: &PhysicalPlan) -> Option<VectorIndexRef> {
    match plan {
        PhysicalPlan::VectorSearch { table, column, .. }
        | PhysicalPlan::HnswSearch { table, column, .. } => {
            Some(VectorIndexRef::new(table.clone(), column.clone()))
        }
        PhysicalPlan::Project { input, .. }
        | PhysicalPlan::Filter { input, .. }
        | PhysicalPlan::Distinct { input }
        | PhysicalPlan::Limit { input, .. }
        | PhysicalPlan::Sort { input, .. }
        | PhysicalPlan::MaterializeCte { input, .. } => vector_index_from_plan(input),
        PhysicalPlan::Join { left, right, .. } => {
            vector_index_from_plan(left).or_else(|| vector_index_from_plan(right))
        }
        PhysicalPlan::Pipeline(plans) => plans.iter().find_map(vector_index_from_plan),
        _ => None,
    }
}

fn hydrate_relational_vector_values(relational: &RelationalStore, vectors: &[VectorEntry]) {
    if vectors.is_empty() {
        return;
    }
    let mut tables = relational.tables.write();
    for entry in vectors {
        let Some(rows) = tables.get_mut(&entry.index.table) else {
            continue;
        };
        if let Some(row) = rows.iter_mut().find(|row| row.row_id == entry.row_id) {
            row.values.insert(
                entry.index.column.clone(),
                Value::Vector(entry.vector.clone()),
            );
        }
    }
}

fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
    if let Some(rows) = cache.get_mut(table) {
        rows.retain(|row| row.row_id != row_id);
    }
}

fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
    match result {
        Ok(query_result) => QueryOutcome::Success {
            row_count: if query_result.rows.is_empty() {
                query_result.rows_affected as usize
            } else {
                query_result.rows.len()
            },
        },
        Err(error) => QueryOutcome::Error {
            error: error.to_string(),
        },
    }
}

fn maybe_prebuild_hnsw(vector_store: &VectorStore, accountant: &MemoryAccountant) {
    // Currently a no-op; the arguments are bound only to keep call sites stable.
    let _ = (vector_store, accountant);
}

fn estimate_row_bytes_for_meta(
    values: &HashMap<ColName, Value>,
    meta: &TableMeta,
    include_vectors: bool,
) -> usize {
    let mut bytes = 96usize;
    for column in &meta.columns {
        let Some(value) = values.get(&column.name) else {
            continue;
        };
        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
            continue;
        }
        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
    }
    bytes
}

fn estimate_edge_bytes(
    source: NodeId,
    target: NodeId,
    edge_type: &str,
    properties: &HashMap<String, Value>,
) -> usize {
    AdjEntry {
        source,
        target,
        edge_type: edge_type.to_string(),
        properties: properties.clone(),
        created_tx: TxId(0),
        deleted_tx: None,
        lsn: Lsn(0),
    }
    .estimated_bytes()
}

impl Drop for Database {
    fn drop(&mut self) {
        // Idempotent close: the `closed` flag guards against double shutdown.
        if self.closed.swap(true, Ordering::SeqCst) {
            return;
        }
        let runtime = self.pruning_runtime.get_mut();
        runtime.shutdown.store(true, Ordering::SeqCst);
        if let Some(handle) = runtime.handle.take() {
            let _ = handle.join();
        }
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
    }
}

fn sleep_with_shutdown(shutdown: &AtomicBool, interval: Duration) {
    // Sleep in <= 50 ms slices so a shutdown request is observed promptly.
    let deadline = Instant::now() + interval;
    while !shutdown.load(Ordering::SeqCst) {
        let now = Instant::now();
        if now >= deadline {
            break;
        }
        let remaining = deadline.saturating_duration_since(now);
        thread::sleep(remaining.min(Duration::from_millis(50)));
    }
}

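// One pruning pass: drop expired rows from TTL-bearing tables, cascade to
// vectors and to graph edges touching pruned node ids, rewrite the affected
// persisted state, and release the freed bytes to the accountant. Returns
// the number of rows pruned.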
fn prune_expired_rows(
    relational_store: &Arc<RelationalStore>,
    graph_store: &Arc<GraphStore>,
    vector_store: &Arc<VectorStore>,
    accountant: &MemoryAccountant,
    persistence: Option<&Arc<RedbPersistence>>,
    sync_watermark: Lsn,
) -> u64 {
    let now = Wallclock::now();
    let metas = relational_store.table_meta.read().clone();
    let mut pruned_by_table: HashMap<String, Vec<RowId>> = HashMap::new();
    let mut pruned_node_ids = HashSet::new();
    let mut released_row_bytes = 0usize;

    {
        let mut tables = relational_store.tables.write();
        for (table_name, rows) in tables.iter_mut() {
            let Some(meta) = metas.get(table_name) else {
                continue;
            };
            if meta.default_ttl_seconds.is_none() {
                continue;
            }

            rows.retain(|row| {
                if !row_is_prunable(row, meta, now, sync_watermark) {
                    return true;
                }

                pruned_by_table
                    .entry(table_name.clone())
                    .or_default()
                    .push(row.row_id);
                released_row_bytes = released_row_bytes
                    .saturating_add(estimate_row_bytes_for_meta(&row.values, meta, false));
                if let Some(Value::Uuid(id)) = row.values.get("id") {
                    pruned_node_ids.insert(*id);
                }
                false
            });
        }
    }

    let pruned_row_ids: HashSet<RowId> = pruned_by_table
        .values()
        .flat_map(|rows| rows.iter().copied())
        .collect();
    if pruned_row_ids.is_empty() {
        return 0;
    }

    let released_vector_bytes = vector_store.prune_row_ids(&pruned_row_ids, accountant);

    let mut released_edge_bytes = 0usize;
    {
        let mut forward = graph_store.forward_adj.write();
        for entries in forward.values_mut() {
            entries.retain(|entry| {
                if pruned_node_ids.contains(&entry.source)
                    || pruned_node_ids.contains(&entry.target)
                {
                    released_edge_bytes =
                        released_edge_bytes.saturating_add(entry.estimated_bytes());
                    false
                } else {
                    true
                }
            });
        }
        forward.retain(|_, entries| !entries.is_empty());
    }
    {
        let mut reverse = graph_store.reverse_adj.write();
        for entries in reverse.values_mut() {
            entries.retain(|entry| {
                !pruned_node_ids.contains(&entry.source) && !pruned_node_ids.contains(&entry.target)
            });
        }
        reverse.retain(|_, entries| !entries.is_empty());
    }

    if let Some(persistence) = persistence {
        for table_name in pruned_by_table.keys() {
            let rows = relational_store
                .tables
                .read()
                .get(table_name)
                .cloned()
                .unwrap_or_default();
            let _ = persistence.rewrite_table_rows(table_name, &rows);
        }

        let vectors = vector_store.all_entries();
        let edges = graph_store
            .forward_adj
            .read()
            .values()
            .flat_map(|entries| entries.iter().cloned())
            .collect::<Vec<_>>();
        let _ = persistence.rewrite_vectors(&vectors);
        let _ = persistence.rewrite_graph_edges(&edges);
    }

    accountant.release(
        released_row_bytes
            .saturating_add(released_vector_bytes)
            .saturating_add(released_edge_bytes),
    );

    pruned_row_ids.len() as u64
}

fn row_is_prunable(
    row: &VersionedRow,
    meta: &TableMeta,
    now: Wallclock,
    sync_watermark: Lsn,
) -> bool {
    if meta.sync_safe && row.lsn >= sync_watermark {
        return false;
    }

    let Some(default_ttl_seconds) = meta.default_ttl_seconds else {
        return false;
    };

    if let Some(expires_column) = &meta.expires_column {
        match row.values.get(expires_column) {
            Some(Value::Timestamp(millis)) if *millis == i64::MAX => return false,
            Some(Value::Timestamp(millis)) if *millis < 0 => return true,
            Some(Value::Timestamp(millis)) => return (*millis as u64) <= now.0,
            Some(Value::Null) | None => {}
            Some(_) => {}
        }
    }

    let ttl_millis = default_ttl_seconds.saturating_mul(1000);
    row.created_at
        .map(|created_at| now.0.saturating_sub(created_at.0) > ttl_millis)
        .unwrap_or(false)
}
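
// TTL sketch: with `default_ttl_seconds = 3600` and no expires column, a row
// whose `created_at` lies more than 3_600_000 ms in the past is prunable.
// `SYNC SAFE` tables additionally retain any row at or above the sync
// watermark, and a row without `created_at` is never TTL-pruned.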

fn max_tx_across_all(
    relational: &RelationalStore,
    graph: &GraphStore,
    vector: &VectorStore,
) -> TxId {
    let relational_max = relational
        .tables
        .read()
        .values()
        .flat_map(|rows| rows.iter())
        .flat_map(|row| std::iter::once(row.created_tx).chain(row.deleted_tx))
        .max()
        .unwrap_or(TxId(0));
    let graph_max = graph
        .forward_adj
        .read()
        .values()
        .flat_map(|entries| entries.iter())
        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
        .max()
        .unwrap_or(TxId(0));
    let vector_max = vector
        .all_entries()
        .into_iter()
        .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
        .max()
        .unwrap_or(TxId(0));

    relational_max.max(graph_max).max(vector_max)
}

fn max_lsn_across_all(
    relational: &RelationalStore,
    graph: &GraphStore,
    vector: &VectorStore,
) -> Lsn {
    let relational_max = relational
        .tables
        .read()
        .values()
        .flat_map(|rows| rows.iter().map(|row| row.lsn))
        .max()
        .unwrap_or(Lsn(0));
    let graph_max = graph
        .forward_adj
        .read()
        .values()
        .flat_map(|entries| entries.iter().map(|entry| entry.lsn))
        .max()
        .unwrap_or(Lsn(0));
    let vector_max = vector
        .all_entries()
        .into_iter()
        .map(|entry| entry.lsn)
        .max()
        .unwrap_or(Lsn(0));

    relational_max.max(graph_max).max(vector_max)
}

fn persisted_memory_limit(path: &Path) -> Result<Option<usize>> {
    if !path.exists() {
        return Ok(None);
    }
    let persistence = RedbPersistence::open(path)?;
    let limit = persistence.load_config_value::<usize>("memory_limit")?;
    persistence.close();
    Ok(limit)
}

fn is_fatal_sync_apply_error(err: &Error) -> bool {
    matches!(
        err,
        Error::MemoryBudgetExceeded { .. } | Error::DiskBudgetExceeded { .. }
    )
}

fn ddl_change_from_create_table(ct: &CreateTable) -> DdlChange {
    DdlChange::CreateTable {
        name: ct.name.clone(),
        columns: ct
            .columns
            .iter()
            .map(|col| {
                (
                    col.name.clone(),
                    sql_type_for_ast_column(col, &ct.propagation_rules),
                )
            })
            .collect(),
        constraints: create_table_constraints_from_ast(ct),
    }
}

fn ddl_change_from_meta(name: &str, meta: &TableMeta) -> DdlChange {
    ddl_change_from_meta_excluding(name, meta, &HashSet::new())
}

fn ddl_change_from_meta_excluding(
    name: &str,
    meta: &TableMeta,
    excluded_columns: &HashSet<String>,
) -> DdlChange {
    DdlChange::CreateTable {
        name: name.to_string(),
        columns: meta
            .columns
            .iter()
            .filter(|col| !excluded_columns.contains(&col.name))
            .map(|col| {
                (
                    col.name.clone(),
                    sql_type_for_meta_column(col, &meta.propagation_rules),
                )
            })
            .collect(),
        constraints: create_table_constraints_from_meta(meta),
    }
}

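// Snapshot DDL is emitted in rank-policy dependency order: a table is held
// back until every table its rank columns join against has been emitted.
// If a pass makes no progress (a dependency cycle), the remaining tables
// are emitted as-is; self-joining rank columns are deferred and added back
// via a trailing ALTER TABLE in `push_snapshot_table_ddl`.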
fn full_snapshot_ddl(metas: &HashMap<String, TableMeta>) -> Vec<DdlChange> {
    let mut names = metas.keys().cloned().collect::<Vec<_>>();
    names.sort();

    let mut emitted = HashSet::new();
    let mut ddl = Vec::new();
    while emitted.len() < names.len() {
        let before = emitted.len();
        for name in &names {
            if emitted.contains(name) {
                continue;
            }
            let Some(meta) = metas.get(name) else {
                continue;
            };
            let deps_ready = meta
                .columns
                .iter()
                .filter_map(|column| column.rank_policy.as_ref())
                .all(|policy| {
                    policy.joined_table == *name
                        || !metas.contains_key(&policy.joined_table)
                        || emitted.contains(&policy.joined_table)
                });
            if deps_ready {
                push_snapshot_table_ddl(&mut ddl, name, meta);
                emitted.insert(name.clone());
            }
        }
        if emitted.len() == before {
            for name in &names {
                if !emitted.contains(name)
                    && let Some(meta) = metas.get(name)
                {
                    push_snapshot_table_ddl(&mut ddl, name, meta);
                    emitted.insert(name.clone());
                }
            }
        }
    }
    ddl
}

fn push_snapshot_table_ddl(ddl: &mut Vec<DdlChange>, name: &str, meta: &TableMeta) {
    let deferred_self_rank_columns = meta
        .columns
        .iter()
        .filter(|column| {
            column
                .rank_policy
                .as_ref()
                .is_some_and(|policy| policy.joined_table == name)
        })
        .map(|column| column.name.clone())
        .collect::<HashSet<_>>();
    ddl.push(ddl_change_from_meta_excluding(
        name,
        meta,
        &deferred_self_rank_columns,
    ));
    for index in &meta.indexes {
        if index.kind == contextdb_core::IndexKind::UserDeclared {
            ddl.push(DdlChange::CreateIndex {
                table: name.to_string(),
                name: index.name.clone(),
                columns: index.columns.clone(),
            });
        }
    }
    if !deferred_self_rank_columns.is_empty() {
        ddl.push(DdlChange::AlterTable {
            name: name.to_string(),
            columns: meta
                .columns
                .iter()
                .map(|col| {
                    (
                        col.name.clone(),
                        sql_type_for_meta_column(col, &meta.propagation_rules),
                    )
                })
                .collect(),
            constraints: create_table_constraints_from_meta(meta),
        });
    }
}

fn sql_type_for_ast(data_type: &DataType) -> String {
    match data_type {
        DataType::Uuid => "UUID".to_string(),
        DataType::Text => "TEXT".to_string(),
        DataType::Integer => "INTEGER".to_string(),
        DataType::Real => "REAL".to_string(),
        DataType::Boolean => "BOOLEAN".to_string(),
        DataType::Timestamp => "TIMESTAMP".to_string(),
        DataType::Json => "JSON".to_string(),
        DataType::Vector(dim) => format!("VECTOR({dim})"),
        DataType::TxId => "TXID".to_string(),
    }
}

fn sql_type_for_ast_column(
    col: &contextdb_parser::ast::ColumnDef,
    _rules: &[contextdb_parser::ast::AstPropagationRule],
) -> String {
    let mut ty = sql_type_for_ast(&col.data_type);
    append_ast_quantization(&mut ty, col.quantization);
    if let Some(reference) = &col.references {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            reference.table, reference.column
        ));
        for rule in &reference.propagation_rules {
            if let contextdb_parser::ast::AstPropagationRule::FkState {
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } = rule
            {
                ty.push_str(&format!(
                    " ON STATE {} PROPAGATE SET {}",
                    trigger_state, target_state
                ));
                if max_depth.unwrap_or(10) != 10 {
                    ty.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
                }
                if *abort_on_failure {
                    ty.push_str(" ABORT ON FAILURE");
                }
            }
        }
    }
    if col.primary_key {
        ty.push_str(" PRIMARY KEY");
    }
    if !col.nullable && !col.primary_key {
        ty.push_str(" NOT NULL");
    }
    if col.unique {
        ty.push_str(" UNIQUE");
    }
    if col.immutable {
        ty.push_str(" IMMUTABLE");
    }
    if let Some(policy) = col.rank_policy.as_deref() {
        ty.push_str(&format!(
            " RANK_POLICY (JOIN {} ON {}, FORMULA '{}', SORT_KEY {})",
            policy.joined_table,
            policy.joined_column,
            sql_quote(&policy.formula),
            policy.sort_key
        ));
    }
    ty
}

fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[PropagationRule]) -> String {
    let mut ty = match col.column_type {
        ColumnType::Integer => "INTEGER".to_string(),
        ColumnType::Real => "REAL".to_string(),
        ColumnType::Text => "TEXT".to_string(),
        ColumnType::Boolean => "BOOLEAN".to_string(),
        ColumnType::Json => "JSON".to_string(),
        ColumnType::Uuid => "UUID".to_string(),
        ColumnType::Vector(dim) => format!("VECTOR({dim})"),
        ColumnType::Timestamp => "TIMESTAMP".to_string(),
        ColumnType::TxId => "TXID".to_string(),
    };
    append_core_quantization(&mut ty, col.quantization);

    let fk_rules = rules
        .iter()
        .filter_map(|rule| match rule {
            PropagationRule::ForeignKey {
                fk_column,
                referenced_table,
                referenced_column,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } if fk_column == &col.name => Some((
                referenced_table,
                referenced_column,
                trigger_state,
                target_state,
                *max_depth,
                *abort_on_failure,
            )),
            _ => None,
        })
        .collect::<Vec<_>>();

    if let Some(reference) = &col.references {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            reference.table, reference.column
        ));
    } else if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
        ty.push_str(&format!(
            " REFERENCES {}({})",
            referenced_table, referenced_column
        ));
    }

    if col.references.is_some() || !fk_rules.is_empty() {
        for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
            ty.push_str(&format!(
                " ON STATE {} PROPAGATE SET {}",
                trigger_state, target_state
            ));
            if max_depth != 10 {
                ty.push_str(&format!(" MAX DEPTH {max_depth}"));
            }
            if abort_on_failure {
                ty.push_str(" ABORT ON FAILURE");
            }
        }
    }
    if col.primary_key {
        ty.push_str(" PRIMARY KEY");
    }
    if !col.nullable && !col.primary_key {
        ty.push_str(" NOT NULL");
    }
    if col.unique {
        ty.push_str(" UNIQUE");
    }
    if col.expires {
        ty.push_str(" EXPIRES");
    }
    if col.immutable {
        ty.push_str(" IMMUTABLE");
    }
    if let Some(policy) = &col.rank_policy {
        ty.push_str(&format!(
            " RANK_POLICY (JOIN {} ON {}, FORMULA '{}', SORT_KEY {})",
            policy.joined_table,
            policy.joined_column,
            sql_quote(&policy.formula),
            policy.sort_key
        ));
    }

    ty
}

fn append_ast_quantization(
    ty: &mut String,
    quantization: contextdb_parser::ast::VectorQuantization,
) {
    let quantization = match quantization {
        contextdb_parser::ast::VectorQuantization::F32 => return,
        contextdb_parser::ast::VectorQuantization::SQ8 => "SQ8",
        contextdb_parser::ast::VectorQuantization::SQ4 => "SQ4",
    };
    ty.push_str(&format!(" WITH (quantization = '{quantization}')"));
}

fn append_core_quantization(ty: &mut String, quantization: contextdb_core::VectorQuantization) {
    if !matches!(quantization, contextdb_core::VectorQuantization::F32) {
        ty.push_str(&format!(
            " WITH (quantization = '{}')",
            quantization.as_str()
        ));
    }
}

fn sql_quote(value: &str) -> String {
    // Standard SQL string-literal escaping: double any embedded single quote.
    value.replace('\'', "''")
}

fn normalize_schema_type(value: &str) -> String {
    value.split_whitespace().collect::<Vec<_>>().join(" ")
}

fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
    let mut constraints = Vec::new();

    if ct.immutable {
        constraints.push("IMMUTABLE".to_string());
    }

    if let Some(sm) = &ct.state_machine {
        let transitions = sm
            .transitions
            .iter()
            .map(|(from, tos)| format!("{from} -> [{}]", tos.join(", ")))
            .collect::<Vec<_>>()
            .join(", ");
        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
    }

    if !ct.dag_edge_types.is_empty() {
        let edge_types = ct
            .dag_edge_types
            .iter()
            .map(|edge_type| format!("'{edge_type}'"))
            .collect::<Vec<_>>()
            .join(", ");
        constraints.push(format!("DAG({edge_types})"));
    }

    if let Some(retain) = &ct.retain {
        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(retain.duration_seconds));
        if retain.sync_safe {
            clause.push_str(" SYNC SAFE");
        }
        constraints.push(clause);
    }

    for unique_constraint in &ct.unique_constraints {
        constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
    }

    for rule in &ct.propagation_rules {
        match rule {
            contextdb_parser::ast::AstPropagationRule::EdgeState {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } => {
                let mut clause = format!(
                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
                    edge_type, direction, trigger_state, target_state
                );
                if max_depth.unwrap_or(10) != 10 {
                    clause.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
                }
                if *abort_on_failure {
                    clause.push_str(" ABORT ON FAILURE");
                }
                constraints.push(clause);
            }
            contextdb_parser::ast::AstPropagationRule::VectorExclusion { trigger_state } => {
                constraints.push(format!(
                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
                    trigger_state
                ));
            }
            contextdb_parser::ast::AstPropagationRule::FkState { .. } => {}
        }
    }

    constraints
}

fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
    let mut constraints = Vec::new();

    if meta.immutable {
        constraints.push("IMMUTABLE".to_string());
    }

    if let Some(sm) = &meta.state_machine {
        let transitions = sm
            .transitions
            .iter()
            .map(|(from, to)| format!("{from} -> [{}]", to.join(", ")))
            .collect::<Vec<_>>()
            .join(", ");
        constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
    }

    if !meta.dag_edge_types.is_empty() {
        let edge_types = meta
            .dag_edge_types
            .iter()
            .map(|edge_type| format!("'{edge_type}'"))
            .collect::<Vec<_>>()
            .join(", ");
        constraints.push(format!("DAG({edge_types})"));
    }

    if let Some(ttl_seconds) = meta.default_ttl_seconds {
        let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(ttl_seconds));
        if meta.sync_safe {
            clause.push_str(" SYNC SAFE");
        }
        constraints.push(clause);
    }

    for unique_constraint in &meta.unique_constraints {
        constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
    }

    for rule in &meta.propagation_rules {
        match rule {
            PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } => {
                let dir = match direction {
                    Direction::Incoming => "INCOMING",
                    Direction::Outgoing => "OUTGOING",
                    Direction::Both => "BOTH",
                };
                let mut clause = format!(
                    "PROPAGATE ON EDGE {} {} STATE {} SET {}",
                    edge_type, dir, trigger_state, target_state
                );
                if *max_depth != 10 {
                    clause.push_str(&format!(" MAX DEPTH {max_depth}"));
                }
                if *abort_on_failure {
                    clause.push_str(" ABORT ON FAILURE");
                }
                constraints.push(clause);
            }
            PropagationRule::VectorExclusion { trigger_state } => {
                constraints.push(format!(
                    "PROPAGATE ON STATE {} EXCLUDE VECTOR",
                    trigger_state
                ));
            }
            PropagationRule::ForeignKey { .. } => {}
        }
    }

    constraints
}

fn ttl_seconds_to_sql(seconds: u64) -> String {
    if seconds.is_multiple_of(24 * 60 * 60) {
        format!("{} DAYS", seconds / (24 * 60 * 60))
    } else if seconds.is_multiple_of(60 * 60) {
        format!("{} HOURS", seconds / (60 * 60))
    } else if seconds.is_multiple_of(60) {
        format!("{} MINUTES", seconds / 60)
    } else {
        format!("{seconds} SECONDS")
    }
}
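
// Rendering sketch: `ttl_seconds_to_sql(172_800)` -> "2 DAYS",
// `ttl_seconds_to_sql(5_400)` -> "90 MINUTES", `ttl_seconds_to_sql(61)` ->
// "61 SECONDS"; the largest unit that divides evenly wins.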