1use crate::composite_store::{ChangeLogEntry, CompositeStore};
2use crate::executor::execute_plan;
3use crate::persistence::RedbPersistence;
4use crate::persistent_store::PersistentCompositeStore;
5use crate::plugin::{
6 CommitEvent, CommitSource, CorePlugin, DatabasePlugin, PluginHealth, QueryOutcome,
7 SubscriptionMetrics,
8};
9use crate::schema_enforcer::validate_dml;
10use crate::sync_types::{
11 ApplyResult, ChangeSet, Conflict, ConflictPolicies, ConflictPolicy, DdlChange, EdgeChange,
12 NaturalKey, RowChange, VectorChange,
13};
14use contextdb_core::*;
15use contextdb_graph::{GraphStore, MemGraphExecutor};
16use contextdb_parser::Statement;
17use contextdb_parser::ast::{AlterAction, CreateTable, DataType};
18use contextdb_planner::PhysicalPlan;
19use contextdb_relational::{MemRelationalExecutor, RelationalStore};
20use contextdb_tx::{TxManager, WriteSetApplicator};
21use contextdb_vector::{HnswIndex, MemVectorExecutor, VectorStore};
22use parking_lot::{Mutex, RwLock};
23use roaring::RoaringTreemap;
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::path::Path;
26use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
27use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
28use std::sync::{Arc, OnceLock};
29use std::thread::{self, JoinHandle};
30use std::time::{Duration, Instant};
31
/// Trait object the transaction manager applies committed write sets to.
type DynStore = Box<dyn WriteSetApplicator>;
/// Bounded-channel capacity for commit-event subscriptions.
// NOTE(review): not referenced in this part of the file — presumably used
// by the subscribe API; confirm.
const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 64;
34
/// Result of executing a SQL statement: column names, row data, and the
/// number of rows a DML statement affected.
#[derive(Debug, Clone)]
pub struct QueryResult {
    // Column names for `rows`; empty for statements that return no rows.
    pub columns: Vec<String>,
    // One `Vec<Value>` per row, aligned with `columns`.
    pub rows: Vec<Vec<Value>>,
    // Rows affected by DML; 0 for pure queries.
    pub rows_affected: u64,
}
41
42impl QueryResult {
43 pub fn empty() -> Self {
44 Self {
45 columns: vec![],
46 rows: vec![],
47 rows_affected: 0,
48 }
49 }
50
51 pub fn empty_with_affected(rows_affected: u64) -> Self {
52 Self {
53 columns: vec![],
54 rows: vec![],
55 rows_affected,
56 }
57 }
58}
59
/// Top-level database handle combining the relational, graph, and vector
/// stores behind a single MVCC transaction manager.
pub struct Database {
    // Transaction manager; commits apply write sets to the boxed store.
    tx_mgr: Arc<TxManager<DynStore>>,
    // Shared backing stores, also referenced by the executors below.
    relational_store: Arc<RelationalStore>,
    graph_store: Arc<GraphStore>,
    vector_store: Arc<VectorStore>,
    // Append-only logs of row/DDL changes (persisted when persistence is on).
    change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
    // `None` for purely in-memory databases.
    persistence: Option<Arc<RedbPersistence>>,
    // Per-domain executors layered over the shared stores and tx manager.
    relational: MemRelationalExecutor<DynStore>,
    graph: MemGraphExecutor<DynStore>,
    vector: MemVectorExecutor<DynStore>,
    // Transaction opened by a SQL `BEGIN`, if one is active.
    session_tx: Mutex<Option<TxId>>,
    instance_id: uuid::Uuid,
    plugin: Arc<dyn DatabasePlugin>,
    accountant: Arc<MemoryAccountant>,
    conflict_policies: RwLock<ConflictPolicies>,
    // Commit-event subscriber channels plus delivery counters.
    subscriptions: Mutex<SubscriptionState>,
    // Background pruning thread handle and shutdown flag.
    pruning_runtime: Mutex<PruningRuntime>,
    pruning_guard: Arc<Mutex<()>>,
    // 0 encodes "no limit" for both disk values (see `build_db`).
    disk_limit: AtomicU64,
    disk_limit_startup_ceiling: AtomicU64,
    sync_watermark: Arc<AtomicU64>,
    closed: AtomicBool,
}
84
85impl std::fmt::Debug for Database {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("Database")
88 .field("instance_id", &self.instance_id)
89 .finish_non_exhaustive()
90 }
91}
92
/// One pending row transition in a state-propagation BFS.
#[derive(Debug, Clone)]
struct PropagationQueueEntry {
    // Table of the row to transition.
    table: String,
    // Value of the row's `id` column.
    uuid: uuid::Uuid,
    // State the row should be moved to.
    target_state: String,
    // Propagation depth at which this entry was enqueued.
    depth: u32,
    // Whether a failed transition should abort the whole cascade.
    abort_on_failure: bool,
}
101
/// A row whose just-reached state may trigger further propagation.
#[derive(Debug, Clone, Copy)]
struct PropagationSource<'a> {
    table: &'a str,
    uuid: uuid::Uuid,
    // State the row currently holds / just transitioned to.
    state: &'a str,
    // Depth already travelled from the propagation root (root = 0).
    depth: u32,
}
109
/// Immutable per-cascade context shared by the propagation helpers.
#[derive(Debug, Clone, Copy)]
struct PropagationContext<'a> {
    // Transaction the cascade's reads and writes run in.
    tx: TxId,
    // Snapshot used for visibility checks during the cascade.
    snapshot: SnapshotId,
    // Table metadata cloned once at the start of the cascade.
    metas: &'a HashMap<String, TableMeta>,
}
116
/// Commit-event subscriber channels plus delivery statistics.
#[derive(Debug)]
struct SubscriptionState {
    // Bounded senders, one per live subscriber.
    subscribers: Vec<SyncSender<CommitEvent>>,
    // Events successfully handed to a subscriber channel.
    events_sent: u64,
    // Events dropped because a subscriber's channel was full.
    events_dropped: u64,
}
123
124impl SubscriptionState {
125 fn new() -> Self {
126 Self {
127 subscribers: Vec::new(),
128 events_sent: 0,
129 events_dropped: 0,
130 }
131 }
132}
133
/// Handle and shutdown flag for the background pruning thread.
#[derive(Debug)]
struct PruningRuntime {
    // Set to `true` to ask the pruning thread to exit.
    shutdown: Arc<AtomicBool>,
    // Join handle of the running thread, if one was started.
    handle: Option<JoinHandle<()>>,
}
139
140impl PruningRuntime {
141 fn new() -> Self {
142 Self {
143 shutdown: Arc::new(AtomicBool::new(false)),
144 handle: None,
145 }
146 }
147}
148
149impl Database {
    /// Assemble a `Database` from already-constructed shared components.
    ///
    /// `disk_limit` / `disk_limit_startup_ceiling` of `None` are stored as
    /// 0, which the atomics treat as "no limit".
    #[allow(clippy::too_many_arguments)]
    fn build_db(
        tx_mgr: Arc<TxManager<DynStore>>,
        relational: Arc<RelationalStore>,
        graph: Arc<GraphStore>,
        vector_store: Arc<VectorStore>,
        hnsw: Arc<OnceLock<parking_lot::RwLock<Option<HnswIndex>>>>,
        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
        ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
        persistence: Option<Arc<RedbPersistence>>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        disk_limit: Option<u64>,
        disk_limit_startup_ceiling: Option<u64>,
    ) -> Self {
        Self {
            tx_mgr: tx_mgr.clone(),
            relational_store: relational.clone(),
            graph_store: graph.clone(),
            vector_store: vector_store.clone(),
            change_log,
            ddl_log,
            persistence,
            // Executors share the stores and transaction manager above.
            relational: MemRelationalExecutor::new(relational, tx_mgr.clone()),
            graph: MemGraphExecutor::new(graph, tx_mgr.clone()),
            vector: MemVectorExecutor::new_with_accountant(
                vector_store,
                tx_mgr.clone(),
                hnsw,
                accountant.clone(),
            ),
            session_tx: Mutex::new(None),
            instance_id: uuid::Uuid::new_v4(),
            plugin,
            accountant,
            // Default conflict policy until the caller configures one.
            conflict_policies: RwLock::new(ConflictPolicies::uniform(ConflictPolicy::LatestWins)),
            subscriptions: Mutex::new(SubscriptionState::new()),
            pruning_runtime: Mutex::new(PruningRuntime::new()),
            pruning_guard: Arc::new(Mutex::new(())),
            // 0 encodes "no limit".
            disk_limit: AtomicU64::new(disk_limit.unwrap_or(0)),
            disk_limit_startup_ceiling: AtomicU64::new(disk_limit_startup_ceiling.unwrap_or(0)),
            sync_watermark: Arc::new(AtomicU64::new(0)),
            closed: AtomicBool::new(false),
        }
    }
195
196 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
197 Self::open_with_config(
198 path,
199 Arc::new(CorePlugin),
200 Arc::new(MemoryAccountant::no_limit()),
201 )
202 }
203
204 pub fn open_memory() -> Self {
205 Self::open_memory_with_plugin_and_accountant(
206 Arc::new(CorePlugin),
207 Arc::new(MemoryAccountant::no_limit()),
208 )
209 .expect("failed to open in-memory database")
210 }
211
    /// Load a persisted database from `path` (creating the file if it does
    /// not exist yet) and rebuild all in-memory state from it.
    fn open_loaded(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        mut accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        let persistence = if path.exists() {
            Arc::new(RedbPersistence::open(path)?)
        } else {
            Arc::new(RedbPersistence::create(path)?)
        };
        // A memory limit persisted in the file applies only when the caller
        // did not already configure one.
        if accountant.usage().limit.is_none()
            && let Some(limit) = persistence.load_config_value::<usize>("memory_limit")?
        {
            accountant = Arc::new(MemoryAccountant::with_budget(limit));
        }
        // Effective disk limit is the tighter of the persisted value and the
        // startup ceiling when both exist.
        let persisted_disk_limit = persistence.load_config_value::<u64>("disk_limit")?;
        let startup_disk_ceiling = startup_disk_limit;
        let effective_disk_limit = match (persisted_disk_limit, startup_disk_limit) {
            (Some(persisted), Some(ceiling)) => Some(persisted.min(ceiling)),
            (Some(persisted), None) => Some(persisted),
            (None, Some(ceiling)) => Some(ceiling),
            (None, None) => None,
        };

        let all_meta = persistence.load_all_table_meta()?;

        // Rebuild relational tables and their rows.
        let relational = Arc::new(RelationalStore::new());
        for (name, meta) in &all_meta {
            relational.create_table(name, meta.clone());
            for row in persistence.load_relational_table(name)? {
                relational.insert_loaded_row(name, row);
            }
        }

        // Rebuild the graph adjacency from the persisted forward edges.
        let graph = Arc::new(GraphStore::new());
        for edge in persistence.load_forward_edges()? {
            graph.insert_loaded_edge(edge);
        }

        // Fix the vector dimension from the first vector column found in any
        // table meta, then reload stored vectors.
        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        for meta in all_meta.values() {
            for column in &meta.columns {
                if let ColumnType::Vector(dimension) = column.column_type {
                    vector.set_dimension(dimension);
                    break;
                }
            }
        }
        for entry in persistence.load_vectors()? {
            vector.insert_loaded_vector(entry);
        }

        // Resume row-id / tx / lsn counters one past the highest persisted
        // values so new activity never collides with loaded state.
        let max_row_id = relational.max_row_id();
        let max_tx = max_tx_across_all(&relational, &graph, &vector);
        let max_lsn = max_lsn_across_all(&relational, &graph, &vector);
        relational.set_next_row_id(max_row_id.saturating_add(1));

        let change_log = Arc::new(RwLock::new(persistence.load_change_log()?));
        let ddl_log = Arc::new(RwLock::new(persistence.load_ddl_log()?));
        let composite = CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        );
        // Wrap the composite store so every committed write set is also
        // written through to redb.
        let persistent = PersistentCompositeStore::new(composite, persistence.clone());
        let store: DynStore = Box::new(persistent);
        let tx_mgr = Arc::new(TxManager::new_with_counters(
            store,
            max_tx.saturating_add(1),
            max_lsn.saturating_add(1),
            max_tx,
        ));

        let db = Self::build_db(
            tx_mgr,
            relational,
            graph,
            vector,
            hnsw,
            change_log,
            ddl_log,
            Some(persistence),
            plugin,
            accountant,
            effective_disk_limit,
            startup_disk_ceiling,
        );

        // Re-register DAG edge-type constraints recorded in table meta.
        for meta in all_meta.values() {
            if !meta.dag_edge_types.is_empty() {
                db.graph.register_dag_edge_types(&meta.dag_edge_types);
            }
        }

        // Charge the loaded data to the memory accountant, then build the
        // HNSW index up front if worthwhile.
        db.account_loaded_state()?;
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());

        Ok(db)
    }
316
    /// Build a purely in-memory database (no persistence layer).
    fn open_memory_internal(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        let relational = Arc::new(RelationalStore::new());
        let graph = Arc::new(GraphStore::new());
        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        let change_log = Arc::new(RwLock::new(Vec::new()));
        let ddl_log = Arc::new(RwLock::new(Vec::new()));
        // No persistence wrapper: committed write sets only mutate memory.
        let store: DynStore = Box::new(CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        ));
        let tx_mgr = Arc::new(TxManager::new(store));

        let db = Self::build_db(
            tx_mgr, relational, graph, vector, hnsw, change_log, ddl_log, None, plugin, accountant,
            None, None,
        );
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());
        Ok(db)
    }
343
344 pub fn begin(&self) -> TxId {
345 self.tx_mgr.begin()
346 }
347
348 pub fn commit(&self, tx: TxId) -> Result<()> {
349 self.commit_with_source(tx, CommitSource::User)
350 }
351
352 pub fn rollback(&self, tx: TxId) -> Result<()> {
353 let ws = self.tx_mgr.cloned_write_set(tx)?;
354 self.release_insert_allocations(&ws);
355 self.tx_mgr.rollback(tx)
356 }
357
358 pub fn snapshot(&self) -> SnapshotId {
359 self.tx_mgr.snapshot()
360 }
361
    /// Parse and execute a single SQL statement.
    ///
    /// `BEGIN`/`COMMIT`/`ROLLBACK` manipulate the session transaction held
    /// in `session_tx`; any other statement runs inside that session
    /// transaction when one is open, or auto-commits otherwise.
    pub fn execute(&self, sql: &str, params: &HashMap<String, Value>) -> Result<QueryResult> {
        let stmt = contextdb_parser::parse(sql)?;

        match &stmt {
            Statement::Begin => {
                let mut session = self.session_tx.lock();
                // A nested BEGIN is a no-op: the open transaction stays.
                if session.is_none() {
                    *session = Some(self.begin());
                }
                return Ok(QueryResult::empty());
            }
            Statement::Commit => {
                let mut session = self.session_tx.lock();
                // COMMIT with no open transaction is a no-op. The session
                // lock stays held while committing.
                if let Some(tx) = session.take() {
                    self.commit_with_source(tx, CommitSource::User)?;
                }
                return Ok(QueryResult::empty());
            }
            Statement::Rollback => {
                let mut session = self.session_tx.lock();
                if let Some(tx) = *session {
                    // Cleared only after a successful rollback; on error the
                    // session transaction remains recorded.
                    self.rollback(tx)?;
                    *session = None;
                }
                return Ok(QueryResult::empty());
            }
            _ => {}
        }

        // Copy the session tx out so the lock is not held while executing.
        let active_tx = *self.session_tx.lock();
        self.execute_statement(&stmt, sql, params, active_tx)
    }
394
    /// Execute `plan` outside any session transaction.
    ///
    /// DML plans get a fresh transaction that is committed on success and
    /// rolled back on failure; all other plans run without a transaction.
    fn execute_autocommit(
        &self,
        plan: &PhysicalPlan,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        match plan {
            PhysicalPlan::Insert(_) | PhysicalPlan::Delete(_) | PhysicalPlan::Update(_) => {
                let tx = self.begin();
                let result = execute_plan(self, plan, params, Some(tx));
                match result {
                    Ok(qr) => {
                        self.commit_with_source(tx, CommitSource::AutoCommit)?;
                        Ok(qr)
                    }
                    Err(e) => {
                        // Best-effort rollback; the original error wins.
                        let _ = self.rollback(tx);
                        Err(e)
                    }
                }
            }
            _ => execute_plan(self, plan, params, None),
        }
    }
418
    /// Plan `sql` without executing it and return a human-readable plan.
    ///
    /// The plan text is post-processed so vector searches advertise the
    /// strategy that would actually run (HNSW vs. brute force).
    pub fn explain(&self, sql: &str) -> Result<String> {
        let stmt = contextdb_parser::parse(sql)?;
        let plan = contextdb_planner::plan(&stmt)?;
        // Opportunistically build the HNSW index once enough vectors exist
        // so the reported strategy matches what execution would use.
        if self.vector_store.vector_count() >= 1000 && !self.vector_store.has_hnsw_index() {
            maybe_prebuild_hnsw(&self.vector_store, self.accountant());
        }
        let mut output = plan.explain();
        if self.vector_store.has_hnsw_index() {
            output = output.replace("VectorSearch(", "HNSWSearch(");
            output = output.replace("VectorSearch {", "HNSWSearch {");
        } else {
            output = output.replace("VectorSearch(", "VectorSearch(strategy=BruteForce, ");
            output = output.replace("VectorSearch {", "VectorSearch { strategy: BruteForce,");
        }
        Ok(output)
    }
435
436 pub fn execute_in_tx(
437 &self,
438 tx: TxId,
439 sql: &str,
440 params: &HashMap<String, Value>,
441 ) -> Result<QueryResult> {
442 let stmt = contextdb_parser::parse(sql)?;
443 self.execute_statement(&stmt, sql, params, Some(tx))
444 }
445
    /// Commit `tx`, running plugin hooks and publishing a commit event.
    ///
    /// Ordering is the contract here: `pre_commit` may veto the commit (the
    /// transaction is then rolled back), while `post_commit` and event
    /// publication happen only after the commit has been applied and the
    /// write set stamped with its LSN. Empty transactions skip the plugin
    /// round-trips entirely.
    fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
        let mut ws = self.tx_mgr.cloned_write_set(tx)?;

        if !ws.is_empty()
            && let Err(err) = self.plugin.pre_commit(&ws, source)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        let lsn = self.tx_mgr.commit_with_lsn(tx)?;

        if !ws.is_empty() {
            // Deleted rows no longer count against the memory budget.
            self.release_delete_allocations(&ws);
            ws.stamp_lsn(lsn);
            self.plugin.post_commit(&ws, source);
            self.publish_commit_event(Self::build_commit_event(&ws, source, lsn));
        }

        Ok(())
    }
467
468 fn build_commit_event(
469 ws: &contextdb_tx::WriteSet,
470 source: CommitSource,
471 lsn: u64,
472 ) -> CommitEvent {
473 let mut tables_changed: Vec<String> = ws
474 .relational_inserts
475 .iter()
476 .map(|(table, _)| table.clone())
477 .chain(
478 ws.relational_deletes
479 .iter()
480 .map(|(table, _, _)| table.clone()),
481 )
482 .collect::<HashSet<_>>()
483 .into_iter()
484 .collect();
485 tables_changed.sort();
486
487 CommitEvent {
488 source,
489 lsn,
490 tables_changed,
491 row_count: ws.relational_inserts.len()
492 + ws.relational_deletes.len()
493 + ws.adj_inserts.len()
494 + ws.adj_deletes.len()
495 + ws.vector_inserts.len()
496 + ws.vector_deletes.len(),
497 }
498 }
499
    /// Fan `event` out to all subscribers without blocking.
    ///
    /// The subscriber list is `mem::take`n out of the locked state so the
    /// counters can be updated while iterating. `try_send` semantics: a
    /// full channel drops the event (subscriber kept, `events_dropped`
    /// bumped); a disconnected channel drops the subscriber.
    fn publish_commit_event(&self, event: CommitEvent) {
        let mut subscriptions = self.subscriptions.lock();
        let subscribers = std::mem::take(&mut subscriptions.subscribers);
        let mut live_subscribers = Vec::with_capacity(subscribers.len());

        for sender in subscribers {
            match sender.try_send(event.clone()) {
                Ok(()) => {
                    subscriptions.events_sent += 1;
                    live_subscribers.push(sender);
                }
                Err(TrySendError::Full(_)) => {
                    subscriptions.events_dropped += 1;
                    live_subscribers.push(sender);
                }
                Err(TrySendError::Disconnected(_)) => {}
            }
        }

        subscriptions.subscribers = live_subscribers;
    }
521
    /// Signal the background pruning thread to stop and wait for it.
    ///
    /// The shutdown flag is swapped for a fresh, unset `Arc` so a later
    /// restart begins with a clean flag (the running thread still holds the
    /// old one). The runtime lock is released before `join` so the join
    /// does not block other users of `pruning_runtime`.
    fn stop_pruning_thread(&self) {
        let handle = {
            let mut runtime = self.pruning_runtime.lock();
            runtime.shutdown.store(true, Ordering::SeqCst);
            let handle = runtime.handle.take();
            runtime.shutdown = Arc::new(AtomicBool::new(false));
            handle
        };

        if let Some(handle) = handle {
            let _ = handle.join();
        }
    }
535
    /// Plan and execute an already-parsed statement.
    ///
    /// Runs the plugin query/DDL hooks, special-cases inserts into the
    /// virtual GRAPH/__edges table, and reports timing plus outcome to the
    /// plugin afterwards. The internal row-id column is stripped from the
    /// result before it is returned.
    fn execute_statement(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        self.plugin.on_query(sql)?;

        // DDL statements give the plugin a chance to veto before planning.
        if let Some(change) = self.ddl_change_for_statement(stmt).as_ref() {
            self.plugin.on_ddl(change)?;
        }

        // INSERT INTO GRAPH/__edges bypasses the planner entirely.
        if let Statement::Insert(ins) = stmt
            && (ins.table.eq_ignore_ascii_case("GRAPH")
                || ins.table.eq_ignore_ascii_case("__edges"))
        {
            return self.execute_graph_insert(ins, params, tx);
        }

        let started = Instant::now();
        // Immediately-invoked closure so the timing/outcome reporting below
        // covers every exit path of the plan-validate-execute pipeline.
        let result = (|| {
            let stmt = self.pre_resolve_cte_subqueries(stmt, params, tx)?;
            let plan = contextdb_planner::plan(&stmt)?;
            validate_dml(&plan, self, params)?;
            let result = match tx {
                Some(tx) => execute_plan(self, &plan, params, Some(tx)),
                None => self.execute_autocommit(&plan, params),
            };
            // Register DAG edge types only once CREATE TABLE succeeded.
            if result.is_ok()
                && let Statement::CreateTable(ct) = &stmt
                && !ct.dag_edge_types.is_empty()
            {
                self.graph.register_dag_edge_types(&ct.dag_edge_types);
            }
            result
        })();
        let duration = started.elapsed();
        let outcome = query_outcome_from_result(&result);
        self.plugin.post_query(sql, duration, &outcome);
        result.map(strip_internal_row_id)
    }
580
581 fn execute_graph_insert(
583 &self,
584 ins: &contextdb_parser::ast::Insert,
585 params: &HashMap<String, Value>,
586 tx: Option<TxId>,
587 ) -> Result<QueryResult> {
588 use crate::executor::resolve_expr;
589
590 let col_index = |name: &str| {
591 ins.columns
592 .iter()
593 .position(|c| c.eq_ignore_ascii_case(name))
594 };
595 let source_idx = col_index("source_id")
596 .ok_or_else(|| Error::PlanError("GRAPH INSERT requires source_id column".into()))?;
597 let target_idx = col_index("target_id")
598 .ok_or_else(|| Error::PlanError("GRAPH INSERT requires target_id column".into()))?;
599 let edge_type_idx = col_index("edge_type")
600 .ok_or_else(|| Error::PlanError("GRAPH INSERT requires edge_type column".into()))?;
601
602 let auto_commit = tx.is_none();
603 let tx = tx.unwrap_or_else(|| self.begin());
604 let mut count = 0u64;
605 for row_exprs in &ins.values {
606 let source = resolve_expr(&row_exprs[source_idx], params)?;
607 let target = resolve_expr(&row_exprs[target_idx], params)?;
608 let edge_type = resolve_expr(&row_exprs[edge_type_idx], params)?;
609
610 let source_uuid = match &source {
611 Value::Uuid(u) => *u,
612 Value::Text(t) => uuid::Uuid::parse_str(t)
613 .map_err(|e| Error::PlanError(format!("invalid source_id uuid: {e}")))?,
614 _ => return Err(Error::PlanError("source_id must be UUID".into())),
615 };
616 let target_uuid = match &target {
617 Value::Uuid(u) => *u,
618 Value::Text(t) => uuid::Uuid::parse_str(t)
619 .map_err(|e| Error::PlanError(format!("invalid target_id uuid: {e}")))?,
620 _ => return Err(Error::PlanError("target_id must be UUID".into())),
621 };
622 let edge_type_str = match &edge_type {
623 Value::Text(t) => t.clone(),
624 _ => return Err(Error::PlanError("edge_type must be TEXT".into())),
625 };
626
627 self.insert_edge(
628 tx,
629 source_uuid,
630 target_uuid,
631 edge_type_str,
632 Default::default(),
633 )?;
634 count += 1;
635 }
636
637 if auto_commit {
638 self.commit_with_source(tx, CommitSource::AutoCommit)?;
639 }
640
641 Ok(QueryResult::empty_with_affected(count))
642 }
643
    /// Translate a DDL statement into the `DdlChange` recorded in the DDL
    /// log, or `None` for non-DDL statements.
    ///
    /// For ALTER TABLE the current table meta is cloned and the alteration
    /// is applied to the clone so the emitted change describes the
    /// *resulting* schema; the live meta itself is not modified here.
    fn ddl_change_for_statement(&self, stmt: &Statement) -> Option<DdlChange> {
        match stmt {
            Statement::CreateTable(ct) => Some(ddl_change_from_create_table(ct)),
            Statement::DropTable(dt) => Some(DdlChange::DropTable {
                name: dt.name.clone(),
            }),
            Statement::AlterTable(at) => {
                let mut meta = self.table_meta(&at.table).unwrap_or_default();
                match &at.action {
                    AlterAction::AddColumn(col) => {
                        meta.columns.push(contextdb_core::ColumnDef {
                            name: col.name.clone(),
                            column_type: crate::executor::map_column_type(&col.data_type),
                            nullable: col.nullable,
                            primary_key: col.primary_key,
                            unique: col.unique,
                            default: col
                                .default
                                .as_ref()
                                .map(crate::executor::stored_default_expr),
                            expires: col.expires,
                        });
                        // An EXPIRES column becomes the table's expiry column.
                        if col.expires {
                            meta.expires_column = Some(col.name.clone());
                        }
                    }
                    AlterAction::DropColumn(name) => {
                        meta.columns.retain(|c| c.name != *name);
                        // Dropping the expiry column clears the expiry setting.
                        if meta.expires_column.as_deref() == Some(name.as_str()) {
                            meta.expires_column = None;
                        }
                    }
                    AlterAction::RenameColumn { from, to } => {
                        if let Some(c) = meta.columns.iter_mut().find(|c| c.name == *from) {
                            c.name = to.clone();
                        }
                        // Keep the expiry-column pointer in sync with renames.
                        if meta.expires_column.as_deref() == Some(from.as_str()) {
                            meta.expires_column = Some(to.clone());
                        }
                    }
                    AlterAction::SetRetain {
                        duration_seconds,
                        sync_safe,
                    } => {
                        meta.default_ttl_seconds = Some(*duration_seconds);
                        meta.sync_safe = *sync_safe;
                    }
                    AlterAction::DropRetain => {
                        meta.default_ttl_seconds = None;
                        meta.sync_safe = false;
                    }
                    // Conflict-policy changes do not alter the schema shape,
                    // so they contribute nothing to the change payload.
                    AlterAction::SetSyncConflictPolicy(_) | AlterAction::DropSyncConflictPolicy => { }
                }
                Some(DdlChange::AlterTable {
                    name: at.table.clone(),
                    columns: meta
                        .columns
                        .iter()
                        .map(|c| {
                            (
                                c.name.clone(),
                                sql_type_for_meta_column(c, &meta.propagation_rules),
                            )
                        })
                        .collect(),
                    constraints: create_table_constraints_from_meta(&meta),
                })
            }
            _ => None,
        }
    }
717
    /// For a SELECT with CTEs, resolve IN-subqueries inside its WHERE
    /// clause ahead of planning; all other statements are returned
    /// unchanged (as a clone).
    fn pre_resolve_cte_subqueries(
        &self,
        stmt: &Statement,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<Statement> {
        if let Statement::Select(sel) = stmt
            && !sel.ctes.is_empty()
            && sel.body.where_clause.is_some()
        {
            use crate::executor::resolve_in_subqueries_with_ctes;
            let resolved_where = sel
                .body
                .where_clause
                .as_ref()
                .map(|expr| resolve_in_subqueries_with_ctes(self, expr, params, tx, &sel.ctes))
                .transpose()?;
            // Rebuild the SELECT with the rewritten WHERE clause; CTE list
            // is carried over unchanged.
            let mut new_body = sel.body.clone();
            new_body.where_clause = resolved_where;
            Ok(Statement::Select(contextdb_parser::ast::SelectStatement {
                ctes: sel.ctes.clone(),
                body: new_body,
            }))
        } else {
            Ok(stmt.clone())
        }
    }
747
748 pub fn insert_row(
749 &self,
750 tx: TxId,
751 table: &str,
752 values: HashMap<ColName, Value>,
753 ) -> Result<RowId> {
754 self.relational.insert(tx, table, values)
755 }
756
    /// Insert-or-update a row keyed on `conflict_col`, then run state
    /// propagation when the upsert *updated* an existing row and supplied a
    /// state-machine column value.
    ///
    /// The row uuid and prospective state are captured before `values` is
    /// moved into the upsert. Fresh inserts and no-ops never cascade.
    pub fn upsert_row(
        &self,
        tx: TxId,
        table: &str,
        conflict_col: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<UpsertResult> {
        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
        let meta = self.table_meta(table);
        // Value of the table's state-machine column, if both are present.
        let new_state = meta
            .as_ref()
            .and_then(|m| m.state_machine.as_ref())
            .and_then(|sm| values.get(&sm.column))
            .and_then(Value::as_text)
            .map(std::borrow::ToOwned::to_owned);

        let result = self
            .relational
            .upsert(tx, table, conflict_col, values, self.snapshot())?;

        if let (Some(uuid), Some(state), Some(_meta)) =
            (row_uuid, new_state.as_deref(), meta.as_ref())
            && matches!(result, UpsertResult::Updated)
        {
            self.propagate_state_change_if_needed(tx, table, Some(uuid), Some(state))?;
        }

        Ok(result)
    }
786
    /// Kick off state propagation for `row_uuid` unless this transaction is
    /// already inside a propagation cascade.
    ///
    /// The `propagation_in_progress` flag on the write set stops the
    /// upserts performed *during* propagation from recursively
    /// re-triggering it. The flag is cleared before the propagation result
    /// is inspected, so a failed cascade cannot leave it stuck.
    pub(crate) fn propagate_state_change_if_needed(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: Option<uuid::Uuid>,
        new_state: Option<&str>,
    ) -> Result<()> {
        if let (Some(uuid), Some(state)) = (row_uuid, new_state) {
            let already_propagating = self
                .tx_mgr
                .with_write_set(tx, |ws| ws.propagation_in_progress)?;
            if !already_propagating {
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = true)?;
                let propagate_result = self.propagate(tx, table, uuid, state);
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = false)?;
                propagate_result?;
            }
        }

        Ok(())
    }
810
    /// Breadth-first state propagation from one row's state change.
    ///
    /// Seeds the queue from the root row's FK children, edge children, and
    /// vector-exclusion rules, then applies the target state to each queued
    /// row and enqueues *its* children in turn. Invalid transitions and
    /// missing state machines are skipped with a warning unless the
    /// triggering rule set `abort_on_failure`, in which case the first such
    /// violation is returned — but only after the traversal drains.
    fn propagate(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: uuid::Uuid,
        new_state: &str,
    ) -> Result<()> {
        let snapshot = self.snapshot();
        // Clone the meta map once so the traversal sees a stable schema.
        let metas = self.relational_store().table_meta.read().clone();
        let mut queue: VecDeque<PropagationQueueEntry> = VecDeque::new();
        // (table, uuid) pairs already processed; breaks cycles.
        let mut visited: HashSet<(String, uuid::Uuid)> = HashSet::new();
        // First abort-worthy violation, reported after the BFS finishes.
        let mut abort_violation: Option<Error> = None;
        let ctx = PropagationContext {
            tx,
            snapshot,
            metas: &metas,
        };
        let root = PropagationSource {
            table,
            uuid: row_uuid,
            state: new_state,
            depth: 0,
        };

        self.enqueue_fk_children(&ctx, &mut queue, root);
        self.enqueue_edge_children(&ctx, &mut queue, root)?;
        self.apply_vector_exclusions(&ctx, root)?;

        while let Some(entry) = queue.pop_front() {
            if !visited.insert((entry.table.clone(), entry.uuid)) {
                continue;
            }

            let Some(meta) = metas.get(&entry.table) else {
                continue;
            };

            // A propagation target without a state machine cannot accept the
            // state: warn, optionally record an abort, and move on.
            let Some(state_machine) = &meta.state_machine else {
                let msg = format!(
                    "warning: propagation target table {} has no state machine",
                    entry.table
                );
                eprintln!("{msg}");
                if entry.abort_on_failure && abort_violation.is_none() {
                    abort_violation = Some(Error::PropagationAborted {
                        table: entry.table.clone(),
                        column: String::new(),
                        from: String::new(),
                        to: entry.target_state.clone(),
                    });
                }
                continue;
            };

            let state_column = state_machine.column.clone();
            // The row may have disappeared since it was enqueued.
            let Some(existing) = self.relational.point_lookup_with_tx(
                Some(tx),
                &entry.table,
                "id",
                &Value::Uuid(entry.uuid),
                snapshot,
            )?
            else {
                continue;
            };

            let from_state = existing
                .values
                .get(&state_column)
                .and_then(Value::as_text)
                .unwrap_or("")
                .to_string();

            // Rewrite only the state column; everything else is carried over.
            let mut next_values = existing.values.clone();
            next_values.insert(
                state_column.clone(),
                Value::Text(entry.target_state.clone()),
            );

            let upsert_outcome =
                self.relational
                    .upsert(tx, &entry.table, "id", next_values, snapshot);

            let reached_state = match upsert_outcome {
                Ok(UpsertResult::Updated) => entry.target_state.as_str(),
                // No change (or an unexpected insert): nothing to cascade.
                Ok(UpsertResult::NoOp) | Ok(UpsertResult::Inserted) => continue,
                // State machine rejected the transition: warn / maybe
                // record an abort, but keep draining the queue.
                Err(Error::InvalidStateTransition(_)) => {
                    eprintln!(
                        "warning: skipped invalid propagated transition {}.{} {} -> {}",
                        entry.table, state_column, from_state, entry.target_state
                    );
                    if entry.abort_on_failure && abort_violation.is_none() {
                        abort_violation = Some(Error::PropagationAborted {
                            table: entry.table.clone(),
                            column: state_column.clone(),
                            from: from_state,
                            to: entry.target_state.clone(),
                        });
                    }
                    continue;
                }
                Err(err) => return Err(err),
            };

            // The row reached a new state: cascade to its own children.
            self.enqueue_edge_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;
            self.apply_vector_exclusions(
                &ctx,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;

            self.enqueue_fk_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            );
        }

        if let Some(err) = abort_violation {
            return Err(err);
        }

        Ok(())
    }
953
    /// Enqueue rows in other tables whose FK propagation rules reference
    /// `source.table` and whose trigger state matches `source.state`.
    ///
    /// Scan failures are logged and skipped rather than aborting the whole
    /// cascade. Depth limiting happens here: children beyond the rule's
    /// `max_depth` are never enqueued.
    fn enqueue_fk_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) {
        // Every table's rules are inspected because FK rules live on the
        // *owning* (child) table, not on the referenced table.
        for (owner_table, owner_meta) in ctx.metas {
            for rule in &owner_meta.propagation_rules {
                let PropagationRule::ForeignKey {
                    fk_column,
                    referenced_table,
                    trigger_state,
                    target_state,
                    max_depth,
                    abort_on_failure,
                    ..
                } = rule
                else {
                    continue;
                };

                if referenced_table != source.table || trigger_state != source.state {
                    continue;
                }

                if source.depth >= *max_depth {
                    continue;
                }

                // All rows (including ones pending in this tx) whose FK
                // column points at the source row.
                let rows = match self.relational.scan_filter_with_tx(
                    Some(ctx.tx),
                    owner_table,
                    ctx.snapshot,
                    &|row| row.values.get(fk_column) == Some(&Value::Uuid(source.uuid)),
                ) {
                    Ok(rows) => rows,
                    Err(err) => {
                        eprintln!(
                            "warning: propagation scan failed for {owner_table}.{fk_column}: {err}"
                        );
                        continue;
                    }
                };

                for row in rows {
                    if let Some(id) = row.values.get("id").and_then(Value::as_uuid).copied() {
                        queue.push_back(PropagationQueueEntry {
                            table: owner_table.clone(),
                            uuid: id,
                            target_state: target_state.clone(),
                            depth: source.depth + 1,
                            abort_on_failure: *abort_on_failure,
                        });
                    }
                }
            }
        }
    }
1012
    /// Enqueue same-table rows reachable from `source` via one graph edge
    /// whose edge rule matches the source's state.
    ///
    /// Only direct neighbors (BFS depth 1) are enqueued per step; deeper
    /// reach comes from re-enqueuing as each child transitions in turn.
    /// Neighbors without a matching row in `source.table` are ignored.
    fn enqueue_edge_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } = rule
            else {
                continue;
            };

            if trigger_state != source.state || source.depth >= *max_depth {
                continue;
            }

            // One hop only; the BFS loop in `propagate` handles fan-out.
            let bfs = self.query_bfs(
                source.uuid,
                Some(std::slice::from_ref(edge_type)),
                *direction,
                1,
                ctx.snapshot,
            )?;

            for node in bfs.nodes {
                // Only enqueue neighbors that are actual rows of this table.
                if self
                    .relational
                    .point_lookup_with_tx(
                        Some(ctx.tx),
                        source.table,
                        "id",
                        &Value::Uuid(node.id),
                        ctx.snapshot,
                    )?
                    .is_some()
                {
                    queue.push_back(PropagationQueueEntry {
                        table: source.table.to_string(),
                        uuid: node.id,
                        target_state: target_state.clone(),
                        depth: source.depth + 1,
                        abort_on_failure: *abort_on_failure,
                    });
                }
            }
        }

        Ok(())
    }
1073
    /// Delete the vectors belonging to `source`'s row when a
    /// `VectorExclusion` rule matches the state it just reached.
    fn apply_vector_exclusions(
        &self,
        ctx: &PropagationContext<'_>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::VectorExclusion { trigger_state } = rule else {
                continue;
            };
            if trigger_state != source.state {
                continue;
            }
            // A uuid can map to several physical row ids; drop vectors for
            // all of them.
            for row_id in self.logical_row_ids_for_uuid(ctx.tx, source.table, source.uuid) {
                self.delete_vector(ctx.tx, row_id)?;
            }
        }

        Ok(())
    }
1097
1098 pub fn delete_row(&self, tx: TxId, table: &str, row_id: RowId) -> Result<()> {
1099 self.relational.delete(tx, table, row_id)
1100 }
1101
1102 pub fn scan(&self, table: &str, snapshot: SnapshotId) -> Result<Vec<VersionedRow>> {
1103 self.relational.scan(table, snapshot)
1104 }
1105
1106 pub fn scan_filter(
1107 &self,
1108 table: &str,
1109 snapshot: SnapshotId,
1110 predicate: &dyn Fn(&VersionedRow) -> bool,
1111 ) -> Result<Vec<VersionedRow>> {
1112 self.relational.scan_filter(table, snapshot, predicate)
1113 }
1114
1115 pub fn point_lookup(
1116 &self,
1117 table: &str,
1118 col: &str,
1119 value: &Value,
1120 snapshot: SnapshotId,
1121 ) -> Result<Option<VersionedRow>> {
1122 self.relational.point_lookup(table, col, value, snapshot)
1123 }
1124
1125 pub(crate) fn point_lookup_in_tx(
1126 &self,
1127 tx: TxId,
1128 table: &str,
1129 col: &str,
1130 value: &Value,
1131 snapshot: SnapshotId,
1132 ) -> Result<Option<VersionedRow>> {
1133 self.relational
1134 .point_lookup_with_tx(Some(tx), table, col, value, snapshot)
1135 }
1136
    /// All physical row ids in `table` whose `id` column equals `uuid`,
    /// including rows pending (uncommitted) in `tx`.
    ///
    /// The committed side scans raw table storage directly — every stored
    /// version, without snapshot filtering. Duplicates are collapsed via
    /// the intermediate set.
    pub(crate) fn logical_row_ids_for_uuid(
        &self,
        tx: TxId,
        table: &str,
        uuid: uuid::Uuid,
    ) -> Vec<RowId> {
        let mut row_ids = HashSet::new();

        if let Some(rows) = self.relational_store.tables.read().get(table) {
            for row in rows {
                if row.values.get("id") == Some(&Value::Uuid(uuid)) {
                    row_ids.insert(row.row_id);
                }
            }
        }

        // Also include inserts buffered in this transaction's write set.
        let _ = self.tx_mgr.with_write_set(tx, |ws| {
            for (insert_table, row) in &ws.relational_inserts {
                if insert_table == table && row.values.get("id") == Some(&Value::Uuid(uuid)) {
                    row_ids.insert(row.row_id);
                }
            }
        });

        row_ids.into_iter().collect()
    }
1163
    /// Insert a graph edge inside `tx`, charging its estimated footprint
    /// to the memory accountant up front.
    ///
    /// Returns `Ok(true)` if a new edge was inserted. The reservation is
    /// released again when the edge already existed (`inserted == false`)
    /// or when the insert fails.
    pub fn insert_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: EdgeType,
        properties: HashMap<String, Value>,
    ) -> Result<bool> {
        let bytes = estimate_edge_bytes(source, target, &edge_type, &properties);
        self.accountant.try_allocate_for(
            bytes,
            "graph_insert",
            "insert_edge",
            "Reduce edge fan-out or raise MEMORY_LIMIT before inserting more graph edges.",
        )?;

        match self
            .graph
            .insert_edge(tx, source, target, edge_type, properties)
        {
            Ok(inserted) => {
                if !inserted {
                    // Duplicate edge: nothing new stored, return the budget.
                    self.accountant.release(bytes);
                }
                Ok(inserted)
            }
            Err(err) => {
                self.accountant.release(bytes);
                Err(err)
            }
        }
    }
1196
    /// Marks the edge `(source)-[edge_type]->(target)` deleted under `tx`.
    pub fn delete_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: &str,
    ) -> Result<()> {
        self.graph.delete_edge(tx, source, target, edge_type)
    }
1206
    /// Runs a breadth-first traversal from `start` at `snapshot`.
    /// The minimum depth is fixed at 1; `max_depth` bounds the expansion.
    pub fn query_bfs(
        &self,
        start: NodeId,
        edge_types: Option<&[EdgeType]>,
        direction: Direction,
        max_depth: u32,
        snapshot: SnapshotId,
    ) -> Result<TraversalResult> {
        self.graph
            .bfs(start, edge_types, direction, 1, max_depth, snapshot)
    }
1218
    /// Counts `edge_type` edges leaving `source` that are visible at `snapshot`.
    /// Always succeeds; the `Result` wrapper keeps the API uniform.
    pub fn edge_count(
        &self,
        source: NodeId,
        edge_type: &str,
        snapshot: SnapshotId,
    ) -> Result<usize> {
        Ok(self.graph.edge_count(source, edge_type, snapshot))
    }
1227
1228 pub fn get_edge_properties(
1229 &self,
1230 source: NodeId,
1231 target: NodeId,
1232 edge_type: &str,
1233 snapshot: SnapshotId,
1234 ) -> Result<Option<HashMap<String, Value>>> {
1235 let props = self
1236 .graph_store
1237 .forward_adj
1238 .read()
1239 .get(&source)
1240 .and_then(|entries| {
1241 entries
1242 .iter()
1243 .rev()
1244 .find(|entry| {
1245 entry.target == target
1246 && entry.edge_type == edge_type
1247 && entry.visible_at(snapshot)
1248 })
1249 .map(|entry| entry.properties.clone())
1250 });
1251 Ok(props)
1252 }
1253
    /// Inserts a vector for `row_id` under `tx`, charging the memory budget
    /// first and releasing the charge again if the store rejects the insert.
    pub fn insert_vector(&self, tx: TxId, row_id: RowId, vector: Vec<f32>) -> Result<()> {
        let bytes = estimate_vector_bytes(&vector);
        self.accountant.try_allocate_for(
            bytes,
            "insert",
            "vector_insert",
            "Reduce vector dimensionality, insert fewer rows, or raise MEMORY_LIMIT.",
        )?;
        self.vector
            .insert_vector(tx, row_id, vector)
            .inspect_err(|_| self.accountant.release(bytes))
    }
1266
    /// Marks the vector for `row_id` deleted under `tx`.
    pub fn delete_vector(&self, tx: TxId, row_id: RowId) -> Result<()> {
        self.vector.delete_vector(tx, row_id)
    }
1270
    /// Returns the `k` nearest neighbours of `query` at `snapshot`, optionally
    /// restricted to the row ids in `candidates`. Each hit is `(row_id, score)`.
    pub fn query_vector(
        &self,
        query: &[f32],
        k: usize,
        candidates: Option<&RoaringTreemap>,
        snapshot: SnapshotId,
    ) -> Result<Vec<(RowId, f32)>> {
        self.vector.search(query, k, candidates, snapshot)
    }
1280
1281 pub fn has_live_vector(&self, row_id: RowId, snapshot: SnapshotId) -> bool {
1282 self.vector_store
1283 .vectors
1284 .read()
1285 .iter()
1286 .any(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
1287 }
1288
1289 pub fn live_vector_entry(&self, row_id: RowId, snapshot: SnapshotId) -> Option<VectorEntry> {
1290 self.vector_store
1291 .vectors
1292 .read()
1293 .iter()
1294 .rev()
1295 .find(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
1296 .cloned()
1297 }
1298
    /// Removes vector and graph state derived from rows of `table` when the
    /// table is dropped: vectors keyed by the table's row ids, and edges whose
    /// (source, edge_type, target) triple was materialized from the table's
    /// `source_id` / `target_id` / `edge_type` columns.
    ///
    /// Locks are taken one store at a time and released before the next to
    /// avoid holding multiple write locks simultaneously.
    pub(crate) fn drop_table_aux_state(&self, table: &str) {
        let snapshot = self.snapshot();
        let rows = self.scan(table, snapshot).unwrap_or_default();
        let row_ids: HashSet<RowId> = rows.iter().map(|row| row.row_id).collect();
        // Edge keys are only derived from rows that carry all three edge columns.
        let edge_keys: HashSet<(NodeId, EdgeType, NodeId)> = rows
            .iter()
            .filter_map(|row| {
                match (
                    row.values.get("source_id").and_then(Value::as_uuid),
                    row.values.get("target_id").and_then(Value::as_uuid),
                    row.values.get("edge_type").and_then(Value::as_text),
                ) {
                    (Some(source), Some(target), Some(edge_type)) => {
                        Some((*source, edge_type.to_string(), *target))
                    }
                    _ => None,
                }
            })
            .collect();

        if !row_ids.is_empty() {
            let mut vectors = self.vector_store.vectors.write();
            vectors.retain(|entry| !row_ids.contains(&entry.row_id));
            // The HNSW index is invalidated wholesale; it is rebuilt lazily.
            self.vector_store.clear_hnsw(self.accountant());
        }

        if !edge_keys.is_empty() {
            {
                let mut forward = self.graph_store.forward_adj.write();
                for entries in forward.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                forward.retain(|_, entries| !entries.is_empty());
            }
            {
                let mut reverse = self.graph_store.reverse_adj.write();
                for entries in reverse.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                reverse.retain(|_, entries| !entries.is_empty());
            }
        }
    }
1346
    /// Lists the names of all tables currently known to the relational store.
    pub fn table_names(&self) -> Vec<String> {
        self.relational_store.table_names()
    }
1350
    /// Returns the schema metadata for `table`, or `None` if it does not exist.
    pub fn table_meta(&self, table: &str) -> Option<TableMeta> {
        self.relational_store.table_meta(table)
    }
1354
    /// Runs one synchronous pruning pass over expired rows/edges/vectors and
    /// returns the number of items pruned. Serialized against the background
    /// pruning thread via `pruning_guard`.
    pub fn run_pruning_cycle(&self) -> u64 {
        let _guard = self.pruning_guard.lock();
        prune_expired_rows(
            &self.relational_store,
            &self.graph_store,
            &self.vector_store,
            self.accountant(),
            self.persistence.as_ref(),
            self.sync_watermark(),
        )
    }
1367
    /// (Re)starts the background pruning thread with the given interval.
    /// Any previously running pruning thread is stopped first; the new
    /// thread's shutdown flag and join handle replace the old runtime state.
    pub fn set_pruning_interval(&self, interval: Duration) {
        self.stop_pruning_thread();

        let shutdown = Arc::new(AtomicBool::new(false));
        // Clone the shared stores so the worker owns its references.
        let relational = self.relational_store.clone();
        let graph = self.graph_store.clone();
        let vector = self.vector_store.clone();
        let accountant = self.accountant.clone();
        let persistence = self.persistence.clone();
        let sync_watermark = self.sync_watermark.clone();
        let pruning_guard = self.pruning_guard.clone();
        let thread_shutdown = shutdown.clone();

        let handle = thread::spawn(move || {
            while !thread_shutdown.load(Ordering::SeqCst) {
                {
                    // Same guard as `run_pruning_cycle`, so manual and
                    // background pruning never run concurrently.
                    let _guard = pruning_guard.lock();
                    let _ = prune_expired_rows(
                        &relational,
                        &graph,
                        &vector,
                        accountant.as_ref(),
                        persistence.as_ref(),
                        sync_watermark.load(Ordering::SeqCst),
                    );
                }
                // Interruptible sleep so shutdown is not delayed by `interval`.
                sleep_with_shutdown(&thread_shutdown, interval);
            }
        });

        let mut runtime = self.pruning_runtime.lock();
        runtime.shutdown = shutdown;
        runtime.handle = Some(handle);
    }
1403
    /// Returns the current sync watermark (LSN up to which changes were synced).
    pub fn sync_watermark(&self) -> u64 {
        self.sync_watermark.load(Ordering::SeqCst)
    }
1407
    /// Sets the in-memory sync watermark (not persisted by this call).
    pub fn set_sync_watermark(&self, watermark: u64) {
        self.sync_watermark.store(watermark, Ordering::SeqCst);
    }
1411
    /// Returns this database instance's unique identifier.
    pub fn instance_id(&self) -> uuid::Uuid {
        self.instance_id
    }
1415
    /// Opens an in-memory database with an explicit plugin and memory
    /// accountant. Note: does NOT invoke `plugin.on_open()` — callers that
    /// need the hook use `open_memory_with_plugin`.
    pub fn open_memory_with_plugin_and_accountant(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_memory_internal(plugin, accountant)
    }
1422
    /// Opens an in-memory database with `plugin`, no memory limit, and fires
    /// the plugin's `on_open` hook.
    pub fn open_memory_with_plugin(plugin: Arc<dyn DatabasePlugin>) -> Result<Self> {
        let db = Self::open_memory_with_plugin_and_accountant(
            plugin,
            Arc::new(MemoryAccountant::no_limit()),
        )?;
        db.plugin.on_open()?;
        Ok(db)
    }
1431
    /// Shuts the database down: rolls back any open session transaction,
    /// stops the pruning thread, drops subscribers, closes persistence, and
    /// fires the plugin's `on_close` hook. Idempotent — subsequent calls are
    /// no-ops.
    ///
    /// NOTE(review): if the session-transaction rollback fails, `?` returns
    /// early with `closed` already set, skipping the remaining cleanup steps
    /// (pruning thread, subscribers, persistence, `on_close`) — confirm this
    /// is intended.
    pub fn close(&self) -> Result<()> {
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        let tx = self.session_tx.lock().take();
        if let Some(tx) = tx {
            self.rollback(tx)?;
        }
        self.stop_pruning_thread();
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
        self.plugin.on_close()
    }
1447
    /// Opens a file-backed database at `path` with `plugin`, no memory limit,
    /// and fires the plugin's `on_open` hook.
    pub fn open_with_plugin(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
    ) -> Result<Self> {
        let db = Self::open_loaded(path, plugin, Arc::new(MemoryAccountant::no_limit()), None)?;
        db.plugin.on_open()?;
        Ok(db)
    }
1457
    /// Opens a database at `path` with a plugin and accountant but no startup
    /// disk-limit ceiling; see `open_with_config_and_disk_limit`.
    pub fn open_with_config(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_with_config_and_disk_limit(path, plugin, accountant, None)
    }
1466
    /// Opens a database at `path` (or in memory when `path` is `":memory:"`).
    ///
    /// If the file carries a persisted memory limit and the caller-supplied
    /// accountant is unconstrained, the persisted limit takes precedence;
    /// otherwise the caller's accountant wins. Fires `plugin.on_open()`.
    pub fn open_with_config_and_disk_limit(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        if path.as_os_str() == ":memory:" {
            return Self::open_memory_with_plugin_and_accountant(plugin, accountant);
        }
        let accountant = if let Some(limit) = persisted_memory_limit(path)? {
            let usage = accountant.usage();
            if usage.limit.is_none() && usage.startup_ceiling.is_none() {
                Arc::new(MemoryAccountant::with_budget(limit))
            } else {
                accountant
            }
        } else {
            accountant
        };
        let db = Self::open_loaded(path, plugin, accountant, startup_disk_limit)?;
        db.plugin.on_open()?;
        Ok(db)
    }
1491
    /// Opens an in-memory database with the core plugin and the given
    /// accountant. Panics if the open fails (in-memory open has no I/O and is
    /// expected to be infallible in practice).
    pub fn open_memory_with_accountant(accountant: Arc<MemoryAccountant>) -> Self {
        Self::open_memory_internal(Arc::new(CorePlugin), accountant)
            .expect("failed to open in-memory database with accountant")
    }
1497
    /// Returns the memory accountant tracking this database's budget.
    pub fn accountant(&self) -> &MemoryAccountant {
        &self.accountant
    }
1502
    /// Charges the memory budget for all state loaded from disk at open time:
    /// table metadata, live rows, live edges, and live vectors — in that
    /// order, failing fast on the first category that exceeds the budget.
    fn account_loaded_state(&self) -> Result<()> {
        // Schema metadata.
        let metadata_bytes = self
            .relational_store
            .table_meta
            .read()
            .values()
            .fold(0usize, |acc, meta| {
                acc.saturating_add(meta.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            metadata_bytes,
            "open",
            "load_table_metadata",
            "Open the database with a larger MEMORY_LIMIT or reduce stored schema metadata.",
        )?;

        // Row data, using per-table schema for a tighter estimate when available.
        let row_bytes =
            self.relational_store
                .tables
                .read()
                .iter()
                .fold(0usize, |acc, (table, rows)| {
                    let meta = self.table_meta(table);
                    acc.saturating_add(rows.iter().fold(0usize, |inner, row| {
                        inner.saturating_add(meta.as_ref().map_or_else(
                            || row.estimated_bytes(),
                            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
                        ))
                    }))
                });
        self.accountant.try_allocate_for(
            row_bytes,
            "open",
            "load_rows",
            "Open the database with a larger MEMORY_LIMIT or prune retained rows first.",
        )?;

        // Live (non-deleted) graph edges; the forward adjacency map is the
        // source of truth, so reverse entries are not double-counted.
        let edge_bytes = self
            .graph_store
            .forward_adj
            .read()
            .values()
            .flatten()
            .filter(|edge| edge.deleted_tx.is_none())
            .fold(0usize, |acc, edge| {
                acc.saturating_add(edge.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            edge_bytes,
            "open",
            "load_edges",
            "Open the database with a larger MEMORY_LIMIT or reduce graph edge volume.",
        )?;

        // Live vector entries.
        let vector_bytes = self
            .vector_store
            .vectors
            .read()
            .iter()
            .filter(|entry| entry.deleted_tx.is_none())
            .fold(0usize, |acc, entry| {
                acc.saturating_add(entry.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            vector_bytes,
            "open",
            "load_vectors",
            "Open the database with a larger MEMORY_LIMIT or reduce stored vector data.",
        )?;

        Ok(())
    }
1575
    /// Releases the memory-budget charges taken for every pending insert in a
    /// write set — used when the transaction rolls back, mirroring the
    /// estimates charged at insert time.
    fn release_insert_allocations(&self, ws: &contextdb_tx::WriteSet) {
        for (table, row) in &ws.relational_inserts {
            // Use the same schema-aware estimate as the insert path so the
            // release matches the original charge.
            let bytes = self
                .table_meta(table)
                .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                .unwrap_or_else(|| row.estimated_bytes());
            self.accountant.release(bytes);
        }

        for edge in &ws.adj_inserts {
            self.accountant.release(edge.estimated_bytes());
        }

        for entry in &ws.vector_inserts {
            self.accountant.release(entry.estimated_bytes());
        }
    }
1593
    /// Releases memory-budget charges for items removed by a committed write
    /// set's deletes, looking up the stored item to size the release. Items
    /// no longer findable are silently skipped. Any vector delete also
    /// invalidates the HNSW index so it is rebuilt lazily.
    fn release_delete_allocations(&self, ws: &contextdb_tx::WriteSet) {
        for (table, row_id, _) in &ws.relational_deletes {
            if let Some(row) = self.find_row_by_id(table, *row_id) {
                let bytes = self
                    .table_meta(table)
                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                    .unwrap_or_else(|| row.estimated_bytes());
                self.accountant.release(bytes);
            }
        }

        for (source, edge_type, target, _) in &ws.adj_deletes {
            if let Some(edge) = self.find_edge(source, target, edge_type) {
                self.accountant.release(edge.estimated_bytes());
            }
        }

        for (row_id, _) in &ws.vector_deletes {
            if let Some(vector) = self.find_vector_by_row_id(*row_id) {
                self.accountant.release(vector.estimated_bytes());
            }
        }

        if !ws.vector_deletes.is_empty() {
            self.vector_store.clear_hnsw(self.accountant());
        }
    }
1621
1622 fn find_row_by_id(&self, table: &str, row_id: RowId) -> Option<VersionedRow> {
1623 self.relational_store
1624 .tables
1625 .read()
1626 .get(table)
1627 .and_then(|rows| rows.iter().find(|row| row.row_id == row_id))
1628 .cloned()
1629 }
1630
1631 fn find_vector_by_row_id(&self, row_id: RowId) -> Option<VectorEntry> {
1632 self.vector_store
1633 .vectors
1634 .read()
1635 .iter()
1636 .find(|entry| entry.row_id == row_id)
1637 .cloned()
1638 }
1639
1640 fn find_edge(&self, source: &NodeId, target: &NodeId, edge_type: &str) -> Option<AdjEntry> {
1641 self.graph_store
1642 .forward_adj
1643 .read()
1644 .get(source)
1645 .and_then(|entries| {
1646 entries
1647 .iter()
1648 .find(|entry| entry.target == *target && entry.edge_type == edge_type)
1649 .cloned()
1650 })
1651 }
1652
    /// Captures the current lengths of `tx`'s write-set buffers as a
    /// checkpoint: (relational inserts, relational deletes, adjacency inserts,
    /// vector inserts, vector deletes). Used with
    /// `restore_write_set_checkpoint` to roll back a partial statement.
    pub(crate) fn write_set_checkpoint(
        &self,
        tx: TxId,
    ) -> Result<(usize, usize, usize, usize, usize)> {
        self.tx_mgr.with_write_set(tx, |ws| {
            (
                ws.relational_inserts.len(),
                ws.relational_deletes.len(),
                ws.adj_inserts.len(),
                ws.vector_inserts.len(),
                ws.vector_deletes.len(),
            )
        })
    }
1667
    /// Truncates `tx`'s write-set buffers back to the lengths captured by
    /// `write_set_checkpoint`, discarding everything buffered after it.
    /// Note: only the write set is rolled back; memory-budget charges taken
    /// since the checkpoint are handled by the caller.
    pub(crate) fn restore_write_set_checkpoint(
        &self,
        tx: TxId,
        checkpoint: (usize, usize, usize, usize, usize),
    ) -> Result<()> {
        self.tx_mgr.with_write_set(tx, |ws| {
            ws.relational_inserts.truncate(checkpoint.0);
            ws.relational_deletes.truncate(checkpoint.1);
            ws.adj_inserts.truncate(checkpoint.2);
            ws.vector_inserts.truncate(checkpoint.3);
            ws.vector_deletes.truncate(checkpoint.4);
        })
    }
1681
    /// Returns a snapshot of the current sync conflict policies.
    pub fn conflict_policies(&self) -> ConflictPolicies {
        self.conflict_policies.read().clone()
    }
1686
    /// Sets the fallback conflict policy used when a table has no override.
    pub fn set_default_conflict_policy(&self, policy: ConflictPolicy) {
        self.conflict_policies.write().default = policy;
    }
1691
    /// Sets (or replaces) the conflict policy override for `table`.
    pub fn set_table_conflict_policy(&self, table: &str, policy: ConflictPolicy) {
        self.conflict_policies
            .write()
            .per_table
            .insert(table.to_string(), policy);
    }
1699
    /// Removes the conflict-policy override for `table`, reverting to the default.
    pub fn drop_table_conflict_policy(&self, table: &str) {
        self.conflict_policies.write().per_table.remove(table);
    }
1704
    /// Returns the installed database plugin.
    pub fn plugin(&self) -> &dyn DatabasePlugin {
        self.plugin.as_ref()
    }
1708
    /// Reports the plugin's self-declared health status.
    pub fn plugin_health(&self) -> PluginHealth {
        self.plugin.health()
    }
1712
    /// Returns the plugin's self-description as JSON.
    pub fn plugin_describe(&self) -> serde_json::Value {
        self.plugin.describe()
    }
1716
    /// Crate-internal access to the graph executor.
    pub(crate) fn graph(&self) -> &MemGraphExecutor<DynStore> {
        &self.graph
    }
1720
    /// Crate-internal access to the shared relational store.
    pub(crate) fn relational_store(&self) -> &Arc<RelationalStore> {
        &self.relational_store
    }
1724
    /// Allocates a fresh LSN for a DDL operation and runs `f` with it while
    /// the allocation is held by the transaction manager.
    pub(crate) fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
    where
        F: FnOnce(u64) -> R,
    {
        self.tx_mgr.allocate_ddl_lsn(f)
    }
1731
    /// Runs `f` while holding the global commit lock, so no commit can
    /// interleave with the closure.
    pub(crate) fn with_commit_lock<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        self.tx_mgr.with_commit_lock(f)
    }
1738
    /// Appends a CREATE TABLE entry to the in-memory DDL log at `lsn` and,
    /// when file-backed, to the persisted log (persist errors are ignored).
    pub(crate) fn log_create_table_ddl(&self, name: &str, meta: &TableMeta, lsn: u64) {
        let change = ddl_change_from_meta(name, meta);
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            let _ = persistence.append_ddl_log(lsn, &change);
        }
    }
1746
    /// Appends a DROP TABLE entry to the in-memory DDL log at `lsn` and,
    /// when file-backed, to the persisted log (persist errors are ignored).
    pub(crate) fn log_drop_table_ddl(&self, name: &str, lsn: u64) {
        let change = DdlChange::DropTable {
            name: name.to_string(),
        };
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            let _ = persistence.append_ddl_log(lsn, &change);
        }
    }
1756
    /// Appends an ALTER TABLE entry (full post-alter column list and
    /// constraints) to the DDL log at `lsn`, persisting it when file-backed
    /// (persist errors are ignored).
    pub(crate) fn log_alter_table_ddl(&self, name: &str, meta: &TableMeta, lsn: u64) {
        let change = DdlChange::AlterTable {
            name: name.to_string(),
            columns: meta
                .columns
                .iter()
                .map(|c| {
                    (
                        c.name.clone(),
                        sql_type_for_meta_column(c, &meta.propagation_rules),
                    )
                })
                .collect(),
            constraints: create_table_constraints_from_meta(meta),
        };
        self.ddl_log.write().push((lsn, change.clone()));
        if let Some(persistence) = &self.persistence {
            let _ = persistence.append_ddl_log(lsn, &change);
        }
    }
1777
    /// Flushes `table`'s schema metadata to disk; a no-op for in-memory databases.
    pub(crate) fn persist_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence.flush_table_meta(name, meta)?;
        }
        Ok(())
    }
1784
    /// Persists (or removes, when `None`) the configured memory limit;
    /// a no-op for in-memory databases.
    pub(crate) fn persist_memory_limit(&self, limit: Option<usize>) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            match limit {
                Some(limit) => persistence.flush_config_value("memory_limit", &limit)?,
                None => persistence.remove_config_value("memory_limit")?,
            }
        }
        Ok(())
    }
1794
1795 pub fn set_disk_limit(&self, limit: Option<u64>) -> Result<()> {
1796 if self.persistence.is_none() {
1797 self.disk_limit.store(0, Ordering::SeqCst);
1798 return Ok(());
1799 }
1800
1801 let ceiling = self.disk_limit_startup_ceiling();
1802 if let Some(ceiling) = ceiling {
1803 match limit {
1804 Some(bytes) if bytes > ceiling => {
1805 return Err(Error::Other(format!(
1806 "disk limit {bytes} exceeds startup ceiling {ceiling}"
1807 )));
1808 }
1809 None => {
1810 return Err(Error::Other(
1811 "cannot remove disk limit when a startup ceiling is set".to_string(),
1812 ));
1813 }
1814 _ => {}
1815 }
1816 }
1817
1818 self.disk_limit.store(limit.unwrap_or(0), Ordering::SeqCst);
1819 Ok(())
1820 }
1821
1822 pub fn disk_limit(&self) -> Option<u64> {
1823 match self.disk_limit.load(Ordering::SeqCst) {
1824 0 => None,
1825 bytes => Some(bytes),
1826 }
1827 }
1828
1829 pub fn disk_limit_startup_ceiling(&self) -> Option<u64> {
1830 match self.disk_limit_startup_ceiling.load(Ordering::SeqCst) {
1831 0 => None,
1832 bytes => Some(bytes),
1833 }
1834 }
1835
1836 pub fn disk_file_size(&self) -> Option<u64> {
1837 self.persistence
1838 .as_ref()
1839 .map(|persistence| std::fs::metadata(persistence.path()).map(|meta| meta.len()))
1840 .transpose()
1841 .ok()
1842 .flatten()
1843 }
1844
    /// Persists (or removes, when `None`) the configured disk limit;
    /// a no-op for in-memory databases.
    pub(crate) fn persist_disk_limit(&self, limit: Option<u64>) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            match limit {
                Some(limit) => persistence.flush_config_value("disk_limit", &limit)?,
                None => persistence.remove_config_value("disk_limit")?,
            }
        }
        Ok(())
    }
1854
1855 pub fn check_disk_budget(&self, operation: &str) -> Result<()> {
1856 let Some(limit) = self.disk_limit() else {
1857 return Ok(());
1858 };
1859 let Some(current_bytes) = self.disk_file_size() else {
1860 return Ok(());
1861 };
1862 if current_bytes < limit {
1863 return Ok(());
1864 }
1865 Err(Error::DiskBudgetExceeded {
1866 operation: operation.to_string(),
1867 current_bytes,
1868 budget_limit_bytes: limit,
1869 hint: "Reduce retained file-backed data or raise DISK_LIMIT before writing more data."
1870 .to_string(),
1871 })
1872 }
1873
1874 pub fn persisted_sync_watermarks(&self, tenant_id: &str) -> Result<(u64, u64)> {
1875 let Some(persistence) = &self.persistence else {
1876 return Ok((0, 0));
1877 };
1878 let push = persistence
1879 .load_config_value::<u64>(&format!("sync_push_watermark:{tenant_id}"))?
1880 .unwrap_or(0);
1881 let pull = persistence
1882 .load_config_value::<u64>(&format!("sync_pull_watermark:{tenant_id}"))?
1883 .unwrap_or(0);
1884 Ok((push, pull))
1885 }
1886
    /// Persists the per-tenant push watermark; a no-op for in-memory databases.
    pub fn persist_sync_push_watermark(&self, tenant_id: &str, watermark: u64) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence
                .flush_config_value(&format!("sync_push_watermark:{tenant_id}"), &watermark)?;
        }
        Ok(())
    }
1894
    /// Persists the per-tenant pull watermark; a no-op for in-memory databases.
    pub fn persist_sync_pull_watermark(&self, tenant_id: &str, watermark: u64) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence
                .flush_config_value(&format!("sync_pull_watermark:{tenant_id}"), &watermark)?;
        }
        Ok(())
    }
1902
    /// Rewrites `table`'s full row set on disk from the in-memory store;
    /// a no-op for in-memory databases or unknown tables.
    pub(crate) fn persist_table_rows(&self, name: &str) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            let tables = self.relational_store.tables.read();
            if let Some(rows) = tables.get(name) {
                persistence.rewrite_table_rows(name, rows)?;
            }
        }
        Ok(())
    }
1912
    /// Removes `table`'s metadata and row data from disk;
    /// a no-op for in-memory databases.
    pub(crate) fn remove_persisted_table(&self, name: &str) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence.remove_table_meta(name)?;
            persistence.remove_table_data(name)?;
        }
        Ok(())
    }
1920
    /// Returns all change-log entries with LSN strictly greater than
    /// `since_lsn`. Relies on the log being sorted by LSN (binary search via
    /// `partition_point`).
    pub fn change_log_since(&self, since_lsn: u64) -> Vec<ChangeLogEntry> {
        let log = self.change_log.read();
        let start = log.partition_point(|e| e.lsn() <= since_lsn);
        log[start..].to_vec()
    }
1926
    /// Returns all DDL changes with LSN strictly greater than `since_lsn`.
    /// Relies on the log being sorted by LSN (binary search via `partition_point`).
    pub fn ddl_log_since(&self, since_lsn: u64) -> Vec<DdlChange> {
        let ddl = self.ddl_log.read();
        let start = ddl.partition_point(|(lsn, _)| *lsn <= since_lsn);
        ddl[start..].iter().map(|(_, c)| c.clone()).collect()
    }
1932
    #[allow(dead_code)]
    /// Builds a `ChangeSet` describing the complete live state: all table
    /// schemas as DDL, every live row of tables with a usable natural key,
    /// every live edge, and every live vector whose row is included.
    ///
    /// Tables with neither a declared natural key nor a UUID `id` column are
    /// skipped entirely, as are rows missing their key value.
    fn full_state_snapshot(&self) -> ChangeSet {
        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let mut ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        for (name, meta) in meta_guard.iter() {
            ddl.push(ddl_change_from_meta(name, meta));
        }

        // Row ids kept here gate which vectors are exported below.
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(m) => m,
                None => continue,
            };
            // Fall back to a UUID `id` column when no natural key is declared.
            let key_col = meta.natural_key_column.clone().unwrap_or_else(|| {
                if meta
                    .columns
                    .iter()
                    .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                {
                    "id".to_string()
                } else {
                    String::new()
                }
            });
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|r| r.deleted_tx.is_none()) {
                let key_val = match row.values.get(&key_col) {
                    Some(v) => v.clone(),
                    None => continue,
                };
                live_row_ids.insert(row.row_id);
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }

        drop(tables_guard);
        drop(meta_guard);

        let fwd = self.graph_store.forward_adj.read();
        for (_source, entries) in fwd.iter() {
            for entry in entries.iter().filter(|e| e.deleted_tx.is_none()) {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Export at most one vector per row, and only for rows exported above.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs.iter().filter(|v| v.deleted_tx.is_none()) {
            if !live_row_ids.contains(&entry.row_id) {
                continue; }
            if !seen_vector_rows.insert(entry.row_id) {
                continue; }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2033
    /// State-based fallback for `changes_since` when the in-memory logs do
    /// not cover the requested range: diffs the live state against
    /// `since_lsn`, exporting only items with a newer LSN. Delegates to
    /// `full_state_snapshot` for `since_lsn == 0`.
    ///
    /// Note: `ddl` is intentionally left empty here — the snapshot path
    /// (since_lsn == 0) is the one that re-emits schemas.
    fn persisted_state_since(&self, since_lsn: u64) -> ChangeSet {
        if since_lsn == 0 {
            return self.full_state_snapshot();
        }

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        // All live row ids are tracked (even unchanged ones) so that changed
        // vectors belonging to unchanged rows are still exported below.
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(meta) => meta,
                None => continue,
            };
            // Fall back to a UUID `id` column when no natural key is declared.
            let key_col = meta.natural_key_column.clone().unwrap_or_else(|| {
                if meta
                    .columns
                    .iter()
                    .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                {
                    "id".to_string()
                } else {
                    String::new()
                }
            });
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|row| row.deleted_tx.is_none()) {
                live_row_ids.insert(row.row_id);
                if row.lsn <= since_lsn {
                    continue;
                }
                let key_val = match row.values.get(&key_col) {
                    Some(value) => value.clone(),
                    None => continue,
                };
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }
        drop(tables_guard);
        drop(meta_guard);

        let fwd = self.graph_store.forward_adj.read();
        for entries in fwd.values() {
            for entry in entries
                .iter()
                .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
            {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // At most one vector per row, and only for rows that are still live.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs
            .iter()
            .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2135
    /// Estimates the memory a sync `apply_changes` would need and fails early
    /// with `MemoryBudgetExceeded` if the available budget cannot cover it.
    /// A no-op when no memory limit is configured. The estimate intentionally
    /// skips rows that the conflict policy would reject anyway.
    fn preflight_sync_apply_memory(
        &self,
        changes: &ChangeSet,
        policies: &ConflictPolicies,
    ) -> Result<()> {
        let usage = self.accountant.usage();
        let Some(limit) = usage.limit else {
            return Ok(());
        };
        let available = usage.available.unwrap_or(limit);
        let mut required = 0usize;

        for row in &changes.rows {
            if row.deleted || row.values.is_empty() {
                continue;
            }

            let policy = policies
                .per_table
                .get(&row.table)
                .copied()
                .unwrap_or(policies.default);
            let existing = self.point_lookup(
                &row.table,
                &row.natural_key.column,
                &row.natural_key.value,
                self.snapshot(),
            )?;

            // Rows the policy would skip don't consume memory; exclude them.
            if existing.is_some()
                && matches!(
                    policy,
                    ConflictPolicy::InsertIfNotExists | ConflictPolicy::ServerWins
                )
            {
                continue;
            }

            required = required.saturating_add(
                self.table_meta(&row.table)
                    .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
                    .unwrap_or_else(|| estimate_row_value_bytes(&row.values)),
            );
        }

        // Rough per-edge overhead constant plus property payload.
        for edge in &changes.edges {
            required = required.saturating_add(
                96 + edge.edge_type.len().saturating_mul(16)
                    + estimate_row_value_bytes(&edge.properties),
            );
        }

        // Rough per-vector overhead constant plus f32 payload; empty vectors
        // are deletions and cost nothing.
        for vector in &changes.vectors {
            if vector.vector.is_empty() {
                continue;
            }
            required = required.saturating_add(
                24 + vector
                    .vector
                    .len()
                    .saturating_mul(std::mem::size_of::<f32>()),
            );
        }

        if required > available {
            return Err(Error::MemoryBudgetExceeded {
                subsystem: "sync".to_string(),
                operation: "apply_changes".to_string(),
                requested_bytes: required,
                available_bytes: available,
                budget_limit_bytes: limit,
                hint:
                    "Reduce sync batch size, split the push, or raise MEMORY_LIMIT on the server."
                        .to_string(),
            });
        }

        Ok(())
    }
2215
    /// Builds a `ChangeSet` of everything committed after `since_lsn`.
    ///
    /// Normally replays the in-memory change and DDL logs; falls back to a
    /// state-based diff (`persisted_state_since`) when the logs are empty but
    /// state exists (freshly reopened from disk) or when the logs start past
    /// the caller's watermark (log truncation).
    pub fn changes_since(&self, since_lsn: u64) -> ChangeSet {
        // A watermark from the future yields nothing.
        if since_lsn > self.current_lsn() {
            return ChangeSet::default();
        }

        let log = self.change_log.read();
        let change_first_lsn = log.first().map(|e| e.lsn());
        let change_log_empty = log.is_empty();
        drop(log);

        let ddl = self.ddl_log.read();
        let ddl_first_lsn = ddl.first().map(|(lsn, _)| *lsn);
        let ddl_log_empty = ddl.is_empty();
        drop(ddl);

        let has_table_data = !self
            .relational_store
            .tables
            .read()
            .values()
            .all(|rows| rows.is_empty());
        let has_table_meta = !self.relational_store.table_meta.read().is_empty();

        // Logs empty but state present: loaded from persistence, diff state.
        if change_log_empty && ddl_log_empty && (has_table_data || has_table_meta) {
            return self.persisted_state_since(since_lsn);
        }

        let min_first_lsn = match (change_first_lsn, ddl_first_lsn) {
            (Some(c), Some(d)) => Some(c.min(d)),
            (Some(c), None) => Some(c),
            (None, Some(d)) => Some(d),
            (None, None) => None, };

        // Logs start after the watermark + 1: a gap exists, diff state instead.
        if min_first_lsn.is_some_and(|min_lsn| min_lsn > since_lsn + 1) {
            return self.persisted_state_since(since_lsn);
        }

        // Read both logs under the commit lock for a consistent cut.
        let (ddl, change_entries) = self.with_commit_lock(|| {
            let ddl = self.ddl_log_since(since_lsn);
            let changes = self.change_log_since(since_lsn);
            (ddl, changes)
        });

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();

        for entry in change_entries {
            match entry {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    // Row payloads are resolved from current state; rows that
                    // no longer resolve are dropped from the set.
                    if let Some((natural_key, values)) = self.row_change_values(&table, row_id) {
                        rows.push(RowChange {
                            table,
                            natural_key,
                            values,
                            deleted: false,
                            lsn,
                        });
                    }
                }
                ChangeLogEntry::RowDelete {
                    table,
                    natural_key,
                    lsn,
                    ..
                } => {
                    // Deletes carry a synthetic "__deleted" marker payload.
                    let mut values = HashMap::new();
                    values.insert("__deleted".to_string(), Value::Bool(true));
                    rows.push(RowChange {
                        table,
                        natural_key,
                        values,
                        deleted: true,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeInsert {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    let properties = self
                        .edge_properties(source, target, &edge_type, lsn)
                        .unwrap_or_default();
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeDelete {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    // Edge deletes are likewise signalled via "__deleted".
                    let mut properties = HashMap::new();
                    properties.insert("__deleted".to_string(), Value::Bool(true));
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::VectorInsert { row_id, lsn } => {
                    if let Some(vector) = self.vector_for_row_lsn(row_id, lsn) {
                        vectors.push(VectorChange {
                            row_id,
                            vector,
                            lsn,
                        });
                    }
                }
                // An empty vector encodes a vector deletion.
                ChangeLogEntry::VectorDelete { row_id, lsn } => vectors.push(VectorChange {
                    row_id,
                    vector: Vec::new(),
                    lsn,
                }),
            }
        }

        // Drop row deletes that are superseded by a later (re-)insert of the
        // same natural key, so a delete+reinsert does not net-delete on the
        // receiver. Keys are compared via their Debug rendering.
        let insert_max_lsn: HashMap<(String, String, String), u64> = {
            let mut map: HashMap<(String, String, String), u64> = HashMap::new();
            for r in rows.iter().filter(|r| !r.deleted) {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                let entry = map.entry(key).or_insert(0);
                if r.lsn > *entry {
                    *entry = r.lsn;
                }
            }
            map
        };
        rows.retain(|r| {
            if r.deleted {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                // Keep the delete only if no insert with an LSN >= its own exists.
                match insert_max_lsn.get(&key) {
                    Some(&insert_lsn) => insert_lsn < r.lsn,
                    None => true,
                }
            } else {
                true
            }
        });

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2396
    /// Returns the most recently assigned log sequence number (LSN).
    ///
    /// Delegates to the transaction manager, which owns the LSN counter.
    pub fn current_lsn(&self) -> u64 {
        self.tx_mgr.current_lsn()
    }
2401
    /// Subscribes to commit events using the default bounded channel
    /// capacity (`DEFAULT_SUBSCRIPTION_CAPACITY`).
    ///
    /// See [`Self::subscribe_with_capacity`] for the channel semantics.
    pub fn subscribe(&self) -> Receiver<CommitEvent> {
        self.subscribe_with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY)
    }
2407
2408 pub fn subscribe_with_capacity(&self, capacity: usize) -> Receiver<CommitEvent> {
2410 let (tx, rx) = mpsc::sync_channel(capacity.max(1));
2411 self.subscriptions.lock().subscribers.push(tx);
2412 rx
2413 }
2414
2415 pub fn subscription_health(&self) -> SubscriptionMetrics {
2417 let subscriptions = self.subscriptions.lock();
2418 SubscriptionMetrics {
2419 active_channels: subscriptions.subscribers.len(),
2420 events_sent: subscriptions.events_sent,
2421 events_dropped: subscriptions.events_dropped,
2422 }
2423 }
2424
2425 pub fn apply_changes(
2427 &self,
2428 mut changes: ChangeSet,
2429 policies: &ConflictPolicies,
2430 ) -> Result<ApplyResult> {
2431 self.plugin.on_sync_pull(&mut changes)?;
2432 self.check_disk_budget("sync_pull")?;
2433 self.preflight_sync_apply_memory(&changes, policies)?;
2434
2435 let mut tx = self.begin();
2436 let mut result = ApplyResult {
2437 applied_rows: 0,
2438 skipped_rows: 0,
2439 conflicts: Vec::new(),
2440 new_lsn: self.current_lsn(),
2441 };
2442 let vector_row_ids = changes.vectors.iter().map(|v| v.row_id).collect::<Vec<_>>();
2443 let mut vector_row_map: HashMap<u64, u64> = HashMap::new();
2444 let mut vector_row_idx = 0usize;
2445 let mut failed_row_ids: HashSet<u64> = HashSet::new();
2446 let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
2447 let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();
2448
2449 for ddl in changes.ddl.clone() {
2450 match ddl {
2451 DdlChange::CreateTable {
2452 name,
2453 columns,
2454 constraints,
2455 } => {
2456 if self.table_meta(&name).is_some() {
2457 if let Some(local_meta) = self.table_meta(&name) {
2458 let local_cols: Vec<(String, String)> = local_meta
2459 .columns
2460 .iter()
2461 .map(|c| {
2462 let ty = match c.column_type {
2463 ColumnType::Integer => "INTEGER".to_string(),
2464 ColumnType::Real => "REAL".to_string(),
2465 ColumnType::Text => "TEXT".to_string(),
2466 ColumnType::Boolean => "BOOLEAN".to_string(),
2467 ColumnType::Json => "JSON".to_string(),
2468 ColumnType::Uuid => "UUID".to_string(),
2469 ColumnType::Vector(dim) => format!("VECTOR({dim})"),
2470 ColumnType::Timestamp => "TIMESTAMP".to_string(),
2471 };
2472 (c.name.clone(), ty)
2473 })
2474 .collect();
2475 let remote_cols: Vec<(String, String)> = columns
2476 .iter()
2477 .map(|(col_name, col_type)| {
2478 let base_type = col_type
2479 .split_whitespace()
2480 .next()
2481 .unwrap_or(col_type)
2482 .to_string();
2483 (col_name.clone(), base_type)
2484 })
2485 .collect();
2486 let mut local_sorted = local_cols.clone();
2487 local_sorted.sort();
2488 let mut remote_sorted = remote_cols.clone();
2489 remote_sorted.sort();
2490 if local_sorted != remote_sorted {
2491 result.conflicts.push(Conflict {
2492 natural_key: NaturalKey {
2493 column: "table".to_string(),
2494 value: Value::Text(name.clone()),
2495 },
2496 resolution: ConflictPolicy::ServerWins,
2497 reason: Some(format!(
2498 "schema mismatch: local columns {:?} differ from remote {:?}",
2499 local_cols, remote_cols
2500 )),
2501 });
2502 }
2503 }
2504 continue;
2505 }
2506 let mut sql = format!(
2507 "CREATE TABLE {} ({})",
2508 name,
2509 columns
2510 .iter()
2511 .map(|(col, ty)| format!("{col} {ty}"))
2512 .collect::<Vec<_>>()
2513 .join(", ")
2514 );
2515 if !constraints.is_empty() {
2516 sql.push(' ');
2517 sql.push_str(&constraints.join(" "));
2518 }
2519 self.execute_in_tx(tx, &sql, &HashMap::new())?;
2520 table_meta_cache.remove(&name);
2521 visible_rows_cache.remove(&name);
2522 }
2523 DdlChange::DropTable { name } => {
2524 if self.table_meta(&name).is_some() {
2525 self.drop_table_aux_state(&name);
2526 self.relational_store().drop_table(&name);
2527 self.remove_persisted_table(&name)?;
2528 }
2529 table_meta_cache.remove(&name);
2530 visible_rows_cache.remove(&name);
2531 }
2532 DdlChange::AlterTable {
2533 name,
2534 columns,
2535 constraints,
2536 } => {
2537 if self.table_meta(&name).is_none() {
2538 continue;
2539 }
2540 self.relational_store().table_meta.write().remove(&name);
2541 let mut sql = format!(
2542 "CREATE TABLE {} ({})",
2543 name,
2544 columns
2545 .iter()
2546 .map(|(col, ty)| format!("{col} {ty}"))
2547 .collect::<Vec<_>>()
2548 .join(", ")
2549 );
2550 if !constraints.is_empty() {
2551 sql.push(' ');
2552 sql.push_str(&constraints.join(" "));
2553 }
2554 self.execute_in_tx(tx, &sql, &HashMap::new())?;
2555 table_meta_cache.remove(&name);
2556 visible_rows_cache.remove(&name);
2557 }
2558 }
2559 }
2560
2561 self.preflight_sync_apply_memory(&changes, policies)?;
2562
2563 for row in changes.rows {
2564 if row.values.is_empty() {
2565 result.skipped_rows += 1;
2566 self.commit_with_source(tx, CommitSource::SyncPull)?;
2567 tx = self.begin();
2568 continue;
2569 }
2570
2571 let policy = policies
2572 .per_table
2573 .get(&row.table)
2574 .copied()
2575 .unwrap_or(policies.default);
2576
2577 let existing = cached_point_lookup(
2578 self,
2579 &mut visible_rows_cache,
2580 &row.table,
2581 &row.natural_key.column,
2582 &row.natural_key.value,
2583 )?;
2584 let is_delete = row.deleted;
2585 let row_has_vector = cached_table_meta(self, &mut table_meta_cache, &row.table)
2586 .is_some_and(|meta| {
2587 meta.columns
2588 .iter()
2589 .any(|col| matches!(col.column_type, ColumnType::Vector(_)))
2590 });
2591
2592 if is_delete {
2593 if let Some(local) = existing {
2594 if row_has_vector
2595 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2596 {
2597 vector_row_map.insert(*remote_row_id, local.row_id);
2598 vector_row_idx += 1;
2599 }
2600 if let Err(err) = self.delete_row(tx, &row.table, local.row_id) {
2601 result.conflicts.push(Conflict {
2602 natural_key: row.natural_key.clone(),
2603 resolution: policy,
2604 reason: Some(format!("delete failed: {err}")),
2605 });
2606 result.skipped_rows += 1;
2607 } else {
2608 remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
2609 result.applied_rows += 1;
2610 }
2611 } else {
2612 result.skipped_rows += 1;
2613 }
2614 self.commit_with_source(tx, CommitSource::SyncPull)?;
2615 tx = self.begin();
2616 continue;
2617 }
2618
2619 let mut values = row.values.clone();
2620 values.remove("__deleted");
2621
2622 match (existing, policy) {
2623 (None, _) => {
2624 if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
2625 let mut constraint_error: Option<String> = None;
2626
2627 for col_def in &meta.columns {
2628 if !col_def.nullable
2629 && !col_def.primary_key
2630 && col_def.default.is_none()
2631 {
2632 match values.get(&col_def.name) {
2633 None | Some(Value::Null) => {
2634 constraint_error = Some(format!(
2635 "NOT NULL constraint violated: {}.{}",
2636 row.table, col_def.name
2637 ));
2638 break;
2639 }
2640 _ => {}
2641 }
2642 }
2643 }
2644
2645 let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
2646 if constraint_error.is_none() && has_unique {
2647 for col_def in &meta.columns {
2648 if col_def.unique
2649 && !col_def.primary_key
2650 && let Some(new_val) = values.get(&col_def.name)
2651 && *new_val != Value::Null
2652 && cached_visible_rows(
2653 self,
2654 &mut visible_rows_cache,
2655 &row.table,
2656 )?
2657 .iter()
2658 .any(|r| r.values.get(&col_def.name) == Some(new_val))
2659 {
2660 constraint_error = Some(format!(
2661 "UNIQUE constraint violated: {}.{}",
2662 row.table, col_def.name
2663 ));
2664 break;
2665 }
2666 }
2667 }
2668
2669 if let Some(err_msg) = constraint_error {
2670 result.skipped_rows += 1;
2671 if row_has_vector
2672 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2673 {
2674 failed_row_ids.insert(*remote_row_id);
2675 vector_row_idx += 1;
2676 }
2677 result.conflicts.push(Conflict {
2678 natural_key: row.natural_key.clone(),
2679 resolution: policy,
2680 reason: Some(err_msg),
2681 });
2682 self.commit_with_source(tx, CommitSource::SyncPull)?;
2683 tx = self.begin();
2684 continue;
2685 }
2686 }
2687
2688 match self.insert_row(tx, &row.table, values.clone()) {
2689 Ok(new_row_id) => {
2690 record_cached_insert(
2691 &mut visible_rows_cache,
2692 &row.table,
2693 VersionedRow {
2694 row_id: new_row_id,
2695 values: values.clone(),
2696 created_tx: tx,
2697 deleted_tx: None,
2698 lsn: row.lsn,
2699 created_at: None,
2700 },
2701 );
2702 result.applied_rows += 1;
2703 if row_has_vector
2704 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2705 {
2706 vector_row_map.insert(*remote_row_id, new_row_id);
2707 vector_row_idx += 1;
2708 }
2709 }
2710 Err(err) => {
2711 if is_fatal_sync_apply_error(&err) {
2712 return Err(err);
2713 }
2714 result.skipped_rows += 1;
2715 if row_has_vector
2716 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2717 {
2718 failed_row_ids.insert(*remote_row_id);
2719 vector_row_idx += 1;
2720 }
2721 result.conflicts.push(Conflict {
2722 natural_key: row.natural_key.clone(),
2723 resolution: policy,
2724 reason: Some(format!("{err}")),
2725 });
2726 }
2727 }
2728 }
2729 (Some(local), ConflictPolicy::InsertIfNotExists) => {
2730 if row_has_vector
2731 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2732 {
2733 vector_row_map.insert(*remote_row_id, local.row_id);
2734 vector_row_idx += 1;
2735 }
2736 result.skipped_rows += 1;
2737 }
2738 (Some(_), ConflictPolicy::ServerWins) => {
2739 result.skipped_rows += 1;
2740 if row_has_vector
2741 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2742 {
2743 failed_row_ids.insert(*remote_row_id);
2744 vector_row_idx += 1;
2745 }
2746 result.conflicts.push(Conflict {
2747 natural_key: row.natural_key.clone(),
2748 resolution: ConflictPolicy::ServerWins,
2749 reason: Some("server_wins".to_string()),
2750 });
2751 }
2752 (Some(local), ConflictPolicy::LatestWins) => {
2753 if row.lsn <= local.lsn {
2754 result.skipped_rows += 1;
2755 if row_has_vector
2756 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2757 {
2758 failed_row_ids.insert(*remote_row_id);
2759 vector_row_idx += 1;
2760 }
2761 result.conflicts.push(Conflict {
2762 natural_key: row.natural_key.clone(),
2763 resolution: ConflictPolicy::LatestWins,
2764 reason: Some("local_lsn_newer_or_equal".to_string()),
2765 });
2766 } else {
2767 if let Some(meta) = self.table_meta(&row.table)
2769 && let Some(sm) = &meta.state_machine
2770 {
2771 let sm_col = sm.column.clone();
2772 let transitions = sm.transitions.clone();
2773 let incoming_state = values.get(&sm_col).and_then(|v| match v {
2774 Value::Text(s) => Some(s.clone()),
2775 _ => None,
2776 });
2777 let local_state = local.values.get(&sm_col).and_then(|v| match v {
2778 Value::Text(s) => Some(s.clone()),
2779 _ => None,
2780 });
2781
2782 if let (Some(incoming), Some(current)) = (incoming_state, local_state) {
2783 let valid = transitions
2785 .get(¤t)
2786 .is_some_and(|targets| targets.contains(&incoming));
2787 if !valid && incoming != current {
2788 result.skipped_rows += 1;
2789 if row_has_vector
2790 && let Some(remote_row_id) =
2791 vector_row_ids.get(vector_row_idx)
2792 {
2793 failed_row_ids.insert(*remote_row_id);
2794 vector_row_idx += 1;
2795 }
2796 result.conflicts.push(Conflict {
2797 natural_key: row.natural_key.clone(),
2798 resolution: ConflictPolicy::LatestWins,
2799 reason: Some(format!(
2800 "state_machine: invalid transition {} -> {} (current: {})",
2801 current, incoming, current
2802 )),
2803 });
2804 self.commit_with_source(tx, CommitSource::SyncPull)?;
2805 tx = self.begin();
2806 continue;
2807 }
2808 }
2809 }
2810
2811 match self.upsert_row(
2812 tx,
2813 &row.table,
2814 &row.natural_key.column,
2815 values.clone(),
2816 ) {
2817 Ok(_) => {
2818 visible_rows_cache.remove(&row.table);
2819 result.applied_rows += 1;
2820 if row_has_vector
2821 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2822 {
2823 if let Ok(Some(found)) = self.point_lookup_in_tx(
2824 tx,
2825 &row.table,
2826 &row.natural_key.column,
2827 &row.natural_key.value,
2828 self.snapshot(),
2829 ) {
2830 vector_row_map.insert(*remote_row_id, found.row_id);
2831 }
2832 vector_row_idx += 1;
2833 }
2834 }
2835 Err(err) => {
2836 if is_fatal_sync_apply_error(&err) {
2837 return Err(err);
2838 }
2839 result.skipped_rows += 1;
2840 if row_has_vector
2841 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2842 {
2843 failed_row_ids.insert(*remote_row_id);
2844 vector_row_idx += 1;
2845 }
2846 result.conflicts.push(Conflict {
2847 natural_key: row.natural_key.clone(),
2848 resolution: ConflictPolicy::LatestWins,
2849 reason: Some(format!("state_machine_or_constraint: {err}")),
2850 });
2851 }
2852 }
2853 }
2854 }
2855 (Some(_), ConflictPolicy::EdgeWins) => {
2856 result.conflicts.push(Conflict {
2857 natural_key: row.natural_key.clone(),
2858 resolution: ConflictPolicy::EdgeWins,
2859 reason: Some("edge_wins".to_string()),
2860 });
2861 match self.upsert_row(tx, &row.table, &row.natural_key.column, values.clone()) {
2862 Ok(_) => {
2863 visible_rows_cache.remove(&row.table);
2864 result.applied_rows += 1;
2865 if row_has_vector
2866 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2867 {
2868 if let Ok(Some(found)) = self.point_lookup_in_tx(
2869 tx,
2870 &row.table,
2871 &row.natural_key.column,
2872 &row.natural_key.value,
2873 self.snapshot(),
2874 ) {
2875 vector_row_map.insert(*remote_row_id, found.row_id);
2876 }
2877 vector_row_idx += 1;
2878 }
2879 }
2880 Err(err) => {
2881 if is_fatal_sync_apply_error(&err) {
2882 return Err(err);
2883 }
2884 result.skipped_rows += 1;
2885 if row_has_vector
2886 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2887 {
2888 failed_row_ids.insert(*remote_row_id);
2889 vector_row_idx += 1;
2890 }
2891 if let Some(last) = result.conflicts.last_mut() {
2892 last.reason = Some(format!("state_machine_or_constraint: {err}"));
2893 }
2894 }
2895 }
2896 }
2897 }
2898
2899 self.commit_with_source(tx, CommitSource::SyncPull)?;
2900 tx = self.begin();
2901 }
2902
2903 for edge in changes.edges {
2904 let is_delete = matches!(edge.properties.get("__deleted"), Some(Value::Bool(true)));
2905 if is_delete {
2906 let _ = self.delete_edge(tx, edge.source, edge.target, &edge.edge_type);
2907 } else {
2908 let _ = self.insert_edge(
2909 tx,
2910 edge.source,
2911 edge.target,
2912 edge.edge_type,
2913 edge.properties,
2914 );
2915 }
2916 }
2917
2918 for vector in changes.vectors {
2919 if failed_row_ids.contains(&vector.row_id) {
2920 continue; }
2922 let local_row_id = vector_row_map
2923 .get(&vector.row_id)
2924 .copied()
2925 .unwrap_or(vector.row_id);
2926 if vector.vector.is_empty() {
2927 let _ = self.vector.delete_vector(tx, local_row_id);
2928 } else {
2929 if self.has_live_vector(local_row_id, self.snapshot()) {
2930 let _ = self.delete_vector(tx, local_row_id);
2931 }
2932 if let Err(err) = self.insert_vector(tx, local_row_id, vector.vector)
2933 && is_fatal_sync_apply_error(&err)
2934 {
2935 return Err(err);
2936 }
2937 }
2938 }
2939
2940 self.commit_with_source(tx, CommitSource::SyncPull)?;
2941 result.new_lsn = self.current_lsn();
2942 Ok(result)
2943 }
2944
2945 fn row_change_values(
2946 &self,
2947 table: &str,
2948 row_id: RowId,
2949 ) -> Option<(NaturalKey, HashMap<String, Value>)> {
2950 let tables = self.relational_store.tables.read();
2951 let meta = self.relational_store.table_meta.read();
2952 let rows = tables.get(table)?;
2953 let row = rows.iter().find(|r| r.row_id == row_id)?;
2954 let key_col = meta
2955 .get(table)
2956 .and_then(|m| m.natural_key_column.clone())
2957 .or_else(|| {
2958 meta.get(table).and_then(|m| {
2959 m.columns
2960 .iter()
2961 .find(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
2962 .map(|_| "id".to_string())
2963 })
2964 })?;
2965
2966 let key_val = row.values.get(&key_col)?.clone();
2967 let values = row
2968 .values
2969 .iter()
2970 .map(|(k, v)| (k.clone(), v.clone()))
2971 .collect::<HashMap<_, _>>();
2972 Some((
2973 NaturalKey {
2974 column: key_col,
2975 value: key_val,
2976 },
2977 values,
2978 ))
2979 }
2980
2981 fn edge_properties(
2982 &self,
2983 source: NodeId,
2984 target: NodeId,
2985 edge_type: &str,
2986 lsn: u64,
2987 ) -> Option<HashMap<String, Value>> {
2988 self.graph_store
2989 .forward_adj
2990 .read()
2991 .get(&source)
2992 .and_then(|entries| {
2993 entries
2994 .iter()
2995 .find(|e| e.target == target && e.edge_type == edge_type && e.lsn == lsn)
2996 .map(|e| e.properties.clone())
2997 })
2998 }
2999
3000 fn vector_for_row_lsn(&self, row_id: RowId, lsn: u64) -> Option<Vec<f32>> {
3001 self.vector_store
3002 .vectors
3003 .read()
3004 .iter()
3005 .find(|v| v.row_id == row_id && v.lsn == lsn)
3006 .map(|v| v.vector.clone())
3007 }
3008}
3009
3010fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
3011 if let Some(pos) = qr.columns.iter().position(|c| c == "row_id") {
3012 qr.columns.remove(pos);
3013 for row in &mut qr.rows {
3014 if pos < row.len() {
3015 row.remove(pos);
3016 }
3017 }
3018 }
3019 qr
3020}
3021
3022fn cached_table_meta(
3023 db: &Database,
3024 cache: &mut HashMap<String, Option<TableMeta>>,
3025 table: &str,
3026) -> Option<TableMeta> {
3027 cache
3028 .entry(table.to_string())
3029 .or_insert_with(|| db.table_meta(table))
3030 .clone()
3031}
3032
3033fn cached_visible_rows<'a>(
3034 db: &Database,
3035 cache: &'a mut HashMap<String, Vec<VersionedRow>>,
3036 table: &str,
3037) -> Result<&'a mut Vec<VersionedRow>> {
3038 if !cache.contains_key(table) {
3039 let rows = db.scan(table, db.snapshot())?;
3040 cache.insert(table.to_string(), rows);
3041 }
3042 Ok(cache.get_mut(table).expect("cached visible rows"))
3043}
3044
3045fn cached_point_lookup(
3046 db: &Database,
3047 cache: &mut HashMap<String, Vec<VersionedRow>>,
3048 table: &str,
3049 col: &str,
3050 value: &Value,
3051) -> Result<Option<VersionedRow>> {
3052 let rows = cached_visible_rows(db, cache, table)?;
3053 Ok(rows
3054 .iter()
3055 .find(|r| r.values.get(col) == Some(value))
3056 .cloned())
3057}
3058
3059fn record_cached_insert(
3060 cache: &mut HashMap<String, Vec<VersionedRow>>,
3061 table: &str,
3062 row: VersionedRow,
3063) {
3064 if let Some(rows) = cache.get_mut(table) {
3065 rows.push(row);
3066 }
3067}
3068
3069fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
3070 if let Some(rows) = cache.get_mut(table) {
3071 rows.retain(|row| row.row_id != row_id);
3072 }
3073}
3074
3075fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
3076 match result {
3077 Ok(query_result) => QueryOutcome::Success {
3078 row_count: if query_result.rows.is_empty() {
3079 query_result.rows_affected as usize
3080 } else {
3081 query_result.rows.len()
3082 },
3083 },
3084 Err(error) => QueryOutcome::Error {
3085 error: error.to_string(),
3086 },
3087 }
3088}
3089
/// Eagerly builds the HNSW vector index once the store holds at least 1000
/// vectors, so the first large similarity query does not pay the build cost.
///
/// The estimated index footprint is charged to `accountant` first; if the
/// memory budget cannot absorb it, the build is skipped silently. The
/// construction is wrapped in `catch_unwind` so a panic inside
/// `HnswIndex::new` degrades to "no index" instead of unwinding the caller.
fn maybe_prebuild_hnsw(vector_store: &VectorStore, accountant: &MemoryAccountant) {
    if vector_store.vector_count() >= 1000 {
        let entries = vector_store.all_entries();
        let dim = vector_store.dimension().unwrap_or(0);
        let estimated_bytes = estimate_hnsw_index_bytes(entries.len(), dim);
        if accountant
            .try_allocate_for(
                estimated_bytes,
                "vector_index",
                "prebuild_hnsw",
                "Open the database with a larger MEMORY_LIMIT to enable HNSW indexing.",
            )
            .is_err()
        {
            // Budget denied: no index, and nothing was allocated.
            return;
        }

        // A panic during the build yields `None`; release the reservation in
        // that case. On success the store records the charged size so it can
        // be released when the index is dropped.
        let hnsw_opt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            HnswIndex::new(&entries, dim)
        }))
        .ok();
        if hnsw_opt.is_some() {
            vector_store.set_hnsw_bytes(estimated_bytes);
        } else {
            accountant.release(estimated_bytes);
        }
        // The OnceLock is set even when the build failed (stores `None`).
        // NOTE(review): if the OnceLock was already set, `set` fails and the
        // reservation above is never recorded against the store — confirm
        // this path can only be reached once per store.
        let _ = vector_store.hnsw.set(RwLock::new(hnsw_opt));
    }
}
3119
3120fn estimate_row_bytes_for_meta(
3121 values: &HashMap<ColName, Value>,
3122 meta: &TableMeta,
3123 include_vectors: bool,
3124) -> usize {
3125 let mut bytes = 96usize;
3126 for column in &meta.columns {
3127 let Some(value) = values.get(&column.name) else {
3128 continue;
3129 };
3130 if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
3131 continue;
3132 }
3133 bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
3134 }
3135 bytes
3136}
3137
/// Estimates the in-memory footprint of one stored vector: a 24-byte header
/// (Vec pointer/len/capacity) plus 4 bytes per `f32` element.
///
/// Uses saturating arithmetic throughout, consistent with the other
/// `estimate_*` helpers, so a pathological length cannot overflow and panic
/// in debug builds.
fn estimate_vector_bytes(vector: &[f32]) -> usize {
    vector
        .len()
        .saturating_mul(std::mem::size_of::<f32>())
        .saturating_add(24)
}
3141
3142fn estimate_edge_bytes(
3143 source: NodeId,
3144 target: NodeId,
3145 edge_type: &str,
3146 properties: &HashMap<String, Value>,
3147) -> usize {
3148 AdjEntry {
3149 source,
3150 target,
3151 edge_type: edge_type.to_string(),
3152 properties: properties.clone(),
3153 created_tx: 0,
3154 deleted_tx: None,
3155 lsn: 0,
3156 }
3157 .estimated_bytes()
3158}
3159
/// Estimates the memory an HNSW index will need: the raw `f32` payload
/// (`entry_count * dimension * 4` bytes) times a 3x factor for graph links
/// and bookkeeping. All arithmetic saturates so huge inputs cap at
/// `usize::MAX` instead of overflowing.
fn estimate_hnsw_index_bytes(entry_count: usize, dimension: usize) -> usize {
    let floats = entry_count.saturating_mul(dimension);
    let payload = floats.saturating_mul(std::mem::size_of::<f32>());
    payload.saturating_mul(3)
}
3166
impl Drop for Database {
    /// Best-effort shutdown: stop background work, disconnect subscribers,
    /// and close persistence. Safe to run after an explicit close because
    /// `closed` is checked first.
    fn drop(&mut self) {
        // `swap` returns the previous value: if the database was already
        // marked closed, teardown has happened and we must not repeat it.
        if self.closed.swap(true, Ordering::SeqCst) {
            return;
        }
        // Stop the background pruning thread before touching anything else:
        // signal shutdown, then join so it cannot race the remaining teardown.
        let runtime = self.pruning_runtime.get_mut();
        runtime.shutdown.store(true, Ordering::SeqCst);
        if let Some(handle) = runtime.handle.take() {
            let _ = handle.join();
        }
        // Dropping the retained senders disconnects every subscriber channel.
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
    }
}
3183
/// Sleeps for roughly `interval`, but wakes up at least every 50ms to check
/// the `shutdown` flag so a stopping database is not held up by a long
/// pruning interval. Returns as soon as the flag is set or the deadline
/// passes, whichever comes first.
fn sleep_with_shutdown(shutdown: &AtomicBool, interval: Duration) {
    let deadline = Instant::now() + interval;
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return;
        }
        let now = Instant::now();
        if now >= deadline {
            return;
        }
        // Nap in small slices so shutdown latency stays bounded.
        let nap = deadline
            .saturating_duration_since(now)
            .min(Duration::from_millis(50));
        thread::sleep(nap);
    }
}
3195
/// Removes TTL-expired rows from every table that declares a retention
/// policy, along with their dependent vectors and graph edges, then rewrites
/// the persisted copies. Returns the number of rows pruned.
///
/// `sync_watermark` protects rows that sync peers may not have pulled yet
/// (see `row_is_prunable`). All memory freed by pruning is released from
/// `accountant` in one batch at the end. Persistence rewrites are best
/// effort: their errors are ignored because the in-memory state has already
/// been pruned.
fn prune_expired_rows(
    relational_store: &Arc<RelationalStore>,
    graph_store: &Arc<GraphStore>,
    vector_store: &Arc<VectorStore>,
    accountant: &MemoryAccountant,
    persistence: Option<&Arc<RedbPersistence>>,
    sync_watermark: u64,
) -> u64 {
    let now_millis = current_epoch_millis();
    // Clone metadata up front so the tables write lock below is never held
    // while also reading `table_meta`.
    let metas = relational_store.table_meta.read().clone();
    let mut pruned_by_table: HashMap<String, Vec<RowId>> = HashMap::new();
    let mut pruned_node_ids = HashSet::new();
    let mut released_row_bytes = 0usize;

    // Phase 1: remove expired rows, recording ids and freed bytes. The scope
    // bounds how long the tables write lock is held.
    {
        let mut tables = relational_store.tables.write();
        for (table_name, rows) in tables.iter_mut() {
            let Some(meta) = metas.get(table_name) else {
                continue;
            };
            // Only tables with a retention policy participate in pruning.
            if meta.default_ttl_seconds.is_none() {
                continue;
            }

            rows.retain(|row| {
                if !row_is_prunable(row, meta, now_millis, sync_watermark) {
                    return true;
                }

                pruned_by_table
                    .entry(table_name.clone())
                    .or_default()
                    .push(row.row_id);
                released_row_bytes = released_row_bytes
                    .saturating_add(estimate_row_bytes_for_meta(&row.values, meta, false));
                // Rows with a UUID `id` value may correspond to graph nodes;
                // remember them so their edges can be pruned below.
                if let Some(Value::Uuid(id)) = row.values.get("id") {
                    pruned_node_ids.insert(*id);
                }
                false
            });
        }
    }

    let pruned_row_ids: HashSet<RowId> = pruned_by_table
        .values()
        .flat_map(|rows| rows.iter().copied())
        .collect();
    if pruned_row_ids.is_empty() {
        return 0;
    }

    // Phase 2: drop vectors attached to pruned rows, accounting freed bytes.
    let mut released_vector_bytes = 0usize;
    {
        let mut vectors = vector_store.vectors.write();
        vectors.retain(|entry| {
            if pruned_row_ids.contains(&entry.row_id) {
                released_vector_bytes =
                    released_vector_bytes.saturating_add(entry.estimated_bytes());
                false
            } else {
                true
            }
        });
    }
    // Invalidate the HNSW index wholesale; vectors were removed underneath it.
    // NOTE(review): the index's accounted bytes are not released here —
    // confirm the rebuild path handles that.
    if let Some(hnsw) = vector_store.hnsw.get() {
        *hnsw.write() = None;
    }

    // Phase 3: drop edges touching pruned nodes. Bytes are counted on the
    // forward adjacency only; the reverse side mirrors the same entries.
    let mut released_edge_bytes = 0usize;
    {
        let mut forward = graph_store.forward_adj.write();
        for entries in forward.values_mut() {
            entries.retain(|entry| {
                if pruned_node_ids.contains(&entry.source)
                    || pruned_node_ids.contains(&entry.target)
                {
                    released_edge_bytes =
                        released_edge_bytes.saturating_add(entry.estimated_bytes());
                    false
                } else {
                    true
                }
            });
        }
        // Drop adjacency keys that became empty.
        forward.retain(|_, entries| !entries.is_empty());
    }
    {
        let mut reverse = graph_store.reverse_adj.write();
        for entries in reverse.values_mut() {
            entries.retain(|entry| {
                !pruned_node_ids.contains(&entry.source) && !pruned_node_ids.contains(&entry.target)
            });
        }
        reverse.retain(|_, entries| !entries.is_empty());
    }

    // Phase 4: persist the post-prune state — each touched table, then the
    // full vector and edge sets.
    if let Some(persistence) = persistence {
        for table_name in pruned_by_table.keys() {
            let rows = relational_store
                .tables
                .read()
                .get(table_name)
                .cloned()
                .unwrap_or_default();
            let _ = persistence.rewrite_table_rows(table_name, &rows);
        }

        let vectors = vector_store.vectors.read().clone();
        let edges = graph_store
            .forward_adj
            .read()
            .values()
            .flat_map(|entries| entries.iter().cloned())
            .collect::<Vec<_>>();
        let _ = persistence.rewrite_vectors(&vectors);
        let _ = persistence.rewrite_graph_edges(&edges);
    }

    // Release all freed memory in a single batch.
    accountant.release(
        released_row_bytes
            .saturating_add(released_vector_bytes)
            .saturating_add(released_edge_bytes),
    );

    pruned_row_ids.len() as u64
}
3322
3323fn row_is_prunable(
3324 row: &VersionedRow,
3325 meta: &TableMeta,
3326 now_millis: u64,
3327 sync_watermark: u64,
3328) -> bool {
3329 if meta.sync_safe && row.lsn >= sync_watermark {
3330 return false;
3331 }
3332
3333 let Some(default_ttl_seconds) = meta.default_ttl_seconds else {
3334 return false;
3335 };
3336
3337 if let Some(expires_column) = &meta.expires_column {
3338 match row.values.get(expires_column) {
3339 Some(Value::Timestamp(millis)) if *millis == i64::MAX => return false,
3340 Some(Value::Timestamp(millis)) if *millis < 0 => return true,
3341 Some(Value::Timestamp(millis)) => return (*millis as u64) <= now_millis,
3342 Some(Value::Null) | None => {}
3343 Some(_) => {}
3344 }
3345 }
3346
3347 let ttl_millis = default_ttl_seconds.saturating_mul(1000);
3348 row.created_at
3349 .map(|created_at| now_millis.saturating_sub(created_at) > ttl_millis)
3350 .unwrap_or(false)
3351}
3352
/// Returns the current wall-clock time as milliseconds since the Unix
/// epoch, or 0 if the system clock reports a pre-epoch time.
fn current_epoch_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
3359
3360fn max_tx_across_all(
3361 relational: &RelationalStore,
3362 graph: &GraphStore,
3363 vector: &VectorStore,
3364) -> TxId {
3365 let relational_max = relational
3366 .tables
3367 .read()
3368 .values()
3369 .flat_map(|rows| rows.iter())
3370 .flat_map(|row| std::iter::once(row.created_tx).chain(row.deleted_tx))
3371 .max()
3372 .unwrap_or(0);
3373 let graph_max = graph
3374 .forward_adj
3375 .read()
3376 .values()
3377 .flat_map(|entries| entries.iter())
3378 .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
3379 .max()
3380 .unwrap_or(0);
3381 let vector_max = vector
3382 .vectors
3383 .read()
3384 .iter()
3385 .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
3386 .max()
3387 .unwrap_or(0);
3388
3389 relational_max.max(graph_max).max(vector_max)
3390}
3391
3392fn max_lsn_across_all(
3393 relational: &RelationalStore,
3394 graph: &GraphStore,
3395 vector: &VectorStore,
3396) -> u64 {
3397 let relational_max = relational
3398 .tables
3399 .read()
3400 .values()
3401 .flat_map(|rows| rows.iter().map(|row| row.lsn))
3402 .max()
3403 .unwrap_or(0);
3404 let graph_max = graph
3405 .forward_adj
3406 .read()
3407 .values()
3408 .flat_map(|entries| entries.iter().map(|entry| entry.lsn))
3409 .max()
3410 .unwrap_or(0);
3411 let vector_max = vector
3412 .vectors
3413 .read()
3414 .iter()
3415 .map(|entry| entry.lsn)
3416 .max()
3417 .unwrap_or(0);
3418
3419 relational_max.max(graph_max).max(vector_max)
3420}
3421
3422fn persisted_memory_limit(path: &Path) -> Result<Option<usize>> {
3423 if !path.exists() {
3424 return Ok(None);
3425 }
3426 let persistence = RedbPersistence::open(path)?;
3427 let limit = persistence.load_config_value::<usize>("memory_limit")?;
3428 persistence.close();
3429 Ok(limit)
3430}
3431
3432fn is_fatal_sync_apply_error(err: &Error) -> bool {
3433 matches!(
3434 err,
3435 Error::MemoryBudgetExceeded { .. } | Error::DiskBudgetExceeded { .. }
3436 )
3437}
3438
3439fn ddl_change_from_create_table(ct: &CreateTable) -> DdlChange {
3440 DdlChange::CreateTable {
3441 name: ct.name.clone(),
3442 columns: ct
3443 .columns
3444 .iter()
3445 .map(|col| {
3446 (
3447 col.name.clone(),
3448 sql_type_for_ast_column(col, &ct.propagation_rules),
3449 )
3450 })
3451 .collect(),
3452 constraints: create_table_constraints_from_ast(ct),
3453 }
3454}
3455
3456fn ddl_change_from_meta(name: &str, meta: &TableMeta) -> DdlChange {
3457 DdlChange::CreateTable {
3458 name: name.to_string(),
3459 columns: meta
3460 .columns
3461 .iter()
3462 .map(|col| {
3463 (
3464 col.name.clone(),
3465 sql_type_for_meta_column(col, &meta.propagation_rules),
3466 )
3467 })
3468 .collect(),
3469 constraints: create_table_constraints_from_meta(meta),
3470 }
3471}
3472
3473fn sql_type_for_ast(data_type: &DataType) -> String {
3474 match data_type {
3475 DataType::Uuid => "UUID".to_string(),
3476 DataType::Text => "TEXT".to_string(),
3477 DataType::Integer => "INTEGER".to_string(),
3478 DataType::Real => "REAL".to_string(),
3479 DataType::Boolean => "BOOLEAN".to_string(),
3480 DataType::Timestamp => "TIMESTAMP".to_string(),
3481 DataType::Json => "JSON".to_string(),
3482 DataType::Vector(dim) => format!("VECTOR({dim})"),
3483 }
3484}
3485
3486fn sql_type_for_ast_column(
3487 col: &contextdb_parser::ast::ColumnDef,
3488 _rules: &[contextdb_parser::ast::AstPropagationRule],
3489) -> String {
3490 let mut ty = sql_type_for_ast(&col.data_type);
3491 if let Some(reference) = &col.references {
3492 ty.push_str(&format!(
3493 " REFERENCES {}({})",
3494 reference.table, reference.column
3495 ));
3496 for rule in &reference.propagation_rules {
3497 if let contextdb_parser::ast::AstPropagationRule::FkState {
3498 trigger_state,
3499 target_state,
3500 max_depth,
3501 abort_on_failure,
3502 } = rule
3503 {
3504 ty.push_str(&format!(
3505 " ON STATE {} PROPAGATE SET {}",
3506 trigger_state, target_state
3507 ));
3508 if max_depth.unwrap_or(10) != 10 {
3509 ty.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
3510 }
3511 if *abort_on_failure {
3512 ty.push_str(" ABORT ON FAILURE");
3513 }
3514 }
3515 }
3516 }
3517 if col.primary_key {
3518 ty.push_str(" PRIMARY KEY");
3519 }
3520 if !col.nullable && !col.primary_key {
3521 ty.push_str(" NOT NULL");
3522 }
3523 if col.unique {
3524 ty.push_str(" UNIQUE");
3525 }
3526 ty
3527}
3528
3529fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[PropagationRule]) -> String {
3530 let mut ty = match col.column_type {
3531 ColumnType::Integer => "INTEGER".to_string(),
3532 ColumnType::Real => "REAL".to_string(),
3533 ColumnType::Text => "TEXT".to_string(),
3534 ColumnType::Boolean => "BOOLEAN".to_string(),
3535 ColumnType::Json => "JSON".to_string(),
3536 ColumnType::Uuid => "UUID".to_string(),
3537 ColumnType::Vector(dim) => format!("VECTOR({dim})"),
3538 ColumnType::Timestamp => "TIMESTAMP".to_string(),
3539 };
3540
3541 let fk_rules = rules
3542 .iter()
3543 .filter_map(|rule| match rule {
3544 PropagationRule::ForeignKey {
3545 fk_column,
3546 referenced_table,
3547 referenced_column,
3548 trigger_state,
3549 target_state,
3550 max_depth,
3551 abort_on_failure,
3552 } if fk_column == &col.name => Some((
3553 referenced_table,
3554 referenced_column,
3555 trigger_state,
3556 target_state,
3557 *max_depth,
3558 *abort_on_failure,
3559 )),
3560 _ => None,
3561 })
3562 .collect::<Vec<_>>();
3563
3564 if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
3565 ty.push_str(&format!(
3566 " REFERENCES {}({})",
3567 referenced_table, referenced_column
3568 ));
3569 for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
3570 ty.push_str(&format!(
3571 " ON STATE {} PROPAGATE SET {}",
3572 trigger_state, target_state
3573 ));
3574 if max_depth != 10 {
3575 ty.push_str(&format!(" MAX DEPTH {max_depth}"));
3576 }
3577 if abort_on_failure {
3578 ty.push_str(" ABORT ON FAILURE");
3579 }
3580 }
3581 }
3582 if col.primary_key {
3583 ty.push_str(" PRIMARY KEY");
3584 }
3585 if !col.nullable && !col.primary_key {
3586 ty.push_str(" NOT NULL");
3587 }
3588 if col.unique {
3589 ty.push_str(" UNIQUE");
3590 }
3591 if col.expires {
3592 ty.push_str(" EXPIRES");
3593 }
3594
3595 ty
3596}
3597
3598fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
3599 let mut constraints = Vec::new();
3600
3601 if ct.immutable {
3602 constraints.push("IMMUTABLE".to_string());
3603 }
3604
3605 if let Some(sm) = &ct.state_machine {
3606 let transitions = sm
3607 .transitions
3608 .iter()
3609 .map(|(from, tos)| format!("{from} -> [{}]", tos.join(", ")))
3610 .collect::<Vec<_>>()
3611 .join(", ");
3612 constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
3613 }
3614
3615 if !ct.dag_edge_types.is_empty() {
3616 let edge_types = ct
3617 .dag_edge_types
3618 .iter()
3619 .map(|edge_type| format!("'{edge_type}'"))
3620 .collect::<Vec<_>>()
3621 .join(", ");
3622 constraints.push(format!("DAG({edge_types})"));
3623 }
3624
3625 if let Some(retain) = &ct.retain {
3626 let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(retain.duration_seconds));
3627 if retain.sync_safe {
3628 clause.push_str(" SYNC SAFE");
3629 }
3630 constraints.push(clause);
3631 }
3632
3633 for rule in &ct.propagation_rules {
3634 match rule {
3635 contextdb_parser::ast::AstPropagationRule::EdgeState {
3636 edge_type,
3637 direction,
3638 trigger_state,
3639 target_state,
3640 max_depth,
3641 abort_on_failure,
3642 } => {
3643 let mut clause = format!(
3644 "PROPAGATE ON EDGE {} {} STATE {} SET {}",
3645 edge_type, direction, trigger_state, target_state
3646 );
3647 if max_depth.unwrap_or(10) != 10 {
3648 clause.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
3649 }
3650 if *abort_on_failure {
3651 clause.push_str(" ABORT ON FAILURE");
3652 }
3653 constraints.push(clause);
3654 }
3655 contextdb_parser::ast::AstPropagationRule::VectorExclusion { trigger_state } => {
3656 constraints.push(format!(
3657 "PROPAGATE ON STATE {} EXCLUDE VECTOR",
3658 trigger_state
3659 ));
3660 }
3661 contextdb_parser::ast::AstPropagationRule::FkState { .. } => {}
3662 }
3663 }
3664
3665 constraints
3666}
3667
3668fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
3669 let mut constraints = Vec::new();
3670
3671 if meta.immutable {
3672 constraints.push("IMMUTABLE".to_string());
3673 }
3674
3675 if let Some(sm) = &meta.state_machine {
3676 let states = sm
3677 .transitions
3678 .iter()
3679 .map(|(from, to)| format!("{from} -> [{}]", to.join(", ")))
3680 .collect::<Vec<_>>()
3681 .join(", ");
3682 constraints.push(format!("STATE MACHINE ({}: {})", sm.column, states));
3683 }
3684
3685 if !meta.dag_edge_types.is_empty() {
3686 let edge_types = meta
3687 .dag_edge_types
3688 .iter()
3689 .map(|edge_type| format!("'{edge_type}'"))
3690 .collect::<Vec<_>>()
3691 .join(", ");
3692 constraints.push(format!("DAG({edge_types})"));
3693 }
3694
3695 if let Some(ttl_seconds) = meta.default_ttl_seconds {
3696 let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(ttl_seconds));
3697 if meta.sync_safe {
3698 clause.push_str(" SYNC SAFE");
3699 }
3700 constraints.push(clause);
3701 }
3702
3703 for rule in &meta.propagation_rules {
3704 match rule {
3705 PropagationRule::Edge {
3706 edge_type,
3707 direction,
3708 trigger_state,
3709 target_state,
3710 max_depth,
3711 abort_on_failure,
3712 } => {
3713 let dir = match direction {
3714 Direction::Incoming => "INCOMING",
3715 Direction::Outgoing => "OUTGOING",
3716 Direction::Both => "BOTH",
3717 };
3718 let mut clause = format!(
3719 "PROPAGATE ON EDGE {} {} STATE {} SET {}",
3720 edge_type, dir, trigger_state, target_state
3721 );
3722 if *max_depth != 10 {
3723 clause.push_str(&format!(" MAX DEPTH {max_depth}"));
3724 }
3725 if *abort_on_failure {
3726 clause.push_str(" ABORT ON FAILURE");
3727 }
3728 constraints.push(clause);
3729 }
3730 PropagationRule::VectorExclusion { trigger_state } => {
3731 constraints.push(format!(
3732 "PROPAGATE ON STATE {} EXCLUDE VECTOR",
3733 trigger_state
3734 ));
3735 }
3736 PropagationRule::ForeignKey { .. } => {}
3737 }
3738 }
3739
3740 constraints
3741}
3742
/// Render a TTL in seconds as the largest whole SQL time unit that divides
/// it evenly: DAYS, then HOURS, then MINUTES, falling back to SECONDS.
fn ttl_seconds_to_sql(seconds: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    match seconds {
        s if s % DAY == 0 => format!("{} DAYS", s / DAY),
        s if s % HOUR == 0 => format!("{} HOURS", s / HOUR),
        s if s % MINUTE == 0 => format!("{} MINUTES", s / MINUTE),
        s => format!("{s} SECONDS"),
    }
}