1use crate::composite_store::{ChangeLogEntry, CompositeStore};
2use crate::executor::execute_plan;
3use crate::persistence::RedbPersistence;
4use crate::persistent_store::PersistentCompositeStore;
5use crate::plugin::{
6 CommitEvent, CommitSource, CorePlugin, DatabasePlugin, PluginHealth, QueryOutcome,
7 SubscriptionMetrics,
8};
9use crate::schema_enforcer::validate_dml;
10use crate::sync_types::{
11 ApplyResult, ChangeSet, Conflict, ConflictPolicies, ConflictPolicy, DdlChange, EdgeChange,
12 NaturalKey, RowChange, VectorChange,
13};
14use contextdb_core::*;
15use contextdb_graph::{GraphStore, MemGraphExecutor};
16use contextdb_parser::Statement;
17use contextdb_parser::ast::{AlterAction, CreateTable, DataType};
18use contextdb_planner::PhysicalPlan;
19use contextdb_relational::{MemRelationalExecutor, RelationalStore};
20use contextdb_tx::{TxManager, WriteSetApplicator};
21use contextdb_vector::{HnswIndex, MemVectorExecutor, VectorStore};
22use parking_lot::{Mutex, RwLock};
23use roaring::RoaringTreemap;
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::path::Path;
26use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
27use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
28use std::sync::{Arc, OnceLock};
29use std::thread::{self, JoinHandle};
30use std::time::{Duration, Instant};
31
/// Boxed store implementation that the transaction manager applies committed
/// write sets to (the composite or persistent composite store).
type DynStore = Box<dyn WriteSetApplicator>;
// Bounded channel capacity for commit-event subscribers; presumably used by the
// subscribe path (outside this chunk) — TODO confirm against the subscribe fn.
const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 64;
34
35#[derive(Debug, Clone)]
36pub struct QueryResult {
37 pub columns: Vec<String>,
38 pub rows: Vec<Vec<Value>>,
39 pub rows_affected: u64,
40}
41
42impl QueryResult {
43 pub fn empty() -> Self {
44 Self {
45 columns: vec![],
46 rows: vec![],
47 rows_affected: 0,
48 }
49 }
50
51 pub fn empty_with_affected(rows_affected: u64) -> Self {
52 Self {
53 columns: vec![],
54 rows: vec![],
55 rows_affected,
56 }
57 }
58}
59
/// The multi-model database engine: relational, graph, and vector stores
/// coordinated by a single transaction manager, with optional redb-backed
/// persistence, a plugin hook surface, and commit-event subscriptions.
pub struct Database {
    // Transaction manager; commits apply write sets to the boxed composite store.
    tx_mgr: Arc<TxManager<DynStore>>,
    relational_store: Arc<RelationalStore>,
    graph_store: Arc<GraphStore>,
    vector_store: Arc<VectorStore>,
    // Append-only change log shared with the composite store (loaded from disk on open).
    change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    // DDL history as (lsn, change) pairs, shared with the composite store.
    ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
    // None for purely in-memory databases.
    persistence: Option<Arc<RedbPersistence>>,
    // Per-model executors layered over the shared stores + tx manager.
    relational: MemRelationalExecutor<DynStore>,
    graph: MemGraphExecutor<DynStore>,
    vector: MemVectorExecutor<DynStore>,
    // Transaction opened by a SQL BEGIN and used until COMMIT/ROLLBACK.
    session_tx: Mutex<Option<TxId>>,
    instance_id: uuid::Uuid,
    plugin: Arc<dyn DatabasePlugin>,
    accountant: Arc<MemoryAccountant>,
    conflict_policies: RwLock<ConflictPolicies>,
    // Commit-event subscriber channels plus delivery counters.
    subscriptions: Mutex<SubscriptionState>,
    // Background pruning thread handle + shutdown flag.
    pruning_runtime: Mutex<PruningRuntime>,
    pruning_guard: Arc<Mutex<()>>,
    // 0 means "no limit" (see build_db's unwrap_or(0)).
    disk_limit: AtomicU64,
    disk_limit_startup_ceiling: AtomicU64,
    sync_watermark: Arc<AtomicU64>,
    closed: AtomicBool,
}

impl std::fmt::Debug for Database {
    // Manual impl: the executors/stores are not Debug-friendly, so only the
    // instance id is shown and the rest is elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Database")
            .field("instance_id", &self.instance_id)
            .finish_non_exhaustive()
    }
}
92
/// A pending state transition for one row — a BFS work item during state
/// propagation.
#[derive(Debug, Clone)]
struct PropagationQueueEntry {
    // Table holding the row to transition.
    table: String,
    // Value of the row's `id` column.
    uuid: uuid::Uuid,
    // State the row should be moved to.
    target_state: String,
    // Depth already consumed reaching this entry (bounded by each rule's max_depth).
    depth: u32,
    // When true, a skipped/invalid transition aborts the whole propagation run.
    abort_on_failure: bool,
}

/// The row whose state change is currently being fanned out to dependents.
#[derive(Debug, Clone, Copy)]
struct PropagationSource<'a> {
    table: &'a str,
    uuid: uuid::Uuid,
    // State the source row has just reached.
    state: &'a str,
    // Depth of the source itself (0 for the user-initiated change).
    depth: u32,
}

/// Read-only context shared across one propagation run.
#[derive(Debug, Clone, Copy)]
struct PropagationContext<'a> {
    tx: TxId,
    snapshot: SnapshotId,
    // Table-metadata map cloned at the start of the run.
    metas: &'a HashMap<String, TableMeta>,
}
116
/// Bookkeeping for commit-event subscribers.
#[derive(Debug)]
struct SubscriptionState {
    // Bounded senders to live subscribers; disconnected ones are pruned on publish.
    subscribers: Vec<SyncSender<CommitEvent>>,
    // Total events successfully delivered across all subscribers.
    events_sent: u64,
    // Events dropped because a subscriber's channel was full.
    events_dropped: u64,
}

impl SubscriptionState {
    /// Empty state: no subscribers, zeroed counters.
    fn new() -> Self {
        Self {
            subscribers: Vec::new(),
            events_sent: 0,
            events_dropped: 0,
        }
    }
}
133
/// Handle and shutdown flag for the background pruning thread.
#[derive(Debug)]
struct PruningRuntime {
    // Shared with the worker thread; set to true to request shutdown.
    shutdown: Arc<AtomicBool>,
    // None until a pruning thread is started (or after it is stopped).
    handle: Option<JoinHandle<()>>,
}

impl PruningRuntime {
    /// Fresh runtime with no thread running and shutdown not requested.
    fn new() -> Self {
        Self {
            shutdown: Arc::new(AtomicBool::new(false)),
            handle: None,
        }
    }
}
148
149impl Database {
    /// Assembles a `Database` from pre-built stores and logs.
    ///
    /// Shared by both the persistent (`open_loaded`) and in-memory
    /// (`open_memory_internal`) open paths. `disk_limit` / ceiling of `None`
    /// are stored as 0, which denotes "unset".
    #[allow(clippy::too_many_arguments)]
    fn build_db(
        tx_mgr: Arc<TxManager<DynStore>>,
        relational: Arc<RelationalStore>,
        graph: Arc<GraphStore>,
        vector_store: Arc<VectorStore>,
        hnsw: Arc<OnceLock<parking_lot::RwLock<Option<HnswIndex>>>>,
        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
        ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
        persistence: Option<Arc<RedbPersistence>>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        disk_limit: Option<u64>,
        disk_limit_startup_ceiling: Option<u64>,
    ) -> Self {
        Self {
            tx_mgr: tx_mgr.clone(),
            relational_store: relational.clone(),
            graph_store: graph.clone(),
            vector_store: vector_store.clone(),
            change_log,
            ddl_log,
            persistence,
            // Executors share the same stores and tx manager as the fields above.
            relational: MemRelationalExecutor::new(relational, tx_mgr.clone()),
            graph: MemGraphExecutor::new(graph, tx_mgr.clone()),
            vector: MemVectorExecutor::new_with_accountant(
                vector_store,
                tx_mgr.clone(),
                hnsw,
                accountant.clone(),
            ),
            session_tx: Mutex::new(None),
            instance_id: uuid::Uuid::new_v4(),
            plugin,
            accountant,
            conflict_policies: RwLock::new(ConflictPolicies::uniform(ConflictPolicy::LatestWins)),
            subscriptions: Mutex::new(SubscriptionState::new()),
            pruning_runtime: Mutex::new(PruningRuntime::new()),
            pruning_guard: Arc::new(Mutex::new(())),
            // 0 encodes "no limit configured".
            disk_limit: AtomicU64::new(disk_limit.unwrap_or(0)),
            disk_limit_startup_ceiling: AtomicU64::new(disk_limit_startup_ceiling.unwrap_or(0)),
            sync_watermark: Arc::new(AtomicU64::new(0)),
            closed: AtomicBool::new(false),
        }
    }
195
196 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
197 Self::open_with_config(
198 path,
199 Arc::new(CorePlugin),
200 Arc::new(MemoryAccountant::no_limit()),
201 )
202 }
203
204 pub fn open_memory() -> Self {
205 Self::open_memory_with_plugin_and_accountant(
206 Arc::new(CorePlugin),
207 Arc::new(MemoryAccountant::no_limit()),
208 )
209 .expect("failed to open in-memory database")
210 }
211
    /// Opens a persistent database at `path`, loading all stores from redb and
    /// reconstructing in-memory state (tables, edges, vectors, logs, counters).
    ///
    /// `startup_disk_limit` acts as a ceiling: the effective disk limit is the
    /// minimum of the persisted limit and the startup ceiling when both exist.
    fn open_loaded(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        mut accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        let persistence = if path.exists() {
            Arc::new(RedbPersistence::open(path)?)
        } else {
            Arc::new(RedbPersistence::create(path)?)
        };
        // A persisted memory limit only applies when the caller's accountant
        // has no limit of its own.
        if accountant.usage().limit.is_none()
            && let Some(limit) = persistence.load_config_value::<usize>("memory_limit")?
        {
            accountant = Arc::new(MemoryAccountant::with_budget(limit));
        }
        let persisted_disk_limit = persistence.load_config_value::<u64>("disk_limit")?;
        let startup_disk_ceiling = startup_disk_limit;
        let effective_disk_limit = match (persisted_disk_limit, startup_disk_limit) {
            (Some(persisted), Some(ceiling)) => Some(persisted.min(ceiling)),
            (Some(persisted), None) => Some(persisted),
            (None, Some(ceiling)) => Some(ceiling),
            (None, None) => None,
        };

        let all_meta = persistence.load_all_table_meta()?;

        // Rebuild the relational store table-by-table from persisted rows.
        let relational = Arc::new(RelationalStore::new());
        for (name, meta) in &all_meta {
            relational.create_table(name, meta.clone());
            for row in persistence.load_relational_table(name)? {
                relational.insert_loaded_row(name, row);
            }
        }

        let graph = Arc::new(GraphStore::new());
        for edge in persistence.load_forward_edges()? {
            graph.insert_loaded_edge(edge);
        }

        let hnsw = Arc::new(OnceLock::new());
        let vector = Arc::new(VectorStore::new(hnsw.clone()));
        // The vector store's dimension comes from the first vector column found
        // in any table's metadata.
        for meta in all_meta.values() {
            for column in &meta.columns {
                if let ColumnType::Vector(dimension) = column.column_type {
                    vector.set_dimension(dimension);
                    break;
                }
            }
        }
        for entry in persistence.load_vectors()? {
            vector.insert_loaded_vector(entry);
        }

        // Resume id/tx/lsn counters just past the highest persisted values so
        // new work never collides with recovered state.
        let max_row_id = relational.max_row_id();
        let max_tx = max_tx_across_all(&relational, &graph, &vector);
        let max_lsn = max_lsn_across_all(&relational, &graph, &vector);
        relational.set_next_row_id(max_row_id.saturating_add(1));

        let change_log = Arc::new(RwLock::new(persistence.load_change_log()?));
        let ddl_log = Arc::new(RwLock::new(persistence.load_ddl_log()?));
        let composite = CompositeStore::new(
            relational.clone(),
            graph.clone(),
            vector.clone(),
            change_log.clone(),
            ddl_log.clone(),
        );
        // Wrap the composite store so commits are also written through to redb.
        let persistent = PersistentCompositeStore::new(composite, persistence.clone());
        let store: DynStore = Box::new(persistent);
        let tx_mgr = Arc::new(TxManager::new_with_counters(
            store,
            max_tx.saturating_add(1),
            max_lsn.saturating_add(1),
            max_tx,
        ));

        let db = Self::build_db(
            tx_mgr,
            relational,
            graph,
            vector,
            hnsw,
            change_log,
            ddl_log,
            Some(persistence),
            plugin,
            accountant,
            effective_disk_limit,
            startup_disk_ceiling,
        );

        // Re-register DAG edge-type constraints declared in table metadata.
        for meta in all_meta.values() {
            if !meta.dag_edge_types.is_empty() {
                db.graph.register_dag_edge_types(&meta.dag_edge_types);
            }
        }

        db.account_loaded_state()?;
        maybe_prebuild_hnsw(&db.vector_store, db.accountant());

        Ok(db)
    }
316
317 fn open_memory_internal(
318 plugin: Arc<dyn DatabasePlugin>,
319 accountant: Arc<MemoryAccountant>,
320 ) -> Result<Self> {
321 let relational = Arc::new(RelationalStore::new());
322 let graph = Arc::new(GraphStore::new());
323 let hnsw = Arc::new(OnceLock::new());
324 let vector = Arc::new(VectorStore::new(hnsw.clone()));
325 let change_log = Arc::new(RwLock::new(Vec::new()));
326 let ddl_log = Arc::new(RwLock::new(Vec::new()));
327 let store: DynStore = Box::new(CompositeStore::new(
328 relational.clone(),
329 graph.clone(),
330 vector.clone(),
331 change_log.clone(),
332 ddl_log.clone(),
333 ));
334 let tx_mgr = Arc::new(TxManager::new(store));
335
336 let db = Self::build_db(
337 tx_mgr, relational, graph, vector, hnsw, change_log, ddl_log, None, plugin, accountant,
338 None, None,
339 );
340 maybe_prebuild_hnsw(&db.vector_store, db.accountant());
341 Ok(db)
342 }
343
    /// Starts a new transaction and returns its id.
    pub fn begin(&self) -> TxId {
        self.tx_mgr.begin()
    }
347
    /// Commits `tx`, attributing the commit to an explicit user action.
    pub fn commit(&self, tx: TxId) -> Result<()> {
        self.commit_with_source(tx, CommitSource::User)
    }
351
    /// Rolls back `tx`, first releasing any memory reserved for its pending
    /// inserts so the accountant stays balanced.
    pub fn rollback(&self, tx: TxId) -> Result<()> {
        let ws = self.tx_mgr.cloned_write_set(tx)?;
        self.release_insert_allocations(&ws);
        self.tx_mgr.rollback(tx)
    }
357
    /// Takes a read snapshot at the current commit horizon.
    pub fn snapshot(&self) -> SnapshotId {
        self.tx_mgr.snapshot()
    }
361
362 pub fn execute(&self, sql: &str, params: &HashMap<String, Value>) -> Result<QueryResult> {
363 let stmt = contextdb_parser::parse(sql)?;
364
365 match &stmt {
366 Statement::Begin => {
367 let mut session = self.session_tx.lock();
368 if session.is_none() {
369 *session = Some(self.begin());
370 }
371 return Ok(QueryResult::empty());
372 }
373 Statement::Commit => {
374 let mut session = self.session_tx.lock();
375 if let Some(tx) = session.take() {
376 self.commit_with_source(tx, CommitSource::User)?;
377 }
378 return Ok(QueryResult::empty());
379 }
380 Statement::Rollback => {
381 let mut session = self.session_tx.lock();
382 if let Some(tx) = *session {
383 self.rollback(tx)?;
384 *session = None;
385 }
386 return Ok(QueryResult::empty());
387 }
388 _ => {}
389 }
390
391 let active_tx = *self.session_tx.lock();
392 self.execute_statement(&stmt, sql, params, active_tx)
393 }
394
395 fn execute_autocommit(
396 &self,
397 plan: &PhysicalPlan,
398 params: &HashMap<String, Value>,
399 ) -> Result<QueryResult> {
400 match plan {
401 PhysicalPlan::Insert(_) | PhysicalPlan::Delete(_) | PhysicalPlan::Update(_) => {
402 let tx = self.begin();
403 let result = execute_plan(self, plan, params, Some(tx));
404 match result {
405 Ok(qr) => {
406 self.commit_with_source(tx, CommitSource::AutoCommit)?;
407 Ok(qr)
408 }
409 Err(e) => {
410 let _ = self.rollback(tx);
411 Err(e)
412 }
413 }
414 }
415 _ => execute_plan(self, plan, params, None),
416 }
417 }
418
419 pub fn explain(&self, sql: &str) -> Result<String> {
420 let stmt = contextdb_parser::parse(sql)?;
421 let plan = contextdb_planner::plan(&stmt)?;
422 if self.vector_store.vector_count() >= 1000 && !self.vector_store.has_hnsw_index() {
423 maybe_prebuild_hnsw(&self.vector_store, self.accountant());
424 }
425 let mut output = plan.explain();
426 if self.vector_store.has_hnsw_index() {
427 output = output.replace("VectorSearch(", "HNSWSearch(");
428 output = output.replace("VectorSearch {", "HNSWSearch {");
429 } else {
430 output = output.replace("VectorSearch(", "VectorSearch(strategy=BruteForce, ");
431 output = output.replace("VectorSearch {", "VectorSearch { strategy: BruteForce,");
432 }
433 Ok(output)
434 }
435
    /// Parses and executes one SQL statement inside an existing transaction.
    /// The caller remains responsible for committing or rolling back `tx`.
    pub fn execute_in_tx(
        &self,
        tx: TxId,
        sql: &str,
        params: &HashMap<String, Value>,
    ) -> Result<QueryResult> {
        let stmt = contextdb_parser::parse(sql)?;
        self.execute_statement(&stmt, sql, params, Some(tx))
    }
445
    /// Commits `tx`: validates foreign keys, runs the plugin pre-commit hook,
    /// commits with an LSN, then fires post-commit work (accounting release,
    /// LSN stamping, plugin post-commit, subscriber notification).
    ///
    /// A pre-commit validation or plugin failure rolls the transaction back
    /// (best effort) and returns the original error. Empty write sets skip
    /// validation, hooks, and event publication entirely.
    fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
        let mut ws = self.tx_mgr.cloned_write_set(tx)?;

        if !ws.is_empty()
            && let Err(err) = self.validate_foreign_keys_in_tx(tx)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        if !ws.is_empty()
            && let Err(err) = self.plugin.pre_commit(&ws, source)
        {
            let _ = self.rollback(tx);
            return Err(err);
        }

        let lsn = self.tx_mgr.commit_with_lsn(tx)?;

        if !ws.is_empty() {
            // Deletes only free accounted memory once the commit is durable.
            self.release_delete_allocations(&ws);
            ws.stamp_lsn(lsn);
            self.plugin.post_commit(&ws, source);
            self.publish_commit_event(Self::build_commit_event(&ws, source, lsn));
        }

        Ok(())
    }
474
475 fn build_commit_event(
476 ws: &contextdb_tx::WriteSet,
477 source: CommitSource,
478 lsn: u64,
479 ) -> CommitEvent {
480 let mut tables_changed: Vec<String> = ws
481 .relational_inserts
482 .iter()
483 .map(|(table, _)| table.clone())
484 .chain(
485 ws.relational_deletes
486 .iter()
487 .map(|(table, _, _)| table.clone()),
488 )
489 .collect::<HashSet<_>>()
490 .into_iter()
491 .collect();
492 tables_changed.sort();
493
494 CommitEvent {
495 source,
496 lsn,
497 tables_changed,
498 row_count: ws.relational_inserts.len()
499 + ws.relational_deletes.len()
500 + ws.adj_inserts.len()
501 + ws.adj_deletes.len()
502 + ws.vector_inserts.len()
503 + ws.vector_deletes.len(),
504 }
505 }
506
507 fn publish_commit_event(&self, event: CommitEvent) {
508 let mut subscriptions = self.subscriptions.lock();
509 let subscribers = std::mem::take(&mut subscriptions.subscribers);
510 let mut live_subscribers = Vec::with_capacity(subscribers.len());
511
512 for sender in subscribers {
513 match sender.try_send(event.clone()) {
514 Ok(()) => {
515 subscriptions.events_sent += 1;
516 live_subscribers.push(sender);
517 }
518 Err(TrySendError::Full(_)) => {
519 subscriptions.events_dropped += 1;
520 live_subscribers.push(sender);
521 }
522 Err(TrySendError::Disconnected(_)) => {}
523 }
524 }
525
526 subscriptions.subscribers = live_subscribers;
527 }
528
    /// Signals the background pruning thread to stop and joins it.
    ///
    /// The runtime lock is released before joining (the handle is moved out in
    /// the inner scope) so the worker can re-acquire state while winding down.
    /// The shutdown flag is replaced with a fresh one so a later thread start
    /// does not observe the stale `true`.
    fn stop_pruning_thread(&self) {
        let handle = {
            let mut runtime = self.pruning_runtime.lock();
            runtime.shutdown.store(true, Ordering::SeqCst);
            let handle = runtime.handle.take();
            runtime.shutdown = Arc::new(AtomicBool::new(false));
            handle
        };

        if let Some(handle) = handle {
            // Join failure (worker panic) is intentionally ignored.
            let _ = handle.join();
        }
    }
542
    /// Core statement execution path shared by `execute` and `execute_in_tx`.
    ///
    /// Order of operations: plugin `on_query` gate, plugin `on_ddl` gate for
    /// DDL statements, special-cased graph inserts, then plan + validate +
    /// execute. The plugin `post_query` hook always runs with the measured
    /// duration and outcome, and internal row-id columns are stripped from the
    /// final result.
    fn execute_statement(
        &self,
        stmt: &Statement,
        sql: &str,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<QueryResult> {
        self.plugin.on_query(sql)?;

        if let Some(change) = self.ddl_change_for_statement(stmt).as_ref() {
            self.plugin.on_ddl(change)?;
        }

        // INSERT INTO GRAPH / __edges bypasses the planner entirely.
        if let Statement::Insert(ins) = stmt
            && (ins.table.eq_ignore_ascii_case("GRAPH")
                || ins.table.eq_ignore_ascii_case("__edges"))
        {
            return self.execute_graph_insert(ins, params, tx);
        }

        let started = Instant::now();
        // Closure scopes the fallible pipeline so post_query always fires below.
        let result = (|| {
            let stmt = self.pre_resolve_cte_subqueries(stmt, params, tx)?;
            let plan = contextdb_planner::plan(&stmt)?;
            validate_dml(&plan, self, params)?;
            let result = match tx {
                Some(tx) => execute_plan(self, &plan, params, Some(tx)),
                None => self.execute_autocommit(&plan, params),
            };
            // Successful CREATE TABLE with DAG edge types registers them with
            // the graph executor for cycle enforcement.
            if result.is_ok()
                && let Statement::CreateTable(ct) = &stmt
                && !ct.dag_edge_types.is_empty()
            {
                self.graph.register_dag_edge_types(&ct.dag_edge_types);
            }
            result
        })();
        let duration = started.elapsed();
        let outcome = query_outcome_from_result(&result);
        self.plugin.post_query(sql, duration, &outcome);
        result.map(strip_internal_row_id)
    }
587
588 fn execute_graph_insert(
590 &self,
591 ins: &contextdb_parser::ast::Insert,
592 params: &HashMap<String, Value>,
593 tx: Option<TxId>,
594 ) -> Result<QueryResult> {
595 use crate::executor::resolve_expr;
596
597 let col_index = |name: &str| {
598 ins.columns
599 .iter()
600 .position(|c| c.eq_ignore_ascii_case(name))
601 };
602 let source_idx = col_index("source_id")
603 .ok_or_else(|| Error::PlanError("GRAPH INSERT requires source_id column".into()))?;
604 let target_idx = col_index("target_id")
605 .ok_or_else(|| Error::PlanError("GRAPH INSERT requires target_id column".into()))?;
606 let edge_type_idx = col_index("edge_type")
607 .ok_or_else(|| Error::PlanError("GRAPH INSERT requires edge_type column".into()))?;
608
609 let auto_commit = tx.is_none();
610 let tx = tx.unwrap_or_else(|| self.begin());
611 let mut count = 0u64;
612 for row_exprs in &ins.values {
613 let source = resolve_expr(&row_exprs[source_idx], params)?;
614 let target = resolve_expr(&row_exprs[target_idx], params)?;
615 let edge_type = resolve_expr(&row_exprs[edge_type_idx], params)?;
616
617 let source_uuid = match &source {
618 Value::Uuid(u) => *u,
619 Value::Text(t) => uuid::Uuid::parse_str(t)
620 .map_err(|e| Error::PlanError(format!("invalid source_id uuid: {e}")))?,
621 _ => return Err(Error::PlanError("source_id must be UUID".into())),
622 };
623 let target_uuid = match &target {
624 Value::Uuid(u) => *u,
625 Value::Text(t) => uuid::Uuid::parse_str(t)
626 .map_err(|e| Error::PlanError(format!("invalid target_id uuid: {e}")))?,
627 _ => return Err(Error::PlanError("target_id must be UUID".into())),
628 };
629 let edge_type_str = match &edge_type {
630 Value::Text(t) => t.clone(),
631 _ => return Err(Error::PlanError("edge_type must be TEXT".into())),
632 };
633
634 self.insert_edge(
635 tx,
636 source_uuid,
637 target_uuid,
638 edge_type_str,
639 Default::default(),
640 )?;
641 count += 1;
642 }
643
644 if auto_commit {
645 self.commit_with_source(tx, CommitSource::AutoCommit)?;
646 }
647
648 Ok(QueryResult::empty_with_affected(count))
649 }
650
    /// Maps a parsed statement to the `DdlChange` it would produce, or `None`
    /// for non-DDL statements.
    ///
    /// For ALTER TABLE the current table metadata is cloned and the action is
    /// applied to the copy so the emitted change describes the *resulting*
    /// schema; the live metadata is not modified here.
    fn ddl_change_for_statement(&self, stmt: &Statement) -> Option<DdlChange> {
        match stmt {
            Statement::CreateTable(ct) => Some(ddl_change_from_create_table(ct)),
            Statement::DropTable(dt) => Some(DdlChange::DropTable {
                name: dt.name.clone(),
            }),
            Statement::AlterTable(at) => {
                // Unknown tables start from default (empty) metadata.
                let mut meta = self.table_meta(&at.table).unwrap_or_default();
                match &at.action {
                    AlterAction::AddColumn(col) => {
                        meta.columns.push(contextdb_core::ColumnDef {
                            name: col.name.clone(),
                            column_type: crate::executor::map_column_type(&col.data_type),
                            nullable: col.nullable,
                            primary_key: col.primary_key,
                            unique: col.unique,
                            default: col
                                .default
                                .as_ref()
                                .map(crate::executor::stored_default_expr),
                            references: col.references.as_ref().map(|reference| {
                                contextdb_core::ForeignKeyReference {
                                    table: reference.table.clone(),
                                    column: reference.column.clone(),
                                }
                            }),
                            expires: col.expires,
                        });
                        // An EXPIRES column becomes the table's expiry column.
                        if col.expires {
                            meta.expires_column = Some(col.name.clone());
                        }
                    }
                    AlterAction::DropColumn(name) => {
                        meta.columns.retain(|c| c.name != *name);
                        // Dropping the expiry column clears the table-level setting.
                        if meta.expires_column.as_deref() == Some(name.as_str()) {
                            meta.expires_column = None;
                        }
                    }
                    AlterAction::RenameColumn { from, to } => {
                        if let Some(c) = meta.columns.iter_mut().find(|c| c.name == *from) {
                            c.name = to.clone();
                        }
                        // Keep the expiry-column pointer in sync with the rename.
                        if meta.expires_column.as_deref() == Some(from.as_str()) {
                            meta.expires_column = Some(to.clone());
                        }
                    }
                    AlterAction::SetRetain {
                        duration_seconds,
                        sync_safe,
                    } => {
                        meta.default_ttl_seconds = Some(*duration_seconds);
                        meta.sync_safe = *sync_safe;
                    }
                    AlterAction::DropRetain => {
                        meta.default_ttl_seconds = None;
                        meta.sync_safe = false;
                    }
                    // Conflict-policy changes do not alter the column schema,
                    // so they contribute nothing to the emitted AlterTable.
                    AlterAction::SetSyncConflictPolicy(_) | AlterAction::DropSyncConflictPolicy => { }
                }
                Some(DdlChange::AlterTable {
                    name: at.table.clone(),
                    columns: meta
                        .columns
                        .iter()
                        .map(|c| {
                            (
                                c.name.clone(),
                                sql_type_for_meta_column(c, &meta.propagation_rules),
                            )
                        })
                        .collect(),
                    constraints: create_table_constraints_from_meta(&meta),
                })
            }
            _ => None,
        }
    }
730
    /// For a SELECT with CTEs and a WHERE clause, resolves `IN (subquery)`
    /// expressions that reference those CTEs ahead of planning (the planner
    /// itself cannot see CTE bindings inside subqueries).
    ///
    /// All other statements are returned as an unchanged clone.
    fn pre_resolve_cte_subqueries(
        &self,
        stmt: &Statement,
        params: &HashMap<String, Value>,
        tx: Option<TxId>,
    ) -> Result<Statement> {
        if let Statement::Select(sel) = stmt
            && !sel.ctes.is_empty()
            && sel.body.where_clause.is_some()
        {
            use crate::executor::resolve_in_subqueries_with_ctes;
            let resolved_where = sel
                .body
                .where_clause
                .as_ref()
                .map(|expr| resolve_in_subqueries_with_ctes(self, expr, params, tx, &sel.ctes))
                .transpose()?;
            // Rebuild the SELECT with the pre-resolved WHERE clause in place.
            let mut new_body = sel.body.clone();
            new_body.where_clause = resolved_where;
            Ok(Statement::Select(contextdb_parser::ast::SelectStatement {
                ctes: sel.ctes.clone(),
                body: new_body,
            }))
        } else {
            Ok(stmt.clone())
        }
    }
760
    /// Inserts a row into `table` within `tx`, enforcing unique-column and
    /// composite-unique constraints first. Returns the new row id.
    pub fn insert_row(
        &self,
        tx: TxId,
        table: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<RowId> {
        self.validate_row_constraints(tx, table, &values, None)?;
        self.relational.insert(tx, table, values)
    }
770
    /// Inserts or updates a row keyed by `conflict_col` within `tx`.
    ///
    /// Uniqueness validation skips the row being replaced (looked up first via
    /// point lookup on `conflict_col`). When the upsert results in an update
    /// and the row has an `id` plus a state-machine column value, the state
    /// change is propagated to dependent rows.
    pub fn upsert_row(
        &self,
        tx: TxId,
        table: &str,
        conflict_col: &str,
        values: HashMap<ColName, Value>,
    ) -> Result<UpsertResult> {
        let snapshot = self.snapshot();
        let existing_row_id = values
            .get(conflict_col)
            .map(|conflict_value| {
                self.point_lookup_in_tx(tx, table, conflict_col, conflict_value, snapshot)
                    .map(|row| row.map(|row| row.row_id))
            })
            .transpose()?
            .flatten();
        self.validate_row_constraints(tx, table, &values, existing_row_id)?;

        // Capture the uuid and new state *before* the upsert consumes `values`.
        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
        let meta = self.table_meta(table);
        let new_state = meta
            .as_ref()
            .and_then(|m| m.state_machine.as_ref())
            .and_then(|sm| values.get(&sm.column))
            .and_then(Value::as_text)
            .map(std::borrow::ToOwned::to_owned);

        let result = self
            .relational
            .upsert(tx, table, conflict_col, values, snapshot)?;

        // Only an Update triggers propagation; a fresh Insert has no prior
        // state to transition from.
        if let (Some(uuid), Some(state), Some(_meta)) =
            (row_uuid, new_state.as_deref(), meta.as_ref())
            && matches!(result, UpsertResult::Updated)
        {
            self.propagate_state_change_if_needed(tx, table, Some(uuid), Some(state))?;
        }

        Ok(result)
    }
811
    /// Enforces single-column UNIQUE and composite unique constraints for a
    /// prospective insert/upsert of `values` into `table`.
    ///
    /// `skip_row_id` excludes the row being replaced (upsert case) from the
    /// duplicate scan. NULL values never conflict, matching SQL semantics.
    /// Note: this scans all rows visible to `tx`, so cost is O(rows) per call.
    fn validate_row_constraints(
        &self,
        tx: TxId,
        table: &str,
        values: &HashMap<ColName, Value>,
        skip_row_id: Option<RowId>,
    ) -> Result<()> {
        let meta = self
            .table_meta(table)
            .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
        let snapshot = self.snapshot();

        let visible_rows =
            self.relational
                .scan_filter_with_tx(Some(tx), table, snapshot, &|row| {
                    skip_row_id.is_none_or(|row_id| row.row_id != row_id)
                })?;

        // Single-column UNIQUE (primary keys are enforced elsewhere).
        for column in meta
            .columns
            .iter()
            .filter(|column| column.unique && !column.primary_key)
        {
            let Some(value) = values.get(&column.name) else {
                continue;
            };
            if *value == Value::Null {
                continue;
            }
            if visible_rows
                .iter()
                .any(|existing| existing.values.get(&column.name) == Some(value))
            {
                return Err(Error::UniqueViolation {
                    table: table.to_string(),
                    column: column.name.clone(),
                });
            }
        }

        // Composite UNIQUE constraints: a NULL in any member column exempts
        // the candidate row from the constraint.
        for unique_constraint in &meta.unique_constraints {
            let mut candidate_values = Vec::with_capacity(unique_constraint.len());
            let mut has_null = false;

            for column_name in unique_constraint {
                match values.get(column_name) {
                    Some(Value::Null) | None => {
                        has_null = true;
                        break;
                    }
                    Some(value) => candidate_values.push(value),
                }
            }

            if has_null {
                continue;
            }

            if visible_rows.iter().any(|existing| {
                unique_constraint
                    .iter()
                    .zip(candidate_values.iter())
                    .all(|(column_name, value)| existing.values.get(column_name) == Some(*value))
            }) {
                return Err(Error::UniqueViolation {
                    table: table.to_string(),
                    column: unique_constraint.join(","),
                });
            }
        }

        Ok(())
    }
885
    /// Verifies, at commit time, that every non-NULL foreign-key value in the
    /// transaction's pending relational inserts references an existing row
    /// (visible to `tx` at the current snapshot).
    fn validate_foreign_keys_in_tx(&self, tx: TxId) -> Result<()> {
        let snapshot = self.snapshot();
        let relational_inserts = self
            .tx_mgr
            .with_write_set(tx, |ws| ws.relational_inserts.clone())?;

        for (table, row) in relational_inserts {
            let meta = self
                .table_meta(&table)
                .ok_or_else(|| Error::TableNotFound(table.clone()))?;
            for column in &meta.columns {
                let Some(reference) = &column.references else {
                    continue;
                };
                let Some(value) = row.values.get(&column.name) else {
                    continue;
                };
                // NULL foreign keys are permitted (SQL semantics).
                if *value == Value::Null {
                    continue;
                }
                if self
                    .point_lookup_in_tx(tx, &reference.table, &reference.column, value, snapshot)?
                    .is_none()
                {
                    return Err(Error::ForeignKeyViolation {
                        table: table.clone(),
                        column: column.name.clone(),
                        ref_table: reference.table.clone(),
                    });
                }
            }
        }

        Ok(())
    }
921
    /// Kicks off state propagation for a row change, guarded against
    /// re-entrancy: nested upserts performed *by* propagation must not start
    /// a second propagation within the same transaction.
    ///
    /// The `propagation_in_progress` flag lives in the transaction's write set
    /// so the guard is scoped to `tx`, and it is cleared before any propagation
    /// error is returned.
    pub(crate) fn propagate_state_change_if_needed(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: Option<uuid::Uuid>,
        new_state: Option<&str>,
    ) -> Result<()> {
        if let (Some(uuid), Some(state)) = (row_uuid, new_state) {
            let already_propagating = self
                .tx_mgr
                .with_write_set(tx, |ws| ws.propagation_in_progress)?;
            if !already_propagating {
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = true)?;
                let propagate_result = self.propagate(tx, table, uuid, state);
                // Always clear the flag before surfacing the result.
                self.tx_mgr
                    .with_write_set(tx, |ws| ws.propagation_in_progress = false)?;
                propagate_result?;
            }
        }

        Ok(())
    }
945
    /// Breadth-first propagation of a state change from (`table`, `row_uuid`)
    /// to dependent rows via foreign-key and edge propagation rules, plus any
    /// vector-exclusion rules.
    ///
    /// Failure handling: rows whose table lacks a state machine, or whose
    /// transition is rejected as invalid, are skipped with a warning; if the
    /// originating rule set `abort_on_failure`, the *first* such failure is
    /// remembered and returned as `PropagationAborted` after the traversal
    /// finishes (the rest of the queue is still processed). Other errors abort
    /// immediately.
    fn propagate(
        &self,
        tx: TxId,
        table: &str,
        row_uuid: uuid::Uuid,
        new_state: &str,
    ) -> Result<()> {
        let snapshot = self.snapshot();
        // Clone table metadata once so rule lookups are stable for the whole run.
        let metas = self.relational_store().table_meta.read().clone();
        let mut queue: VecDeque<PropagationQueueEntry> = VecDeque::new();
        // (table, uuid) pairs already processed — prevents cycles.
        let mut visited: HashSet<(String, uuid::Uuid)> = HashSet::new();
        let mut abort_violation: Option<Error> = None;
        let ctx = PropagationContext {
            tx,
            snapshot,
            metas: &metas,
        };
        let root = PropagationSource {
            table,
            uuid: row_uuid,
            state: new_state,
            depth: 0,
        };

        // Seed the queue from the root change itself.
        self.enqueue_fk_children(&ctx, &mut queue, root);
        self.enqueue_edge_children(&ctx, &mut queue, root)?;
        self.apply_vector_exclusions(&ctx, root)?;

        while let Some(entry) = queue.pop_front() {
            if !visited.insert((entry.table.clone(), entry.uuid)) {
                continue;
            }

            let Some(meta) = metas.get(&entry.table) else {
                continue;
            };

            // A target table without a state machine cannot accept the
            // transition — warn, optionally record an abort, and move on.
            let Some(state_machine) = &meta.state_machine else {
                let msg = format!(
                    "warning: propagation target table {} has no state machine",
                    entry.table
                );
                eprintln!("{msg}");
                if entry.abort_on_failure && abort_violation.is_none() {
                    abort_violation = Some(Error::PropagationAborted {
                        table: entry.table.clone(),
                        column: String::new(),
                        from: String::new(),
                        to: entry.target_state.clone(),
                    });
                }
                continue;
            };

            let state_column = state_machine.column.clone();
            // Row may have been deleted since it was enqueued.
            let Some(existing) = self.relational.point_lookup_with_tx(
                Some(tx),
                &entry.table,
                "id",
                &Value::Uuid(entry.uuid),
                snapshot,
            )?
            else {
                continue;
            };

            let from_state = existing
                .values
                .get(&state_column)
                .and_then(Value::as_text)
                .unwrap_or("")
                .to_string();

            let mut next_values = existing.values.clone();
            next_values.insert(
                state_column.clone(),
                Value::Text(entry.target_state.clone()),
            );

            let upsert_outcome =
                self.relational
                    .upsert(tx, &entry.table, "id", next_values, snapshot);

            let reached_state = match upsert_outcome {
                Ok(UpsertResult::Updated) => entry.target_state.as_str(),
                // NoOp (already in target state) or a surprising Insert: do not
                // fan out further from this row.
                Ok(UpsertResult::NoOp) | Ok(UpsertResult::Inserted) => continue,
                Err(Error::InvalidStateTransition(_)) => {
                    eprintln!(
                        "warning: skipped invalid propagated transition {}.{} {} -> {}",
                        entry.table, state_column, from_state, entry.target_state
                    );
                    if entry.abort_on_failure && abort_violation.is_none() {
                        abort_violation = Some(Error::PropagationAborted {
                            table: entry.table.clone(),
                            column: state_column.clone(),
                            from: from_state,
                            to: entry.target_state.clone(),
                        });
                    }
                    continue;
                }
                Err(err) => return Err(err),
            };

            // The row transitioned: fan out from it at the incremented depth.
            self.enqueue_edge_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;
            self.apply_vector_exclusions(
                &ctx,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            )?;

            self.enqueue_fk_children(
                &ctx,
                &mut queue,
                PropagationSource {
                    table: &entry.table,
                    uuid: entry.uuid,
                    state: reached_state,
                    depth: entry.depth,
                },
            );
        }

        if let Some(err) = abort_violation {
            return Err(err);
        }

        Ok(())
    }
1088
    /// Enqueues rows that reference `source` through a foreign-key propagation
    /// rule whose trigger state matches the source's new state.
    ///
    /// Scans *all* table metadata, because FK rules are declared on the owning
    /// (referencing) table, not on the referenced one. Scan failures are
    /// logged and skipped rather than propagated — this helper is infallible.
    fn enqueue_fk_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) {
        for (owner_table, owner_meta) in ctx.metas {
            for rule in &owner_meta.propagation_rules {
                let PropagationRule::ForeignKey {
                    fk_column,
                    referenced_table,
                    trigger_state,
                    target_state,
                    max_depth,
                    abort_on_failure,
                    ..
                } = rule
                else {
                    continue;
                };

                if referenced_table != source.table || trigger_state != source.state {
                    continue;
                }

                // Depth limit is checked against the source; children are
                // enqueued at depth + 1.
                if source.depth >= *max_depth {
                    continue;
                }

                let rows = match self.relational.scan_filter_with_tx(
                    Some(ctx.tx),
                    owner_table,
                    ctx.snapshot,
                    &|row| row.values.get(fk_column) == Some(&Value::Uuid(source.uuid)),
                ) {
                    Ok(rows) => rows,
                    Err(err) => {
                        eprintln!(
                            "warning: propagation scan failed for {owner_table}.{fk_column}: {err}"
                        );
                        continue;
                    }
                };

                for row in rows {
                    // Rows without a UUID `id` cannot be addressed and are skipped.
                    if let Some(id) = row.values.get("id").and_then(Value::as_uuid).copied() {
                        queue.push_back(PropagationQueueEntry {
                            table: owner_table.clone(),
                            uuid: id,
                            target_state: target_state.clone(),
                            depth: source.depth + 1,
                            abort_on_failure: *abort_on_failure,
                        });
                    }
                }
            }
        }
    }
1147
    /// Enqueues graph neighbors of `source` that match an edge propagation rule
    /// whose trigger state equals the source's new state.
    ///
    /// Neighbors are found by a 1-hop BFS over the rule's edge type/direction;
    /// only nodes that also exist as rows of the *same* table (looked up by
    /// `id`) are enqueued — edge rules propagate within one table.
    fn enqueue_edge_children(
        &self,
        ctx: &PropagationContext<'_>,
        queue: &mut VecDeque<PropagationQueueEntry>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::Edge {
                edge_type,
                direction,
                trigger_state,
                target_state,
                max_depth,
                abort_on_failure,
            } = rule
            else {
                continue;
            };

            if trigger_state != source.state || source.depth >= *max_depth {
                continue;
            }

            // Single-hop traversal; deeper propagation happens as enqueued
            // children are processed and fan out again.
            let bfs = self.query_bfs(
                source.uuid,
                Some(std::slice::from_ref(edge_type)),
                *direction,
                1,
                ctx.snapshot,
            )?;

            for node in bfs.nodes {
                if self
                    .relational
                    .point_lookup_with_tx(
                        Some(ctx.tx),
                        source.table,
                        "id",
                        &Value::Uuid(node.id),
                        ctx.snapshot,
                    )?
                    .is_some()
                {
                    queue.push_back(PropagationQueueEntry {
                        table: source.table.to_string(),
                        uuid: node.id,
                        target_state: target_state.clone(),
                        depth: source.depth + 1,
                        abort_on_failure: *abort_on_failure,
                    });
                }
            }
        }

        Ok(())
    }
1208
    /// Deletes the vectors of a row whose state change matches a
    /// `VectorExclusion` rule, removing the row from similarity search.
    fn apply_vector_exclusions(
        &self,
        ctx: &PropagationContext<'_>,
        source: PropagationSource<'_>,
    ) -> Result<()> {
        let Some(meta) = ctx.metas.get(source.table) else {
            return Ok(());
        };

        for rule in &meta.propagation_rules {
            let PropagationRule::VectorExclusion { trigger_state } = rule else {
                continue;
            };
            if trigger_state != source.state {
                continue;
            }
            // A logical row may map to several physical row ids (committed
            // versions plus pending inserts); drop the vector for each.
            for row_id in self.logical_row_ids_for_uuid(ctx.tx, source.table, source.uuid) {
                self.delete_vector(ctx.tx, row_id)?;
            }
        }

        Ok(())
    }
1232
    /// Deletes `row_id` from `table` within transaction `tx`.
    pub fn delete_row(&self, tx: TxId, table: &str, row_id: RowId) -> Result<()> {
        self.relational.delete(tx, table, row_id)
    }

    /// Returns all rows of `table` visible at `snapshot`.
    pub fn scan(&self, table: &str, snapshot: SnapshotId) -> Result<Vec<VersionedRow>> {
        self.relational.scan(table, snapshot)
    }

    /// Returns rows of `table` visible at `snapshot` that satisfy `predicate`.
    pub fn scan_filter(
        &self,
        table: &str,
        snapshot: SnapshotId,
        predicate: &dyn Fn(&VersionedRow) -> bool,
    ) -> Result<Vec<VersionedRow>> {
        self.relational.scan_filter(table, snapshot, predicate)
    }

    /// Looks up a single row by column equality at `snapshot` (committed data
    /// only; does not see any transaction's pending writes).
    pub fn point_lookup(
        &self,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational.point_lookup(table, col, value, snapshot)
    }

    /// Like [`Database::point_lookup`], but also sees `tx`'s uncommitted writes.
    pub(crate) fn point_lookup_in_tx(
        &self,
        tx: TxId,
        table: &str,
        col: &str,
        value: &Value,
        snapshot: SnapshotId,
    ) -> Result<Option<VersionedRow>> {
        self.relational
            .point_lookup_with_tx(Some(tx), table, col, value, snapshot)
    }
1271
1272 pub(crate) fn logical_row_ids_for_uuid(
1273 &self,
1274 tx: TxId,
1275 table: &str,
1276 uuid: uuid::Uuid,
1277 ) -> Vec<RowId> {
1278 let mut row_ids = HashSet::new();
1279
1280 if let Some(rows) = self.relational_store.tables.read().get(table) {
1281 for row in rows {
1282 if row.values.get("id") == Some(&Value::Uuid(uuid)) {
1283 row_ids.insert(row.row_id);
1284 }
1285 }
1286 }
1287
1288 let _ = self.tx_mgr.with_write_set(tx, |ws| {
1289 for (insert_table, row) in &ws.relational_inserts {
1290 if insert_table == table && row.values.get("id") == Some(&Value::Uuid(uuid)) {
1291 row_ids.insert(row.row_id);
1292 }
1293 }
1294 });
1295
1296 row_ids.into_iter().collect()
1297 }
1298
1299 pub fn insert_edge(
1300 &self,
1301 tx: TxId,
1302 source: NodeId,
1303 target: NodeId,
1304 edge_type: EdgeType,
1305 properties: HashMap<String, Value>,
1306 ) -> Result<bool> {
1307 let bytes = estimate_edge_bytes(source, target, &edge_type, &properties);
1308 self.accountant.try_allocate_for(
1309 bytes,
1310 "graph_insert",
1311 "insert_edge",
1312 "Reduce edge fan-out or raise MEMORY_LIMIT before inserting more graph edges.",
1313 )?;
1314
1315 match self
1316 .graph
1317 .insert_edge(tx, source, target, edge_type, properties)
1318 {
1319 Ok(inserted) => {
1320 if !inserted {
1321 self.accountant.release(bytes);
1322 }
1323 Ok(inserted)
1324 }
1325 Err(err) => {
1326 self.accountant.release(bytes);
1327 Err(err)
1328 }
1329 }
1330 }
1331
    /// Deletes the edge `source -[edge_type]-> target` within transaction `tx`.
    pub fn delete_edge(
        &self,
        tx: TxId,
        source: NodeId,
        target: NodeId,
        edge_type: &str,
    ) -> Result<()> {
        self.graph.delete_edge(tx, source, target, edge_type)
    }

    /// Breadth-first traversal from `start` up to `max_depth` hops,
    /// optionally restricted to `edge_types`.
    /// The hard-coded `1` is presumably the minimum depth (start node
    /// excluded) — confirm against the graph executor's `bfs` signature.
    pub fn query_bfs(
        &self,
        start: NodeId,
        edge_types: Option<&[EdgeType]>,
        direction: Direction,
        max_depth: u32,
        snapshot: SnapshotId,
    ) -> Result<TraversalResult> {
        self.graph
            .bfs(start, edge_types, direction, 1, max_depth, snapshot)
    }

    /// Number of `edge_type` edges out of `source` visible at `snapshot`.
    /// Always `Ok`; the `Result` wrapper keeps the API uniform.
    pub fn edge_count(
        &self,
        source: NodeId,
        edge_type: &str,
        snapshot: SnapshotId,
    ) -> Result<usize> {
        Ok(self.graph.edge_count(source, edge_type, snapshot))
    }
1362
1363 pub fn get_edge_properties(
1364 &self,
1365 source: NodeId,
1366 target: NodeId,
1367 edge_type: &str,
1368 snapshot: SnapshotId,
1369 ) -> Result<Option<HashMap<String, Value>>> {
1370 let props = self
1371 .graph_store
1372 .forward_adj
1373 .read()
1374 .get(&source)
1375 .and_then(|entries| {
1376 entries
1377 .iter()
1378 .rev()
1379 .find(|entry| {
1380 entry.target == target
1381 && entry.edge_type == edge_type
1382 && entry.visible_at(snapshot)
1383 })
1384 .map(|entry| entry.properties.clone())
1385 });
1386 Ok(props)
1387 }
1388
1389 pub fn insert_vector(&self, tx: TxId, row_id: RowId, vector: Vec<f32>) -> Result<()> {
1390 let bytes = estimate_vector_bytes(&vector);
1391 self.accountant.try_allocate_for(
1392 bytes,
1393 "insert",
1394 "vector_insert",
1395 "Reduce vector dimensionality, insert fewer rows, or raise MEMORY_LIMIT.",
1396 )?;
1397 self.vector
1398 .insert_vector(tx, row_id, vector)
1399 .inspect_err(|_| self.accountant.release(bytes))
1400 }
1401
    /// Deletes the vector associated with `row_id` within transaction `tx`.
    pub fn delete_vector(&self, tx: TxId, row_id: RowId) -> Result<()> {
        self.vector.delete_vector(tx, row_id)
    }

    /// K-nearest-neighbor search over vectors visible at `snapshot`,
    /// optionally restricted to the `candidates` row-id set.
    pub fn query_vector(
        &self,
        query: &[f32],
        k: usize,
        candidates: Option<&RoaringTreemap>,
        snapshot: SnapshotId,
    ) -> Result<Vec<(RowId, f32)>> {
        self.vector.search(query, k, candidates, snapshot)
    }

    /// Whether `row_id` has any vector version visible at `snapshot`.
    pub fn has_live_vector(&self, row_id: RowId, snapshot: SnapshotId) -> bool {
        self.vector_store
            .vectors
            .read()
            .iter()
            .any(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
    }

    /// The vector entry for `row_id` visible at `snapshot`, if any.
    /// Scans from the back (assumes append order, so the latest version wins).
    pub fn live_vector_entry(&self, row_id: RowId, snapshot: SnapshotId) -> Option<VectorEntry> {
        self.vector_store
            .vectors
            .read()
            .iter()
            .rev()
            .find(|entry| entry.row_id == row_id && entry.visible_at(snapshot))
            .cloned()
    }
1433
    /// Removes graph and vector state derived from `table` (called when the
    /// table is dropped): vectors attached to its row ids, and edges matching
    /// its (source_id, edge_type, target_id) columns.
    ///
    /// NOTE(review): a failed scan is swallowed (`unwrap_or_default`), which
    /// silently skips cleanup — presumably acceptable best-effort behavior
    /// during DROP TABLE; confirm.
    pub(crate) fn drop_table_aux_state(&self, table: &str) {
        let snapshot = self.snapshot();
        let rows = self.scan(table, snapshot).unwrap_or_default();
        let row_ids: HashSet<RowId> = rows.iter().map(|row| row.row_id).collect();
        // Edge keys are only derivable for edge-shaped tables; other tables
        // simply yield an empty set and skip the adjacency cleanup below.
        let edge_keys: HashSet<(NodeId, EdgeType, NodeId)> = rows
            .iter()
            .filter_map(|row| {
                match (
                    row.values.get("source_id").and_then(Value::as_uuid),
                    row.values.get("target_id").and_then(Value::as_uuid),
                    row.values.get("edge_type").and_then(Value::as_text),
                ) {
                    (Some(source), Some(target), Some(edge_type)) => {
                        Some((*source, edge_type.to_string(), *target))
                    }
                    _ => None,
                }
            })
            .collect();

        if !row_ids.is_empty() {
            let mut vectors = self.vector_store.vectors.write();
            vectors.retain(|entry| !row_ids.contains(&entry.row_id));
            // Clear the HNSW index after removing entries (presumably rebuilt
            // lazily from the surviving vectors — see `clear_hnsw`).
            self.vector_store.clear_hnsw(self.accountant());
        }

        if !edge_keys.is_empty() {
            // Forward and reverse adjacency are pruned symmetrically; each
            // write lock is scoped so the two are never held together.
            {
                let mut forward = self.graph_store.forward_adj.write();
                for entries in forward.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                // Drop now-empty adjacency buckets.
                forward.retain(|_, entries| !entries.is_empty());
            }
            {
                let mut reverse = self.graph_store.reverse_adj.write();
                for entries in reverse.values_mut() {
                    entries.retain(|entry| {
                        !edge_keys.contains(&(entry.source, entry.edge_type.clone(), entry.target))
                    });
                }
                reverse.retain(|_, entries| !entries.is_empty());
            }
        }
    }
1481
    /// Names of all currently defined tables.
    pub fn table_names(&self) -> Vec<String> {
        self.relational_store.table_names()
    }

    /// Schema metadata for `table`, or `None` if the table does not exist.
    pub fn table_meta(&self, table: &str) -> Option<TableMeta> {
        self.relational_store.table_meta(table)
    }
1489
    /// Runs one synchronous pruning pass over expired rows, edges, and
    /// vectors, returning the value reported by `prune_expired_rows`.
    /// Serialized against the background pruning thread via `pruning_guard`.
    pub fn run_pruning_cycle(&self) -> u64 {
        let _guard = self.pruning_guard.lock();
        prune_expired_rows(
            &self.relational_store,
            &self.graph_store,
            &self.vector_store,
            self.accountant(),
            self.persistence.as_ref(),
            self.sync_watermark(),
        )
    }
1502
    /// (Re)starts the background pruning thread with a new `interval`.
    ///
    /// Any existing pruning thread is stopped first. The worker prunes under
    /// `pruning_guard` (so it never overlaps a manual
    /// [`Database::run_pruning_cycle`]) and then sleeps via
    /// `sleep_with_shutdown`, which presumably wakes early when the shutdown
    /// flag flips.
    pub fn set_pruning_interval(&self, interval: Duration) {
        self.stop_pruning_thread();

        let shutdown = Arc::new(AtomicBool::new(false));
        // Clone the Arcs the worker needs so the thread owns its handles.
        let relational = self.relational_store.clone();
        let graph = self.graph_store.clone();
        let vector = self.vector_store.clone();
        let accountant = self.accountant.clone();
        let persistence = self.persistence.clone();
        let sync_watermark = self.sync_watermark.clone();
        let pruning_guard = self.pruning_guard.clone();
        let thread_shutdown = shutdown.clone();

        let handle = thread::spawn(move || {
            while !thread_shutdown.load(Ordering::SeqCst) {
                {
                    let _guard = pruning_guard.lock();
                    // The pruned-count return value is not needed here.
                    let _ = prune_expired_rows(
                        &relational,
                        &graph,
                        &vector,
                        accountant.as_ref(),
                        persistence.as_ref(),
                        sync_watermark.load(Ordering::SeqCst),
                    );
                }
                sleep_with_shutdown(&thread_shutdown, interval);
            }
        });

        // Publish the new shutdown flag and join handle for stop_pruning_thread.
        let mut runtime = self.pruning_runtime.lock();
        runtime.shutdown = shutdown;
        runtime.handle = Some(handle);
    }
1538
    /// Current sync watermark (also fed into the pruning passes).
    pub fn sync_watermark(&self) -> u64 {
        self.sync_watermark.load(Ordering::SeqCst)
    }

    /// Sets the sync watermark.
    pub fn set_sync_watermark(&self, watermark: u64) {
        self.sync_watermark.store(watermark, Ordering::SeqCst);
    }

    /// Unique identifier of this database instance.
    pub fn instance_id(&self) -> uuid::Uuid {
        self.instance_id
    }
1550
    /// Opens an in-memory database with an explicit plugin and memory
    /// accountant. Note: does NOT fire the plugin's `on_open` hook — callers
    /// that need it should use [`Database::open_memory_with_plugin`].
    pub fn open_memory_with_plugin_and_accountant(
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_memory_internal(plugin, accountant)
    }

    /// Opens an in-memory database with the given plugin and no memory limit,
    /// firing the plugin's `on_open` hook.
    pub fn open_memory_with_plugin(plugin: Arc<dyn DatabasePlugin>) -> Result<Self> {
        let db = Self::open_memory_with_plugin_and_accountant(
            plugin,
            Arc::new(MemoryAccountant::no_limit()),
        )?;
        db.plugin.on_open()?;
        Ok(db)
    }
1566
    /// Shuts the database down idempotently: rolls back any open session
    /// transaction, stops the pruning thread, drops all subscribers, closes
    /// the persistence layer, and finally notifies the plugin.
    pub fn close(&self) -> Result<()> {
        // `swap` makes repeated calls no-ops.
        if self.closed.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        // Take the tx out first so the session mutex is not held across the
        // rollback.
        let tx = self.session_tx.lock().take();
        if let Some(tx) = tx {
            self.rollback(tx)?;
        }
        self.stop_pruning_thread();
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
        self.plugin.on_close()
    }
1582
    /// Opens a file-backed database with `plugin`, no memory limit, and no
    /// startup disk ceiling; fires the plugin's `on_open` hook.
    pub fn open_with_plugin(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
    ) -> Result<Self> {
        let db = Self::open_loaded(path, plugin, Arc::new(MemoryAccountant::no_limit()), None)?;
        db.plugin.on_open()?;
        Ok(db)
    }

    /// Opens a file-backed database with an explicit accountant and no
    /// startup disk limit.
    pub fn open_with_config(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
    ) -> Result<Self> {
        Self::open_with_config_and_disk_limit(path, plugin, accountant, None)
    }
1601
    /// Opens a database at `path` (the literal `":memory:"` selects the
    /// in-memory backend), honoring a previously persisted memory limit and
    /// an optional startup disk ceiling; fires the plugin's `on_open` hook.
    pub fn open_with_config_and_disk_limit(
        path: impl AsRef<Path>,
        plugin: Arc<dyn DatabasePlugin>,
        accountant: Arc<MemoryAccountant>,
        startup_disk_limit: Option<u64>,
    ) -> Result<Self> {
        let path = path.as_ref();
        // SQLite-style convention: ":memory:" means no backing file.
        if path.as_os_str() == ":memory:" {
            return Self::open_memory_with_plugin_and_accountant(plugin, accountant);
        }
        // A limit persisted in the file wins only when the caller supplied an
        // unconstrained accountant; explicit caller limits take precedence.
        let accountant = if let Some(limit) = persisted_memory_limit(path)? {
            let usage = accountant.usage();
            if usage.limit.is_none() && usage.startup_ceiling.is_none() {
                Arc::new(MemoryAccountant::with_budget(limit))
            } else {
                accountant
            }
        } else {
            accountant
        };
        let db = Self::open_loaded(path, plugin, accountant, startup_disk_limit)?;
        db.plugin.on_open()?;
        Ok(db)
    }
1626
    /// Opens an in-memory database with the core plugin and the supplied
    /// accountant. Panics if initialization fails.
    pub fn open_memory_with_accountant(accountant: Arc<MemoryAccountant>) -> Self {
        Self::open_memory_internal(Arc::new(CorePlugin), accountant)
            .expect("failed to open in-memory database with accountant")
    }

    /// The database-wide memory accountant.
    pub fn accountant(&self) -> &MemoryAccountant {
        &self.accountant
    }
1637
    /// Charges the memory accountant for everything already loaded from disk
    /// — table metadata, all stored rows, live edges, and live vectors — so
    /// the budget reflects reality after an open. Each category is charged
    /// separately and fails fast with a category-specific hint when the
    /// remaining budget is exceeded.
    fn account_loaded_state(&self) -> Result<()> {
        // 1. Schema metadata.
        let metadata_bytes = self
            .relational_store
            .table_meta
            .read()
            .values()
            .fold(0usize, |acc, meta| {
                acc.saturating_add(meta.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            metadata_bytes,
            "open",
            "load_table_metadata",
            "Open the database with a larger MEMORY_LIMIT or reduce stored schema metadata.",
        )?;

        // 2. Row data. Uses the schema-aware estimate when metadata exists,
        //    falling back to the row's own estimate.
        let row_bytes =
            self.relational_store
                .tables
                .read()
                .iter()
                .fold(0usize, |acc, (table, rows)| {
                    let meta = self.table_meta(table);
                    acc.saturating_add(rows.iter().fold(0usize, |inner, row| {
                        inner.saturating_add(meta.as_ref().map_or_else(
                            || row.estimated_bytes(),
                            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
                        ))
                    }))
                });
        self.accountant.try_allocate_for(
            row_bytes,
            "open",
            "load_rows",
            "Open the database with a larger MEMORY_LIMIT or prune retained rows first.",
        )?;

        // 3. Graph edges — only undeleted ones, counted via forward adjacency
        //    so each edge is charged once.
        let edge_bytes = self
            .graph_store
            .forward_adj
            .read()
            .values()
            .flatten()
            .filter(|edge| edge.deleted_tx.is_none())
            .fold(0usize, |acc, edge| {
                acc.saturating_add(edge.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            edge_bytes,
            "open",
            "load_edges",
            "Open the database with a larger MEMORY_LIMIT or reduce graph edge volume.",
        )?;

        // 4. Vectors — only undeleted entries.
        let vector_bytes = self
            .vector_store
            .vectors
            .read()
            .iter()
            .filter(|entry| entry.deleted_tx.is_none())
            .fold(0usize, |acc, entry| {
                acc.saturating_add(entry.estimated_bytes())
            });
        self.accountant.try_allocate_for(
            vector_bytes,
            "open",
            "load_vectors",
            "Open the database with a larger MEMORY_LIMIT or reduce stored vector data.",
        )?;

        Ok(())
    }
1710
1711 fn release_insert_allocations(&self, ws: &contextdb_tx::WriteSet) {
1712 for (table, row) in &ws.relational_inserts {
1713 let bytes = self
1714 .table_meta(table)
1715 .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
1716 .unwrap_or_else(|| row.estimated_bytes());
1717 self.accountant.release(bytes);
1718 }
1719
1720 for edge in &ws.adj_inserts {
1721 self.accountant.release(edge.estimated_bytes());
1722 }
1723
1724 for entry in &ws.vector_inserts {
1725 self.accountant.release(entry.estimated_bytes());
1726 }
1727 }
1728
1729 fn release_delete_allocations(&self, ws: &contextdb_tx::WriteSet) {
1730 for (table, row_id, _) in &ws.relational_deletes {
1731 if let Some(row) = self.find_row_by_id(table, *row_id) {
1732 let bytes = self
1733 .table_meta(table)
1734 .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
1735 .unwrap_or_else(|| row.estimated_bytes());
1736 self.accountant.release(bytes);
1737 }
1738 }
1739
1740 for (source, edge_type, target, _) in &ws.adj_deletes {
1741 if let Some(edge) = self.find_edge(source, target, edge_type) {
1742 self.accountant.release(edge.estimated_bytes());
1743 }
1744 }
1745
1746 for (row_id, _) in &ws.vector_deletes {
1747 if let Some(vector) = self.find_vector_by_row_id(*row_id) {
1748 self.accountant.release(vector.estimated_bytes());
1749 }
1750 }
1751
1752 if !ws.vector_deletes.is_empty() {
1753 self.vector_store.clear_hnsw(self.accountant());
1754 }
1755 }
1756
    /// First stored version of `row_id` in `table`, with no MVCC visibility
    /// filtering (used for allocation accounting, not query answers).
    fn find_row_by_id(&self, table: &str, row_id: RowId) -> Option<VersionedRow> {
        self.relational_store
            .tables
            .read()
            .get(table)
            .and_then(|rows| rows.iter().find(|row| row.row_id == row_id))
            .cloned()
    }

    /// First stored vector entry for `row_id`, no visibility filtering.
    fn find_vector_by_row_id(&self, row_id: RowId) -> Option<VectorEntry> {
        self.vector_store
            .vectors
            .read()
            .iter()
            .find(|entry| entry.row_id == row_id)
            .cloned()
    }

    /// First stored adjacency entry matching (source, target, edge_type),
    /// no visibility filtering.
    fn find_edge(&self, source: &NodeId, target: &NodeId, edge_type: &str) -> Option<AdjEntry> {
        self.graph_store
            .forward_adj
            .read()
            .get(source)
            .and_then(|entries| {
                entries
                    .iter()
                    .find(|entry| entry.target == *target && entry.edge_type == edge_type)
                    .cloned()
            })
    }
1787
    /// Captures the current lengths of `tx`'s write-set sections
    /// (relational inserts, relational deletes, edge inserts, vector inserts,
    /// vector deletes) so a partially applied statement can be rolled back
    /// with [`Database::restore_write_set_checkpoint`].
    ///
    /// NOTE(review): `adj_deletes` is not checkpointed — confirm edge deletes
    /// cannot occur between checkpoint and restore.
    pub(crate) fn write_set_checkpoint(
        &self,
        tx: TxId,
    ) -> Result<(usize, usize, usize, usize, usize)> {
        self.tx_mgr.with_write_set(tx, |ws| {
            (
                ws.relational_inserts.len(),
                ws.relational_deletes.len(),
                ws.adj_inserts.len(),
                ws.vector_inserts.len(),
                ws.vector_deletes.len(),
            )
        })
    }

    /// Truncates each write-set section of `tx` back to a checkpoint captured
    /// by [`Database::write_set_checkpoint`], discarding later entries.
    pub(crate) fn restore_write_set_checkpoint(
        &self,
        tx: TxId,
        checkpoint: (usize, usize, usize, usize, usize),
    ) -> Result<()> {
        self.tx_mgr.with_write_set(tx, |ws| {
            ws.relational_inserts.truncate(checkpoint.0);
            ws.relational_deletes.truncate(checkpoint.1);
            ws.adj_inserts.truncate(checkpoint.2);
            ws.vector_inserts.truncate(checkpoint.3);
            ws.vector_deletes.truncate(checkpoint.4);
        })
    }
1816
    /// Snapshot of the current conflict-resolution policies.
    pub fn conflict_policies(&self) -> ConflictPolicies {
        self.conflict_policies.read().clone()
    }

    /// Sets the fallback policy used when a table has no override.
    pub fn set_default_conflict_policy(&self, policy: ConflictPolicy) {
        self.conflict_policies.write().default = policy;
    }

    /// Sets a per-table conflict-policy override.
    pub fn set_table_conflict_policy(&self, table: &str, policy: ConflictPolicy) {
        self.conflict_policies
            .write()
            .per_table
            .insert(table.to_string(), policy);
    }

    /// Removes a per-table override so `table` falls back to the default.
    pub fn drop_table_conflict_policy(&self, table: &str) {
        self.conflict_policies.write().per_table.remove(table);
    }

    /// The installed database plugin.
    pub fn plugin(&self) -> &dyn DatabasePlugin {
        self.plugin.as_ref()
    }

    /// The plugin's self-reported health.
    pub fn plugin_health(&self) -> PluginHealth {
        self.plugin.health()
    }

    /// The plugin's self-description as JSON.
    pub fn plugin_describe(&self) -> serde_json::Value {
        self.plugin.describe()
    }
1851
    /// Crate-internal access to the graph executor.
    pub(crate) fn graph(&self) -> &MemGraphExecutor<DynStore> {
        &self.graph
    }

    /// Crate-internal access to the shared relational store.
    pub(crate) fn relational_store(&self) -> &Arc<RelationalStore> {
        &self.relational_store
    }

    /// Allocates a fresh DDL LSN and runs `f` with it (delegates to the
    /// transaction manager).
    pub(crate) fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
    where
        F: FnOnce(u64) -> R,
    {
        self.tx_mgr.allocate_ddl_lsn(f)
    }

    /// Runs `f` while holding the transaction manager's commit lock.
    pub(crate) fn with_commit_lock<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        self.tx_mgr.with_commit_lock(f)
    }
1873
1874 pub(crate) fn log_create_table_ddl(&self, name: &str, meta: &TableMeta, lsn: u64) {
1875 let change = ddl_change_from_meta(name, meta);
1876 self.ddl_log.write().push((lsn, change.clone()));
1877 if let Some(persistence) = &self.persistence {
1878 let _ = persistence.append_ddl_log(lsn, &change);
1879 }
1880 }
1881
1882 pub(crate) fn log_drop_table_ddl(&self, name: &str, lsn: u64) {
1883 let change = DdlChange::DropTable {
1884 name: name.to_string(),
1885 };
1886 self.ddl_log.write().push((lsn, change.clone()));
1887 if let Some(persistence) = &self.persistence {
1888 let _ = persistence.append_ddl_log(lsn, &change);
1889 }
1890 }
1891
1892 pub(crate) fn log_alter_table_ddl(&self, name: &str, meta: &TableMeta, lsn: u64) {
1893 let change = DdlChange::AlterTable {
1894 name: name.to_string(),
1895 columns: meta
1896 .columns
1897 .iter()
1898 .map(|c| {
1899 (
1900 c.name.clone(),
1901 sql_type_for_meta_column(c, &meta.propagation_rules),
1902 )
1903 })
1904 .collect(),
1905 constraints: create_table_constraints_from_meta(meta),
1906 };
1907 self.ddl_log.write().push((lsn, change.clone()));
1908 if let Some(persistence) = &self.persistence {
1909 let _ = persistence.append_ddl_log(lsn, &change);
1910 }
1911 }
1912
    /// Persists `meta` for table `name` when the database is file-backed;
    /// no-op otherwise.
    pub(crate) fn persist_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence.flush_table_meta(name, meta)?;
        }
        Ok(())
    }

    /// Persists (or clears, for `None`) the configured memory limit; no-op
    /// without a persistence layer.
    pub(crate) fn persist_memory_limit(&self, limit: Option<usize>) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            match limit {
                Some(limit) => persistence.flush_config_value("memory_limit", &limit)?,
                None => persistence.remove_config_value("memory_limit")?,
            }
        }
        Ok(())
    }
1929
    /// Sets (or clears, for `None`) the runtime disk budget.
    ///
    /// `0` is the "no limit" sentinel in the atomic, so a literal zero-byte
    /// limit is not representable. Without a persistence layer the limit is
    /// always reset to unlimited — presumably because there is no file to
    /// budget. When a startup ceiling exists, the new limit may not exceed it
    /// and may not be removed.
    pub fn set_disk_limit(&self, limit: Option<u64>) -> Result<()> {
        if self.persistence.is_none() {
            self.disk_limit.store(0, Ordering::SeqCst);
            return Ok(());
        }

        let ceiling = self.disk_limit_startup_ceiling();
        if let Some(ceiling) = ceiling {
            match limit {
                Some(bytes) if bytes > ceiling => {
                    return Err(Error::Other(format!(
                        "disk limit {bytes} exceeds startup ceiling {ceiling}"
                    )));
                }
                None => {
                    return Err(Error::Other(
                        "cannot remove disk limit when a startup ceiling is set".to_string(),
                    ));
                }
                _ => {}
            }
        }

        self.disk_limit.store(limit.unwrap_or(0), Ordering::SeqCst);
        Ok(())
    }
1956
    /// Current disk budget in bytes; `None` means unlimited (`0` is the
    /// "no limit" sentinel in the atomic).
    pub fn disk_limit(&self) -> Option<u64> {
        match self.disk_limit.load(Ordering::SeqCst) {
            0 => None,
            bytes => Some(bytes),
        }
    }

    /// Startup-time upper bound on the disk budget, if one was supplied.
    pub fn disk_limit_startup_ceiling(&self) -> Option<u64> {
        match self.disk_limit_startup_ceiling.load(Ordering::SeqCst) {
            0 => None,
            bytes => Some(bytes),
        }
    }

    /// Size of the backing file in bytes; `None` for in-memory databases or
    /// when the file metadata cannot be read.
    pub fn disk_file_size(&self) -> Option<u64> {
        self.persistence
            .as_ref()
            .map(|persistence| std::fs::metadata(persistence.path()).map(|meta| meta.len()))
            .transpose()
            .ok()
            .flatten()
    }

    /// Persists (or clears, for `None`) the disk-limit config value; no-op
    /// without a persistence layer.
    pub(crate) fn persist_disk_limit(&self, limit: Option<u64>) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            match limit {
                Some(limit) => persistence.flush_config_value("disk_limit", &limit)?,
                None => persistence.remove_config_value("disk_limit")?,
            }
        }
        Ok(())
    }
1989
1990 pub fn check_disk_budget(&self, operation: &str) -> Result<()> {
1991 let Some(limit) = self.disk_limit() else {
1992 return Ok(());
1993 };
1994 let Some(current_bytes) = self.disk_file_size() else {
1995 return Ok(());
1996 };
1997 if current_bytes < limit {
1998 return Ok(());
1999 }
2000 Err(Error::DiskBudgetExceeded {
2001 operation: operation.to_string(),
2002 current_bytes,
2003 budget_limit_bytes: limit,
2004 hint: "Reduce retained file-backed data or raise DISK_LIMIT before writing more data."
2005 .to_string(),
2006 })
2007 }
2008
    /// Loads the persisted `(push, pull)` sync watermarks for `tenant_id`,
    /// defaulting to `(0, 0)` when unset or when there is no persistence layer.
    pub fn persisted_sync_watermarks(&self, tenant_id: &str) -> Result<(u64, u64)> {
        let Some(persistence) = &self.persistence else {
            return Ok((0, 0));
        };
        let push = persistence
            .load_config_value::<u64>(&format!("sync_push_watermark:{tenant_id}"))?
            .unwrap_or(0);
        let pull = persistence
            .load_config_value::<u64>(&format!("sync_pull_watermark:{tenant_id}"))?
            .unwrap_or(0);
        Ok((push, pull))
    }

    /// Persists the push watermark for `tenant_id`; no-op without persistence.
    pub fn persist_sync_push_watermark(&self, tenant_id: &str, watermark: u64) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence
                .flush_config_value(&format!("sync_push_watermark:{tenant_id}"), &watermark)?;
        }
        Ok(())
    }

    /// Persists the pull watermark for `tenant_id`; no-op without persistence.
    pub fn persist_sync_pull_watermark(&self, tenant_id: &str, watermark: u64) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence
                .flush_config_value(&format!("sync_pull_watermark:{tenant_id}"), &watermark)?;
        }
        Ok(())
    }
2037
    /// Rewrites the persisted rows of table `name` from the in-memory copy;
    /// no-op without persistence or when the table does not exist.
    pub(crate) fn persist_table_rows(&self, name: &str) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            let tables = self.relational_store.tables.read();
            if let Some(rows) = tables.get(name) {
                persistence.rewrite_table_rows(name, rows)?;
            }
        }
        Ok(())
    }

    /// Removes table `name`'s metadata and data from the persistence layer.
    pub(crate) fn remove_persisted_table(&self, name: &str) -> Result<()> {
        if let Some(persistence) = &self.persistence {
            persistence.remove_table_meta(name)?;
            persistence.remove_table_data(name)?;
        }
        Ok(())
    }

    /// Change-log entries with LSN strictly greater than `since_lsn`.
    /// Binary-searches the cut point, relying on the log being LSN-ordered.
    pub fn change_log_since(&self, since_lsn: u64) -> Vec<ChangeLogEntry> {
        let log = self.change_log.read();
        let start = log.partition_point(|e| e.lsn() <= since_lsn);
        log[start..].to_vec()
    }

    /// DDL changes with LSN strictly greater than `since_lsn` (LSN-ordered).
    pub fn ddl_log_since(&self, since_lsn: u64) -> Vec<DdlChange> {
        let ddl = self.ddl_log.read();
        let start = ddl.partition_point(|(lsn, _)| *lsn <= since_lsn);
        ddl[start..].iter().map(|(_, c)| c.clone()).collect()
    }
2067
    /// Builds a `ChangeSet` describing the entire live state: every table's
    /// DDL, every live row keyed by its natural key, every live edge, and
    /// every live vector attached to a live row.
    ///
    /// Tables without a natural key column (and without a UUID `id` column to
    /// fall back on) are skipped — such rows cannot be addressed by sync.
    ///
    /// NOTE(review): DDL entries follow the metadata map's iteration order,
    /// which is not deterministic — confirm consumers do not depend on
    /// creation order (e.g. FK-referenced tables first).
    #[allow(dead_code)]
    fn full_state_snapshot(&self) -> ChangeSet {
        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let mut ddl = Vec::new();

        // Hold both relational guards together so schema and rows agree.
        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        for (name, meta) in meta_guard.iter() {
            ddl.push(ddl_change_from_meta(name, meta));
        }

        // Track live row ids so only vectors of live rows are emitted below.
        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(m) => m,
                None => continue,
            };
            // Natural key: explicit column, else a UUID `id` column, else skip.
            let key_col = meta.natural_key_column.clone().unwrap_or_else(|| {
                if meta
                    .columns
                    .iter()
                    .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                {
                    "id".to_string()
                } else {
                    String::new()
                }
            });
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|r| r.deleted_tx.is_none()) {
                // Rows missing their key value cannot be addressed; skip them.
                let key_val = match row.values.get(&key_col) {
                    Some(v) => v.clone(),
                    None => continue,
                };
                live_row_ids.insert(row.row_id);
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }

        // Release relational guards before taking the graph/vector locks.
        drop(tables_guard);
        drop(meta_guard);

        let fwd = self.graph_store.forward_adj.read();
        for (_source, entries) in fwd.iter() {
            for entry in entries.iter().filter(|e| e.deleted_tx.is_none()) {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Emit at most one vector per row, and only for live rows.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs.iter().filter(|v| v.deleted_tx.is_none()) {
            if !live_row_ids.contains(&entry.row_id) {
                continue; }
            if !seen_vector_rows.insert(entry.row_id) {
                continue; }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2168
    /// Builds a `ChangeSet` of live state newer than `since_lsn` directly
    /// from the stores, used when the in-memory change log does not reach
    /// back far enough. `since_lsn == 0` falls back to a full snapshot.
    ///
    /// Unlike `full_state_snapshot`, this path emits no DDL (`ddl` stays
    /// empty). Deleted rows are never emitted, but their row ids still gate
    /// which vectors qualify as "live".
    fn persisted_state_since(&self, since_lsn: u64) -> ChangeSet {
        if since_lsn == 0 {
            return self.full_state_snapshot();
        }

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();
        let ddl = Vec::new();

        let meta_guard = self.relational_store.table_meta.read();
        let tables_guard = self.relational_store.tables.read();

        let mut live_row_ids: HashSet<RowId> = HashSet::new();
        for (table_name, table_rows) in tables_guard.iter() {
            let meta = match meta_guard.get(table_name) {
                Some(meta) => meta,
                None => continue,
            };
            // Natural key: explicit column, else a UUID `id` column, else skip.
            let key_col = meta.natural_key_column.clone().unwrap_or_else(|| {
                if meta
                    .columns
                    .iter()
                    .any(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                {
                    "id".to_string()
                } else {
                    String::new()
                }
            });
            if key_col.is_empty() {
                continue;
            }
            for row in table_rows.iter().filter(|row| row.deleted_tx.is_none()) {
                // Record liveness for ALL live rows (even old ones) so the
                // vector filter below is correct; only newer rows are emitted.
                live_row_ids.insert(row.row_id);
                if row.lsn <= since_lsn {
                    continue;
                }
                let key_val = match row.values.get(&key_col) {
                    Some(value) => value.clone(),
                    None => continue,
                };
                rows.push(RowChange {
                    table: table_name.clone(),
                    natural_key: NaturalKey {
                        column: key_col.clone(),
                        value: key_val,
                    },
                    values: row.values.clone(),
                    deleted: false,
                    lsn: row.lsn,
                });
            }
        }
        drop(tables_guard);
        drop(meta_guard);

        let fwd = self.graph_store.forward_adj.read();
        for entries in fwd.values() {
            for entry in entries
                .iter()
                .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
            {
                edges.push(EdgeChange {
                    source: entry.source,
                    target: entry.target,
                    edge_type: entry.edge_type.clone(),
                    properties: entry.properties.clone(),
                    lsn: entry.lsn,
                });
            }
        }
        drop(fwd);

        // Emit at most one new vector per row, and only for live rows.
        let mut seen_vector_rows: HashSet<RowId> = HashSet::new();
        let vecs = self.vector_store.vectors.read();
        for entry in vecs
            .iter()
            .filter(|entry| entry.deleted_tx.is_none() && entry.lsn > since_lsn)
        {
            if !live_row_ids.contains(&entry.row_id) {
                continue;
            }
            if !seen_vector_rows.insert(entry.row_id) {
                continue;
            }
            vectors.push(VectorChange {
                row_id: entry.row_id,
                vector: entry.vector.clone(),
                lsn: entry.lsn,
            });
        }
        drop(vecs);

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2270
2271 fn preflight_sync_apply_memory(
2272 &self,
2273 changes: &ChangeSet,
2274 policies: &ConflictPolicies,
2275 ) -> Result<()> {
2276 let usage = self.accountant.usage();
2277 let Some(limit) = usage.limit else {
2278 return Ok(());
2279 };
2280 let available = usage.available.unwrap_or(limit);
2281 let mut required = 0usize;
2282
2283 for row in &changes.rows {
2284 if row.deleted || row.values.is_empty() {
2285 continue;
2286 }
2287
2288 let policy = policies
2289 .per_table
2290 .get(&row.table)
2291 .copied()
2292 .unwrap_or(policies.default);
2293 let existing = self.point_lookup(
2294 &row.table,
2295 &row.natural_key.column,
2296 &row.natural_key.value,
2297 self.snapshot(),
2298 )?;
2299
2300 if existing.is_some()
2301 && matches!(
2302 policy,
2303 ConflictPolicy::InsertIfNotExists | ConflictPolicy::ServerWins
2304 )
2305 {
2306 continue;
2307 }
2308
2309 required = required.saturating_add(
2310 self.table_meta(&row.table)
2311 .map(|meta| estimate_row_bytes_for_meta(&row.values, &meta, false))
2312 .unwrap_or_else(|| estimate_row_value_bytes(&row.values)),
2313 );
2314 }
2315
2316 for edge in &changes.edges {
2317 required = required.saturating_add(
2318 96 + edge.edge_type.len().saturating_mul(16)
2319 + estimate_row_value_bytes(&edge.properties),
2320 );
2321 }
2322
2323 for vector in &changes.vectors {
2324 if vector.vector.is_empty() {
2325 continue;
2326 }
2327 required = required.saturating_add(
2328 24 + vector
2329 .vector
2330 .len()
2331 .saturating_mul(std::mem::size_of::<f32>()),
2332 );
2333 }
2334
2335 if required > available {
2336 return Err(Error::MemoryBudgetExceeded {
2337 subsystem: "sync".to_string(),
2338 operation: "apply_changes".to_string(),
2339 requested_bytes: required,
2340 available_bytes: available,
2341 budget_limit_bytes: limit,
2342 hint:
2343 "Reduce sync batch size, split the push, or raise MEMORY_LIMIT on the server."
2344 .to_string(),
2345 });
2346 }
2347
2348 Ok(())
2349 }
2350
    /// Computes the set of changes committed after `since_lsn`, for pushing to
    /// a sync peer.
    ///
    /// Falls back to `persisted_state_since` (a snapshot-style answer) when
    /// the in-memory change/DDL logs cannot cover the requested range: either
    /// both logs are empty while table data/metadata exists (fresh reopen from
    /// disk), or the oldest retained log entry is newer than `since_lsn + 1`
    /// (log was truncated past the cursor).
    pub fn changes_since(&self, since_lsn: u64) -> ChangeSet {
        // A cursor at or past the current LSN has nothing new to report.
        if since_lsn > self.current_lsn() {
            return ChangeSet::default();
        }

        // Peek at log bounds under short-lived read locks; drop each guard
        // before taking the next to keep lock hold times minimal.
        let log = self.change_log.read();
        let change_first_lsn = log.first().map(|e| e.lsn());
        let change_log_empty = log.is_empty();
        drop(log);

        let ddl = self.ddl_log.read();
        let ddl_first_lsn = ddl.first().map(|(lsn, _)| *lsn);
        let ddl_log_empty = ddl.is_empty();
        drop(ddl);

        let has_table_data = !self
            .relational_store
            .tables
            .read()
            .values()
            .all(|rows| rows.is_empty());
        let has_table_meta = !self.relational_store.table_meta.read().is_empty();

        // Logs empty but data present: this instance was loaded from disk, so
        // the logs cannot describe the persisted state. Answer from a snapshot.
        if change_log_empty && ddl_log_empty && (has_table_data || has_table_meta) {
            return self.persisted_state_since(since_lsn);
        }

        let min_first_lsn = match (change_first_lsn, ddl_first_lsn) {
            (Some(c), Some(d)) => Some(c.min(d)),
            (Some(c), None) => Some(c),
            (None, Some(d)) => Some(d),
            (None, None) => None,
        };

        // Gap between the cursor and the oldest retained entry: the log has
        // been pruned past `since_lsn`, so only a snapshot is complete.
        if min_first_lsn.is_some_and(|min_lsn| min_lsn > since_lsn + 1) {
            return self.persisted_state_since(since_lsn);
        }

        // Read both logs under the commit lock so a concurrent commit cannot
        // land between the two reads and skew DDL vs. data ordering.
        let (ddl, change_entries) = self.with_commit_lock(|| {
            let ddl = self.ddl_log_since(since_lsn);
            let changes = self.change_log_since(since_lsn);
            (ddl, changes)
        });

        let mut rows = Vec::new();
        let mut edges = Vec::new();
        let mut vectors = Vec::new();

        for entry in change_entries {
            match entry {
                ChangeLogEntry::RowInsert { table, row_id, lsn } => {
                    // Row may be gone or have no resolvable natural key; the
                    // insert is silently skipped in that case.
                    if let Some((natural_key, values)) = self.row_change_values(&table, row_id) {
                        rows.push(RowChange {
                            table,
                            natural_key,
                            values,
                            deleted: false,
                            lsn,
                        });
                    }
                }
                ChangeLogEntry::RowDelete {
                    table,
                    natural_key,
                    lsn,
                    ..
                } => {
                    // Deletes carry a `__deleted` tombstone instead of values.
                    let mut values = HashMap::new();
                    values.insert("__deleted".to_string(), Value::Bool(true));
                    rows.push(RowChange {
                        table,
                        natural_key,
                        values,
                        deleted: true,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeInsert {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    // Properties are looked up from the live adjacency list;
                    // an already-removed edge falls back to empty properties.
                    let properties = self
                        .edge_properties(source, target, &edge_type, lsn)
                        .unwrap_or_default();
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::EdgeDelete {
                    source,
                    target,
                    edge_type,
                    lsn,
                } => {
                    // Edge deletes use the same `__deleted` tombstone scheme.
                    let mut properties = HashMap::new();
                    properties.insert("__deleted".to_string(), Value::Bool(true));
                    edges.push(EdgeChange {
                        source,
                        target,
                        edge_type,
                        properties,
                        lsn,
                    });
                }
                ChangeLogEntry::VectorInsert { row_id, lsn } => {
                    if let Some(vector) = self.vector_for_row_lsn(row_id, lsn) {
                        vectors.push(VectorChange {
                            row_id,
                            vector,
                            lsn,
                        });
                    }
                }
                // An empty vector payload signals deletion to the peer.
                ChangeLogEntry::VectorDelete { row_id, lsn } => vectors.push(VectorChange {
                    row_id,
                    vector: Vec::new(),
                    lsn,
                }),
            }
        }

        // Drop delete tombstones that were superseded by a later re-insert of
        // the same natural key, so the peer does not delete a now-live row.
        // Keys use the Debug rendering of the key value for hashability.
        let insert_max_lsn: HashMap<(String, String, String), u64> = {
            let mut map: HashMap<(String, String, String), u64> = HashMap::new();
            for r in rows.iter().filter(|r| !r.deleted) {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                let entry = map.entry(key).or_insert(0);
                if r.lsn > *entry {
                    *entry = r.lsn;
                }
            }
            map
        };
        rows.retain(|r| {
            if r.deleted {
                let key = (
                    r.table.clone(),
                    r.natural_key.column.clone(),
                    format!("{:?}", r.natural_key.value),
                );
                // Keep the tombstone only if no insert with a newer-or-equal
                // LSN exists for the same key.
                match insert_max_lsn.get(&key) {
                    Some(&insert_lsn) => insert_lsn < r.lsn,
                    None => true,
                }
            } else {
                true
            }
        });

        ChangeSet {
            rows,
            edges,
            vectors,
            ddl,
        }
    }
2531
    /// Returns the most recently committed log sequence number.
    pub fn current_lsn(&self) -> u64 {
        self.tx_mgr.current_lsn()
    }
2536
    /// Subscribes to commit events using the default bounded channel capacity
    /// (`DEFAULT_SUBSCRIPTION_CAPACITY`).
    pub fn subscribe(&self) -> Receiver<CommitEvent> {
        self.subscribe_with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY)
    }
2542
2543 pub fn subscribe_with_capacity(&self, capacity: usize) -> Receiver<CommitEvent> {
2545 let (tx, rx) = mpsc::sync_channel(capacity.max(1));
2546 self.subscriptions.lock().subscribers.push(tx);
2547 rx
2548 }
2549
    /// Snapshot of subscription channel health: live channel count plus the
    /// cumulative sent/dropped event counters.
    pub fn subscription_health(&self) -> SubscriptionMetrics {
        let subscriptions = self.subscriptions.lock();
        SubscriptionMetrics {
            active_channels: subscriptions.subscribers.len(),
            events_sent: subscriptions.events_sent,
            events_dropped: subscriptions.events_dropped,
        }
    }
2559
2560 pub fn apply_changes(
2562 &self,
2563 mut changes: ChangeSet,
2564 policies: &ConflictPolicies,
2565 ) -> Result<ApplyResult> {
2566 self.plugin.on_sync_pull(&mut changes)?;
2567 self.check_disk_budget("sync_pull")?;
2568 self.preflight_sync_apply_memory(&changes, policies)?;
2569
2570 let mut tx = self.begin();
2571 let mut result = ApplyResult {
2572 applied_rows: 0,
2573 skipped_rows: 0,
2574 conflicts: Vec::new(),
2575 new_lsn: self.current_lsn(),
2576 };
2577 let vector_row_ids = changes.vectors.iter().map(|v| v.row_id).collect::<Vec<_>>();
2578 let mut vector_row_map: HashMap<u64, u64> = HashMap::new();
2579 let mut vector_row_idx = 0usize;
2580 let mut failed_row_ids: HashSet<u64> = HashSet::new();
2581 let mut table_meta_cache: HashMap<String, Option<TableMeta>> = HashMap::new();
2582 let mut visible_rows_cache: HashMap<String, Vec<VersionedRow>> = HashMap::new();
2583
2584 for ddl in changes.ddl.clone() {
2585 match ddl {
2586 DdlChange::CreateTable {
2587 name,
2588 columns,
2589 constraints,
2590 } => {
2591 if self.table_meta(&name).is_some() {
2592 if let Some(local_meta) = self.table_meta(&name) {
2593 let local_cols: Vec<(String, String)> = local_meta
2594 .columns
2595 .iter()
2596 .map(|c| {
2597 let ty = match c.column_type {
2598 ColumnType::Integer => "INTEGER".to_string(),
2599 ColumnType::Real => "REAL".to_string(),
2600 ColumnType::Text => "TEXT".to_string(),
2601 ColumnType::Boolean => "BOOLEAN".to_string(),
2602 ColumnType::Json => "JSON".to_string(),
2603 ColumnType::Uuid => "UUID".to_string(),
2604 ColumnType::Vector(dim) => format!("VECTOR({dim})"),
2605 ColumnType::Timestamp => "TIMESTAMP".to_string(),
2606 };
2607 (c.name.clone(), ty)
2608 })
2609 .collect();
2610 let remote_cols: Vec<(String, String)> = columns
2611 .iter()
2612 .map(|(col_name, col_type)| {
2613 let base_type = col_type
2614 .split_whitespace()
2615 .next()
2616 .unwrap_or(col_type)
2617 .to_string();
2618 (col_name.clone(), base_type)
2619 })
2620 .collect();
2621 let mut local_sorted = local_cols.clone();
2622 local_sorted.sort();
2623 let mut remote_sorted = remote_cols.clone();
2624 remote_sorted.sort();
2625 if local_sorted != remote_sorted {
2626 result.conflicts.push(Conflict {
2627 natural_key: NaturalKey {
2628 column: "table".to_string(),
2629 value: Value::Text(name.clone()),
2630 },
2631 resolution: ConflictPolicy::ServerWins,
2632 reason: Some(format!(
2633 "schema mismatch: local columns {:?} differ from remote {:?}",
2634 local_cols, remote_cols
2635 )),
2636 });
2637 }
2638 }
2639 continue;
2640 }
2641 let mut sql = format!(
2642 "CREATE TABLE {} ({})",
2643 name,
2644 columns
2645 .iter()
2646 .map(|(col, ty)| format!("{col} {ty}"))
2647 .collect::<Vec<_>>()
2648 .join(", ")
2649 );
2650 if !constraints.is_empty() {
2651 sql.push(' ');
2652 sql.push_str(&constraints.join(" "));
2653 }
2654 self.execute_in_tx(tx, &sql, &HashMap::new())?;
2655 table_meta_cache.remove(&name);
2656 visible_rows_cache.remove(&name);
2657 }
2658 DdlChange::DropTable { name } => {
2659 if self.table_meta(&name).is_some() {
2660 self.drop_table_aux_state(&name);
2661 self.relational_store().drop_table(&name);
2662 self.remove_persisted_table(&name)?;
2663 }
2664 table_meta_cache.remove(&name);
2665 visible_rows_cache.remove(&name);
2666 }
2667 DdlChange::AlterTable {
2668 name,
2669 columns,
2670 constraints,
2671 } => {
2672 if self.table_meta(&name).is_none() {
2673 continue;
2674 }
2675 self.relational_store().table_meta.write().remove(&name);
2676 let mut sql = format!(
2677 "CREATE TABLE {} ({})",
2678 name,
2679 columns
2680 .iter()
2681 .map(|(col, ty)| format!("{col} {ty}"))
2682 .collect::<Vec<_>>()
2683 .join(", ")
2684 );
2685 if !constraints.is_empty() {
2686 sql.push(' ');
2687 sql.push_str(&constraints.join(" "));
2688 }
2689 self.execute_in_tx(tx, &sql, &HashMap::new())?;
2690 table_meta_cache.remove(&name);
2691 visible_rows_cache.remove(&name);
2692 }
2693 }
2694 }
2695
2696 self.preflight_sync_apply_memory(&changes, policies)?;
2697
2698 for row in changes.rows {
2699 if row.values.is_empty() {
2700 result.skipped_rows += 1;
2701 self.commit_with_source(tx, CommitSource::SyncPull)?;
2702 tx = self.begin();
2703 continue;
2704 }
2705
2706 let policy = policies
2707 .per_table
2708 .get(&row.table)
2709 .copied()
2710 .unwrap_or(policies.default);
2711
2712 let existing = cached_point_lookup(
2713 self,
2714 &mut visible_rows_cache,
2715 &row.table,
2716 &row.natural_key.column,
2717 &row.natural_key.value,
2718 )?;
2719 let is_delete = row.deleted;
2720 let row_has_vector = cached_table_meta(self, &mut table_meta_cache, &row.table)
2721 .is_some_and(|meta| {
2722 meta.columns
2723 .iter()
2724 .any(|col| matches!(col.column_type, ColumnType::Vector(_)))
2725 });
2726
2727 if is_delete {
2728 if let Some(local) = existing {
2729 if row_has_vector
2730 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2731 {
2732 vector_row_map.insert(*remote_row_id, local.row_id);
2733 vector_row_idx += 1;
2734 }
2735 if let Err(err) = self.delete_row(tx, &row.table, local.row_id) {
2736 result.conflicts.push(Conflict {
2737 natural_key: row.natural_key.clone(),
2738 resolution: policy,
2739 reason: Some(format!("delete failed: {err}")),
2740 });
2741 result.skipped_rows += 1;
2742 } else {
2743 remove_cached_row(&mut visible_rows_cache, &row.table, local.row_id);
2744 result.applied_rows += 1;
2745 }
2746 } else {
2747 result.skipped_rows += 1;
2748 }
2749 self.commit_with_source(tx, CommitSource::SyncPull)?;
2750 tx = self.begin();
2751 continue;
2752 }
2753
2754 let mut values = row.values.clone();
2755 values.remove("__deleted");
2756
2757 match (existing, policy) {
2758 (None, _) => {
2759 if let Some(meta) = cached_table_meta(self, &mut table_meta_cache, &row.table) {
2760 let mut constraint_error: Option<String> = None;
2761
2762 for col_def in &meta.columns {
2763 if !col_def.nullable
2764 && !col_def.primary_key
2765 && col_def.default.is_none()
2766 {
2767 match values.get(&col_def.name) {
2768 None | Some(Value::Null) => {
2769 constraint_error = Some(format!(
2770 "NOT NULL constraint violated: {}.{}",
2771 row.table, col_def.name
2772 ));
2773 break;
2774 }
2775 _ => {}
2776 }
2777 }
2778 }
2779
2780 let has_unique = meta.columns.iter().any(|c| c.unique && !c.primary_key);
2781 if constraint_error.is_none() && has_unique {
2782 for col_def in &meta.columns {
2783 if col_def.unique
2784 && !col_def.primary_key
2785 && let Some(new_val) = values.get(&col_def.name)
2786 && *new_val != Value::Null
2787 && cached_visible_rows(
2788 self,
2789 &mut visible_rows_cache,
2790 &row.table,
2791 )?
2792 .iter()
2793 .any(|r| r.values.get(&col_def.name) == Some(new_val))
2794 {
2795 constraint_error = Some(format!(
2796 "UNIQUE constraint violated: {}.{}",
2797 row.table, col_def.name
2798 ));
2799 break;
2800 }
2801 }
2802 }
2803
2804 if let Some(err_msg) = constraint_error {
2805 result.skipped_rows += 1;
2806 if row_has_vector
2807 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2808 {
2809 failed_row_ids.insert(*remote_row_id);
2810 vector_row_idx += 1;
2811 }
2812 result.conflicts.push(Conflict {
2813 natural_key: row.natural_key.clone(),
2814 resolution: policy,
2815 reason: Some(err_msg),
2816 });
2817 self.commit_with_source(tx, CommitSource::SyncPull)?;
2818 tx = self.begin();
2819 continue;
2820 }
2821 }
2822
2823 match self.insert_row(tx, &row.table, values.clone()) {
2824 Ok(new_row_id) => {
2825 record_cached_insert(
2826 &mut visible_rows_cache,
2827 &row.table,
2828 VersionedRow {
2829 row_id: new_row_id,
2830 values: values.clone(),
2831 created_tx: tx,
2832 deleted_tx: None,
2833 lsn: row.lsn,
2834 created_at: None,
2835 },
2836 );
2837 result.applied_rows += 1;
2838 if row_has_vector
2839 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2840 {
2841 vector_row_map.insert(*remote_row_id, new_row_id);
2842 vector_row_idx += 1;
2843 }
2844 }
2845 Err(err) => {
2846 if is_fatal_sync_apply_error(&err) {
2847 return Err(err);
2848 }
2849 result.skipped_rows += 1;
2850 if row_has_vector
2851 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2852 {
2853 failed_row_ids.insert(*remote_row_id);
2854 vector_row_idx += 1;
2855 }
2856 result.conflicts.push(Conflict {
2857 natural_key: row.natural_key.clone(),
2858 resolution: policy,
2859 reason: Some(format!("{err}")),
2860 });
2861 }
2862 }
2863 }
2864 (Some(local), ConflictPolicy::InsertIfNotExists) => {
2865 if row_has_vector
2866 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2867 {
2868 vector_row_map.insert(*remote_row_id, local.row_id);
2869 vector_row_idx += 1;
2870 }
2871 result.skipped_rows += 1;
2872 }
2873 (Some(_), ConflictPolicy::ServerWins) => {
2874 result.skipped_rows += 1;
2875 if row_has_vector
2876 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2877 {
2878 failed_row_ids.insert(*remote_row_id);
2879 vector_row_idx += 1;
2880 }
2881 result.conflicts.push(Conflict {
2882 natural_key: row.natural_key.clone(),
2883 resolution: ConflictPolicy::ServerWins,
2884 reason: Some("server_wins".to_string()),
2885 });
2886 }
2887 (Some(local), ConflictPolicy::LatestWins) => {
2888 if row.lsn <= local.lsn {
2889 result.skipped_rows += 1;
2890 if row_has_vector
2891 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2892 {
2893 failed_row_ids.insert(*remote_row_id);
2894 vector_row_idx += 1;
2895 }
2896 result.conflicts.push(Conflict {
2897 natural_key: row.natural_key.clone(),
2898 resolution: ConflictPolicy::LatestWins,
2899 reason: Some("local_lsn_newer_or_equal".to_string()),
2900 });
2901 } else {
2902 if let Some(meta) = self.table_meta(&row.table)
2904 && let Some(sm) = &meta.state_machine
2905 {
2906 let sm_col = sm.column.clone();
2907 let transitions = sm.transitions.clone();
2908 let incoming_state = values.get(&sm_col).and_then(|v| match v {
2909 Value::Text(s) => Some(s.clone()),
2910 _ => None,
2911 });
2912 let local_state = local.values.get(&sm_col).and_then(|v| match v {
2913 Value::Text(s) => Some(s.clone()),
2914 _ => None,
2915 });
2916
2917 if let (Some(incoming), Some(current)) = (incoming_state, local_state) {
2918 let valid = transitions
2920 .get(¤t)
2921 .is_some_and(|targets| targets.contains(&incoming));
2922 if !valid && incoming != current {
2923 result.skipped_rows += 1;
2924 if row_has_vector
2925 && let Some(remote_row_id) =
2926 vector_row_ids.get(vector_row_idx)
2927 {
2928 failed_row_ids.insert(*remote_row_id);
2929 vector_row_idx += 1;
2930 }
2931 result.conflicts.push(Conflict {
2932 natural_key: row.natural_key.clone(),
2933 resolution: ConflictPolicy::LatestWins,
2934 reason: Some(format!(
2935 "state_machine: invalid transition {} -> {} (current: {})",
2936 current, incoming, current
2937 )),
2938 });
2939 self.commit_with_source(tx, CommitSource::SyncPull)?;
2940 tx = self.begin();
2941 continue;
2942 }
2943 }
2944 }
2945
2946 match self.upsert_row(
2947 tx,
2948 &row.table,
2949 &row.natural_key.column,
2950 values.clone(),
2951 ) {
2952 Ok(_) => {
2953 visible_rows_cache.remove(&row.table);
2954 result.applied_rows += 1;
2955 if row_has_vector
2956 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2957 {
2958 if let Ok(Some(found)) = self.point_lookup_in_tx(
2959 tx,
2960 &row.table,
2961 &row.natural_key.column,
2962 &row.natural_key.value,
2963 self.snapshot(),
2964 ) {
2965 vector_row_map.insert(*remote_row_id, found.row_id);
2966 }
2967 vector_row_idx += 1;
2968 }
2969 }
2970 Err(err) => {
2971 if is_fatal_sync_apply_error(&err) {
2972 return Err(err);
2973 }
2974 result.skipped_rows += 1;
2975 if row_has_vector
2976 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
2977 {
2978 failed_row_ids.insert(*remote_row_id);
2979 vector_row_idx += 1;
2980 }
2981 result.conflicts.push(Conflict {
2982 natural_key: row.natural_key.clone(),
2983 resolution: ConflictPolicy::LatestWins,
2984 reason: Some(format!("state_machine_or_constraint: {err}")),
2985 });
2986 }
2987 }
2988 }
2989 }
2990 (Some(_), ConflictPolicy::EdgeWins) => {
2991 result.conflicts.push(Conflict {
2992 natural_key: row.natural_key.clone(),
2993 resolution: ConflictPolicy::EdgeWins,
2994 reason: Some("edge_wins".to_string()),
2995 });
2996 match self.upsert_row(tx, &row.table, &row.natural_key.column, values.clone()) {
2997 Ok(_) => {
2998 visible_rows_cache.remove(&row.table);
2999 result.applied_rows += 1;
3000 if row_has_vector
3001 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3002 {
3003 if let Ok(Some(found)) = self.point_lookup_in_tx(
3004 tx,
3005 &row.table,
3006 &row.natural_key.column,
3007 &row.natural_key.value,
3008 self.snapshot(),
3009 ) {
3010 vector_row_map.insert(*remote_row_id, found.row_id);
3011 }
3012 vector_row_idx += 1;
3013 }
3014 }
3015 Err(err) => {
3016 if is_fatal_sync_apply_error(&err) {
3017 return Err(err);
3018 }
3019 result.skipped_rows += 1;
3020 if row_has_vector
3021 && let Some(remote_row_id) = vector_row_ids.get(vector_row_idx)
3022 {
3023 failed_row_ids.insert(*remote_row_id);
3024 vector_row_idx += 1;
3025 }
3026 if let Some(last) = result.conflicts.last_mut() {
3027 last.reason = Some(format!("state_machine_or_constraint: {err}"));
3028 }
3029 }
3030 }
3031 }
3032 }
3033
3034 self.commit_with_source(tx, CommitSource::SyncPull)?;
3035 tx = self.begin();
3036 }
3037
3038 for edge in changes.edges {
3039 let is_delete = matches!(edge.properties.get("__deleted"), Some(Value::Bool(true)));
3040 if is_delete {
3041 let _ = self.delete_edge(tx, edge.source, edge.target, &edge.edge_type);
3042 } else {
3043 let _ = self.insert_edge(
3044 tx,
3045 edge.source,
3046 edge.target,
3047 edge.edge_type,
3048 edge.properties,
3049 );
3050 }
3051 }
3052
3053 for vector in changes.vectors {
3054 if failed_row_ids.contains(&vector.row_id) {
3055 continue; }
3057 let local_row_id = vector_row_map
3058 .get(&vector.row_id)
3059 .copied()
3060 .unwrap_or(vector.row_id);
3061 if vector.vector.is_empty() {
3062 let _ = self.vector.delete_vector(tx, local_row_id);
3063 } else {
3064 if self.has_live_vector(local_row_id, self.snapshot()) {
3065 let _ = self.delete_vector(tx, local_row_id);
3066 }
3067 if let Err(err) = self.insert_vector(tx, local_row_id, vector.vector)
3068 && is_fatal_sync_apply_error(&err)
3069 {
3070 return Err(err);
3071 }
3072 }
3073 }
3074
3075 self.commit_with_source(tx, CommitSource::SyncPull)?;
3076 result.new_lsn = self.current_lsn();
3077 Ok(result)
3078 }
3079
    /// Resolves a logged row insert into its natural key plus column values.
    ///
    /// The natural key column is the table's configured `natural_key_column`,
    /// falling back to a UUID-typed `id` column. Returns `None` when the
    /// table/row no longer exists, no natural key column can be determined, or
    /// the row lacks a value for that column.
    fn row_change_values(
        &self,
        table: &str,
        row_id: RowId,
    ) -> Option<(NaturalKey, HashMap<String, Value>)> {
        // Both read guards are held together for a consistent table/meta view.
        let tables = self.relational_store.tables.read();
        let meta = self.relational_store.table_meta.read();
        let rows = tables.get(table)?;
        let row = rows.iter().find(|r| r.row_id == row_id)?;
        let key_col = meta
            .get(table)
            .and_then(|m| m.natural_key_column.clone())
            .or_else(|| {
                // Fallback: a UUID column literally named "id".
                meta.get(table).and_then(|m| {
                    m.columns
                        .iter()
                        .find(|c| c.name == "id" && c.column_type == ColumnType::Uuid)
                        .map(|_| "id".to_string())
                })
            })?;

        let key_val = row.values.get(&key_col)?.clone();
        let values = row
            .values
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<HashMap<_, _>>();
        Some((
            NaturalKey {
                column: key_col,
                value: key_val,
            },
            values,
        ))
    }
3115
3116 fn edge_properties(
3117 &self,
3118 source: NodeId,
3119 target: NodeId,
3120 edge_type: &str,
3121 lsn: u64,
3122 ) -> Option<HashMap<String, Value>> {
3123 self.graph_store
3124 .forward_adj
3125 .read()
3126 .get(&source)
3127 .and_then(|entries| {
3128 entries
3129 .iter()
3130 .find(|e| e.target == target && e.edge_type == edge_type && e.lsn == lsn)
3131 .map(|e| e.properties.clone())
3132 })
3133 }
3134
3135 fn vector_for_row_lsn(&self, row_id: RowId, lsn: u64) -> Option<Vec<f32>> {
3136 self.vector_store
3137 .vectors
3138 .read()
3139 .iter()
3140 .find(|v| v.row_id == row_id && v.lsn == lsn)
3141 .map(|v| v.vector.clone())
3142 }
3143}
3144
3145fn strip_internal_row_id(mut qr: QueryResult) -> QueryResult {
3146 if let Some(pos) = qr.columns.iter().position(|c| c == "row_id") {
3147 qr.columns.remove(pos);
3148 for row in &mut qr.rows {
3149 if pos < row.len() {
3150 row.remove(pos);
3151 }
3152 }
3153 }
3154 qr
3155}
3156
3157fn cached_table_meta(
3158 db: &Database,
3159 cache: &mut HashMap<String, Option<TableMeta>>,
3160 table: &str,
3161) -> Option<TableMeta> {
3162 cache
3163 .entry(table.to_string())
3164 .or_insert_with(|| db.table_meta(table))
3165 .clone()
3166}
3167
3168fn cached_visible_rows<'a>(
3169 db: &Database,
3170 cache: &'a mut HashMap<String, Vec<VersionedRow>>,
3171 table: &str,
3172) -> Result<&'a mut Vec<VersionedRow>> {
3173 if !cache.contains_key(table) {
3174 let rows = db.scan(table, db.snapshot())?;
3175 cache.insert(table.to_string(), rows);
3176 }
3177 Ok(cache.get_mut(table).expect("cached visible rows"))
3178}
3179
3180fn cached_point_lookup(
3181 db: &Database,
3182 cache: &mut HashMap<String, Vec<VersionedRow>>,
3183 table: &str,
3184 col: &str,
3185 value: &Value,
3186) -> Result<Option<VersionedRow>> {
3187 let rows = cached_visible_rows(db, cache, table)?;
3188 Ok(rows
3189 .iter()
3190 .find(|r| r.values.get(col) == Some(value))
3191 .cloned())
3192}
3193
3194fn record_cached_insert(
3195 cache: &mut HashMap<String, Vec<VersionedRow>>,
3196 table: &str,
3197 row: VersionedRow,
3198) {
3199 if let Some(rows) = cache.get_mut(table) {
3200 rows.push(row);
3201 }
3202}
3203
3204fn remove_cached_row(cache: &mut HashMap<String, Vec<VersionedRow>>, table: &str, row_id: RowId) {
3205 if let Some(rows) = cache.get_mut(table) {
3206 rows.retain(|row| row.row_id != row_id);
3207 }
3208}
3209
3210fn query_outcome_from_result(result: &Result<QueryResult>) -> QueryOutcome {
3211 match result {
3212 Ok(query_result) => QueryOutcome::Success {
3213 row_count: if query_result.rows.is_empty() {
3214 query_result.rows_affected as usize
3215 } else {
3216 query_result.rows.len()
3217 },
3218 },
3219 Err(error) => QueryOutcome::Error {
3220 error: error.to_string(),
3221 },
3222 }
3223}
3224
/// Eagerly builds the HNSW index once the store holds at least 1000 vectors,
/// the threshold at which approximate search pays off.
///
/// The estimated index footprint is reserved with the memory accountant
/// first; if the budget does not allow it, indexing is silently skipped. A
/// panic inside `HnswIndex::new` is caught and treated as "no index", and the
/// reservation is released.
fn maybe_prebuild_hnsw(vector_store: &VectorStore, accountant: &MemoryAccountant) {
    if vector_store.vector_count() >= 1000 {
        let entries = vector_store.all_entries();
        let dim = vector_store.dimension().unwrap_or(0);
        let estimated_bytes = estimate_hnsw_index_bytes(entries.len(), dim);
        if accountant
            .try_allocate_for(
                estimated_bytes,
                "vector_index",
                "prebuild_hnsw",
                "Open the database with a larger MEMORY_LIMIT to enable HNSW indexing.",
            )
            .is_err()
        {
            // Over budget: keep brute-force search instead of indexing.
            return;
        }

        // Build inside catch_unwind so an indexing panic degrades to
        // brute-force search rather than crashing the caller.
        let hnsw_opt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            HnswIndex::new(&entries, dim)
        }))
        .ok();
        if hnsw_opt.is_some() {
            // Index built: record its footprint on the store.
            vector_store.set_hnsw_bytes(estimated_bytes);
        } else {
            // Build panicked: hand the reserved bytes back.
            accountant.release(estimated_bytes);
        }
        // OnceLock semantics: only the first prebuild wins; a later call's
        // set() failing is fine and is deliberately ignored.
        let _ = vector_store.hnsw.set(RwLock::new(hnsw_opt));
    }
}
3254
3255fn estimate_row_bytes_for_meta(
3256 values: &HashMap<ColName, Value>,
3257 meta: &TableMeta,
3258 include_vectors: bool,
3259) -> usize {
3260 let mut bytes = 96usize;
3261 for column in &meta.columns {
3262 let Some(value) = values.get(&column.name) else {
3263 continue;
3264 };
3265 if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
3266 continue;
3267 }
3268 bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
3269 }
3270 bytes
3271}
3272
/// Estimated heap footprint of one stored vector: a fixed 24-byte overhead
/// plus four bytes per `f32` component.
fn estimate_vector_bytes(vector: &[f32]) -> usize {
    let payload = vector.len().saturating_mul(std::mem::size_of::<f32>());
    24 + payload
}
3276
3277fn estimate_edge_bytes(
3278 source: NodeId,
3279 target: NodeId,
3280 edge_type: &str,
3281 properties: &HashMap<String, Value>,
3282) -> usize {
3283 AdjEntry {
3284 source,
3285 target,
3286 edge_type: edge_type.to_string(),
3287 properties: properties.clone(),
3288 created_tx: 0,
3289 deleted_tx: None,
3290 lsn: 0,
3291 }
3292 .estimated_bytes()
3293}
3294
/// Rough HNSW footprint model: the raw vector payload (count × dim × 4 bytes)
/// times a 3× multiplier for graph links and layer overhead.
fn estimate_hnsw_index_bytes(entry_count: usize, dimension: usize) -> usize {
    let payload = entry_count
        .saturating_mul(dimension)
        .saturating_mul(std::mem::size_of::<f32>());
    payload.saturating_mul(3)
}
3301
/// Graceful shutdown: stop the background pruning thread, drop all commit
/// subscribers, and close persistence. The `closed` flag makes teardown
/// idempotent in case an explicit close already ran.
impl Drop for Database {
    fn drop(&mut self) {
        // swap returns the previous value: if it was already true, teardown
        // has already happened — do nothing.
        if self.closed.swap(true, Ordering::SeqCst) {
            return;
        }
        let runtime = self.pruning_runtime.get_mut();
        runtime.shutdown.store(true, Ordering::SeqCst);
        // Wait for the pruning thread to observe the flag and exit before
        // tearing down the stores it reads.
        if let Some(handle) = runtime.handle.take() {
            let _ = handle.join();
        }
        self.subscriptions.lock().subscribers.clear();
        if let Some(persistence) = &self.persistence {
            persistence.close();
        }
    }
}
3318
/// Sleeps for roughly `interval`, polling `shutdown` at least every 50ms so a
/// requested shutdown interrupts the wait promptly instead of blocking for
/// the full interval.
fn sleep_with_shutdown(shutdown: &AtomicBool, interval: Duration) {
    let wake_at = Instant::now() + interval;
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return;
        }
        let now = Instant::now();
        if now >= wake_at {
            return;
        }
        // Nap in at most 50ms slices so the flag is re-checked frequently.
        let left = wake_at.saturating_duration_since(now);
        thread::sleep(left.min(Duration::from_millis(50)));
    }
}
3330
/// Removes TTL-expired rows from the relational store, then cascades the
/// cleanup to dependent vectors and graph edges, and finally rewrites the
/// affected persistence tables. Returns the number of rows pruned.
///
/// `sync_watermark` protects rows on sync-safe tables that peers may not have
/// pulled yet (see `row_is_prunable`). All memory freed by pruning is handed
/// back to the accountant in one batch at the end.
fn prune_expired_rows(
    relational_store: &Arc<RelationalStore>,
    graph_store: &Arc<GraphStore>,
    vector_store: &Arc<VectorStore>,
    accountant: &MemoryAccountant,
    persistence: Option<&Arc<RedbPersistence>>,
    sync_watermark: u64,
) -> u64 {
    let now_millis = current_epoch_millis();
    // Clone the metadata map so the meta lock is not held while mutating rows.
    let metas = relational_store.table_meta.read().clone();
    let mut pruned_by_table: HashMap<String, Vec<RowId>> = HashMap::new();
    let mut pruned_node_ids = HashSet::new();
    let mut released_row_bytes = 0usize;

    {
        // Phase 1: drop expired rows, collecting ids for the cascades below.
        let mut tables = relational_store.tables.write();
        for (table_name, rows) in tables.iter_mut() {
            let Some(meta) = metas.get(table_name) else {
                continue;
            };
            // Only tables with a TTL configured participate in pruning.
            if meta.default_ttl_seconds.is_none() {
                continue;
            }

            rows.retain(|row| {
                if !row_is_prunable(row, meta, now_millis, sync_watermark) {
                    return true;
                }

                pruned_by_table
                    .entry(table_name.clone())
                    .or_default()
                    .push(row.row_id);
                released_row_bytes = released_row_bytes
                    .saturating_add(estimate_row_bytes_for_meta(&row.values, meta, false));
                // Remember the row's UUID so edges referencing it can be
                // pruned in the graph pass.
                if let Some(Value::Uuid(id)) = row.values.get("id") {
                    pruned_node_ids.insert(*id);
                }
                false
            });
        }
    }

    let pruned_row_ids: HashSet<RowId> = pruned_by_table
        .values()
        .flat_map(|rows| rows.iter().copied())
        .collect();
    if pruned_row_ids.is_empty() {
        return 0;
    }

    // Phase 2: drop vectors belonging to pruned rows.
    let mut released_vector_bytes = 0usize;
    {
        let mut vectors = vector_store.vectors.write();
        vectors.retain(|entry| {
            if pruned_row_ids.contains(&entry.row_id) {
                released_vector_bytes =
                    released_vector_bytes.saturating_add(entry.estimated_bytes());
                false
            } else {
                true
            }
        });
    }
    // Any vector removal invalidates the prebuilt HNSW index; drop it so it
    // can be rebuilt from the surviving vectors.
    if let Some(hnsw) = vector_store.hnsw.get() {
        *hnsw.write() = None;
    }

    // Phase 3: drop edges touching pruned nodes, in both adjacency maps.
    let mut released_edge_bytes = 0usize;
    {
        let mut forward = graph_store.forward_adj.write();
        for entries in forward.values_mut() {
            entries.retain(|entry| {
                if pruned_node_ids.contains(&entry.source)
                    || pruned_node_ids.contains(&entry.target)
                {
                    released_edge_bytes =
                        released_edge_bytes.saturating_add(entry.estimated_bytes());
                    false
                } else {
                    true
                }
            });
        }
        forward.retain(|_, entries| !entries.is_empty());
    }
    {
        // Reverse map mirrors the forward map; bytes were already counted.
        let mut reverse = graph_store.reverse_adj.write();
        for entries in reverse.values_mut() {
            entries.retain(|entry| {
                !pruned_node_ids.contains(&entry.source) && !pruned_node_ids.contains(&entry.target)
            });
        }
        reverse.retain(|_, entries| !entries.is_empty());
    }

    // Phase 4: rewrite persistence (best-effort; errors are ignored).
    if let Some(persistence) = persistence {
        for table_name in pruned_by_table.keys() {
            let rows = relational_store
                .tables
                .read()
                .get(table_name)
                .cloned()
                .unwrap_or_default();
            let _ = persistence.rewrite_table_rows(table_name, &rows);
        }

        let vectors = vector_store.vectors.read().clone();
        let edges = graph_store
            .forward_adj
            .read()
            .values()
            .flat_map(|entries| entries.iter().cloned())
            .collect::<Vec<_>>();
        let _ = persistence.rewrite_vectors(&vectors);
        let _ = persistence.rewrite_graph_edges(&edges);
    }

    // Return all reclaimed memory to the accountant in a single call.
    accountant.release(
        released_row_bytes
            .saturating_add(released_vector_bytes)
            .saturating_add(released_edge_bytes),
    );

    pruned_row_ids.len() as u64
}
3457
/// Decides whether a row is eligible for TTL pruning. Rules, in order:
///
/// 1. Sync-safe tables never prune rows at or above the sync watermark LSN
///    (peers may not have pulled them yet).
/// 2. Tables without a default TTL never prune.
/// 3. An explicit expires-column timestamp overrides the TTL: `i64::MAX`
///    means "never expires", a negative value means "already expired",
///    otherwise the timestamp is compared to now. Null/missing — and any
///    non-timestamp value — fall through to the TTL rule.
/// 4. Otherwise the row expires once its age exceeds the default TTL; rows
///    with no recorded `created_at` are kept.
fn row_is_prunable(
    row: &VersionedRow,
    meta: &TableMeta,
    now_millis: u64,
    sync_watermark: u64,
) -> bool {
    if meta.sync_safe && row.lsn >= sync_watermark {
        return false;
    }

    let Some(default_ttl_seconds) = meta.default_ttl_seconds else {
        return false;
    };

    if let Some(expires_column) = &meta.expires_column {
        match row.values.get(expires_column) {
            Some(Value::Timestamp(millis)) if *millis == i64::MAX => return false,
            Some(Value::Timestamp(millis)) if *millis < 0 => return true,
            Some(Value::Timestamp(millis)) => return (*millis as u64) <= now_millis,
            Some(Value::Null) | None => {}
            Some(_) => {}
        }
    }

    let ttl_millis = default_ttl_seconds.saturating_mul(1000);
    row.created_at
        .map(|created_at| now_millis.saturating_sub(created_at) > ttl_millis)
        .unwrap_or(false)
}
3487
/// Milliseconds since the Unix epoch; a clock set before the epoch collapses
/// to zero instead of panicking.
fn current_epoch_millis() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
3494
3495fn max_tx_across_all(
3496 relational: &RelationalStore,
3497 graph: &GraphStore,
3498 vector: &VectorStore,
3499) -> TxId {
3500 let relational_max = relational
3501 .tables
3502 .read()
3503 .values()
3504 .flat_map(|rows| rows.iter())
3505 .flat_map(|row| std::iter::once(row.created_tx).chain(row.deleted_tx))
3506 .max()
3507 .unwrap_or(0);
3508 let graph_max = graph
3509 .forward_adj
3510 .read()
3511 .values()
3512 .flat_map(|entries| entries.iter())
3513 .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
3514 .max()
3515 .unwrap_or(0);
3516 let vector_max = vector
3517 .vectors
3518 .read()
3519 .iter()
3520 .flat_map(|entry| std::iter::once(entry.created_tx).chain(entry.deleted_tx))
3521 .max()
3522 .unwrap_or(0);
3523
3524 relational_max.max(graph_max).max(vector_max)
3525}
3526
3527fn max_lsn_across_all(
3528 relational: &RelationalStore,
3529 graph: &GraphStore,
3530 vector: &VectorStore,
3531) -> u64 {
3532 let relational_max = relational
3533 .tables
3534 .read()
3535 .values()
3536 .flat_map(|rows| rows.iter().map(|row| row.lsn))
3537 .max()
3538 .unwrap_or(0);
3539 let graph_max = graph
3540 .forward_adj
3541 .read()
3542 .values()
3543 .flat_map(|entries| entries.iter().map(|entry| entry.lsn))
3544 .max()
3545 .unwrap_or(0);
3546 let vector_max = vector
3547 .vectors
3548 .read()
3549 .iter()
3550 .map(|entry| entry.lsn)
3551 .max()
3552 .unwrap_or(0);
3553
3554 relational_max.max(graph_max).max(vector_max)
3555}
3556
/// Reads the stored `memory_limit` config value from an existing database
/// file; `Ok(None)` when the file does not exist or no limit was saved.
///
/// NOTE(review): if `load_config_value` errors, `?` propagates before
/// `close()` runs — confirm `RedbPersistence` cleans up on drop in that case.
fn persisted_memory_limit(path: &Path) -> Result<Option<usize>> {
    if !path.exists() {
        return Ok(None);
    }
    let persistence = RedbPersistence::open(path)?;
    let limit = persistence.load_config_value::<usize>("memory_limit")?;
    persistence.close();
    Ok(limit)
}
3566
3567fn is_fatal_sync_apply_error(err: &Error) -> bool {
3568 matches!(
3569 err,
3570 Error::MemoryBudgetExceeded { .. } | Error::DiskBudgetExceeded { .. }
3571 )
3572}
3573
3574fn ddl_change_from_create_table(ct: &CreateTable) -> DdlChange {
3575 DdlChange::CreateTable {
3576 name: ct.name.clone(),
3577 columns: ct
3578 .columns
3579 .iter()
3580 .map(|col| {
3581 (
3582 col.name.clone(),
3583 sql_type_for_ast_column(col, &ct.propagation_rules),
3584 )
3585 })
3586 .collect(),
3587 constraints: create_table_constraints_from_ast(ct),
3588 }
3589}
3590
3591fn ddl_change_from_meta(name: &str, meta: &TableMeta) -> DdlChange {
3592 DdlChange::CreateTable {
3593 name: name.to_string(),
3594 columns: meta
3595 .columns
3596 .iter()
3597 .map(|col| {
3598 (
3599 col.name.clone(),
3600 sql_type_for_meta_column(col, &meta.propagation_rules),
3601 )
3602 })
3603 .collect(),
3604 constraints: create_table_constraints_from_meta(meta),
3605 }
3606}
3607
3608fn sql_type_for_ast(data_type: &DataType) -> String {
3609 match data_type {
3610 DataType::Uuid => "UUID".to_string(),
3611 DataType::Text => "TEXT".to_string(),
3612 DataType::Integer => "INTEGER".to_string(),
3613 DataType::Real => "REAL".to_string(),
3614 DataType::Boolean => "BOOLEAN".to_string(),
3615 DataType::Timestamp => "TIMESTAMP".to_string(),
3616 DataType::Json => "JSON".to_string(),
3617 DataType::Vector(dim) => format!("VECTOR({dim})"),
3618 }
3619}
3620
3621fn sql_type_for_ast_column(
3622 col: &contextdb_parser::ast::ColumnDef,
3623 _rules: &[contextdb_parser::ast::AstPropagationRule],
3624) -> String {
3625 let mut ty = sql_type_for_ast(&col.data_type);
3626 if let Some(reference) = &col.references {
3627 ty.push_str(&format!(
3628 " REFERENCES {}({})",
3629 reference.table, reference.column
3630 ));
3631 for rule in &reference.propagation_rules {
3632 if let contextdb_parser::ast::AstPropagationRule::FkState {
3633 trigger_state,
3634 target_state,
3635 max_depth,
3636 abort_on_failure,
3637 } = rule
3638 {
3639 ty.push_str(&format!(
3640 " ON STATE {} PROPAGATE SET {}",
3641 trigger_state, target_state
3642 ));
3643 if max_depth.unwrap_or(10) != 10 {
3644 ty.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
3645 }
3646 if *abort_on_failure {
3647 ty.push_str(" ABORT ON FAILURE");
3648 }
3649 }
3650 }
3651 }
3652 if col.primary_key {
3653 ty.push_str(" PRIMARY KEY");
3654 }
3655 if !col.nullable && !col.primary_key {
3656 ty.push_str(" NOT NULL");
3657 }
3658 if col.unique {
3659 ty.push_str(" UNIQUE");
3660 }
3661 ty
3662}
3663
3664fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[PropagationRule]) -> String {
3665 let mut ty = match col.column_type {
3666 ColumnType::Integer => "INTEGER".to_string(),
3667 ColumnType::Real => "REAL".to_string(),
3668 ColumnType::Text => "TEXT".to_string(),
3669 ColumnType::Boolean => "BOOLEAN".to_string(),
3670 ColumnType::Json => "JSON".to_string(),
3671 ColumnType::Uuid => "UUID".to_string(),
3672 ColumnType::Vector(dim) => format!("VECTOR({dim})"),
3673 ColumnType::Timestamp => "TIMESTAMP".to_string(),
3674 };
3675
3676 let fk_rules = rules
3677 .iter()
3678 .filter_map(|rule| match rule {
3679 PropagationRule::ForeignKey {
3680 fk_column,
3681 referenced_table,
3682 referenced_column,
3683 trigger_state,
3684 target_state,
3685 max_depth,
3686 abort_on_failure,
3687 } if fk_column == &col.name => Some((
3688 referenced_table,
3689 referenced_column,
3690 trigger_state,
3691 target_state,
3692 *max_depth,
3693 *abort_on_failure,
3694 )),
3695 _ => None,
3696 })
3697 .collect::<Vec<_>>();
3698
3699 if let Some(reference) = &col.references {
3700 ty.push_str(&format!(
3701 " REFERENCES {}({})",
3702 reference.table, reference.column
3703 ));
3704 } else if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
3705 ty.push_str(&format!(
3706 " REFERENCES {}({})",
3707 referenced_table, referenced_column
3708 ));
3709 }
3710
3711 if col.references.is_some() || !fk_rules.is_empty() {
3712 for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
3713 ty.push_str(&format!(
3714 " ON STATE {} PROPAGATE SET {}",
3715 trigger_state, target_state
3716 ));
3717 if max_depth != 10 {
3718 ty.push_str(&format!(" MAX DEPTH {max_depth}"));
3719 }
3720 if abort_on_failure {
3721 ty.push_str(" ABORT ON FAILURE");
3722 }
3723 }
3724 }
3725 if col.primary_key {
3726 ty.push_str(" PRIMARY KEY");
3727 }
3728 if !col.nullable && !col.primary_key {
3729 ty.push_str(" NOT NULL");
3730 }
3731 if col.unique {
3732 ty.push_str(" UNIQUE");
3733 }
3734 if col.expires {
3735 ty.push_str(" EXPIRES");
3736 }
3737
3738 ty
3739}
3740
3741fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
3742 let mut constraints = Vec::new();
3743
3744 if ct.immutable {
3745 constraints.push("IMMUTABLE".to_string());
3746 }
3747
3748 if let Some(sm) = &ct.state_machine {
3749 let transitions = sm
3750 .transitions
3751 .iter()
3752 .map(|(from, tos)| format!("{from} -> [{}]", tos.join(", ")))
3753 .collect::<Vec<_>>()
3754 .join(", ");
3755 constraints.push(format!("STATE MACHINE ({}: {})", sm.column, transitions));
3756 }
3757
3758 if !ct.dag_edge_types.is_empty() {
3759 let edge_types = ct
3760 .dag_edge_types
3761 .iter()
3762 .map(|edge_type| format!("'{edge_type}'"))
3763 .collect::<Vec<_>>()
3764 .join(", ");
3765 constraints.push(format!("DAG({edge_types})"));
3766 }
3767
3768 if let Some(retain) = &ct.retain {
3769 let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(retain.duration_seconds));
3770 if retain.sync_safe {
3771 clause.push_str(" SYNC SAFE");
3772 }
3773 constraints.push(clause);
3774 }
3775
3776 for unique_constraint in &ct.unique_constraints {
3777 constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
3778 }
3779
3780 for rule in &ct.propagation_rules {
3781 match rule {
3782 contextdb_parser::ast::AstPropagationRule::EdgeState {
3783 edge_type,
3784 direction,
3785 trigger_state,
3786 target_state,
3787 max_depth,
3788 abort_on_failure,
3789 } => {
3790 let mut clause = format!(
3791 "PROPAGATE ON EDGE {} {} STATE {} SET {}",
3792 edge_type, direction, trigger_state, target_state
3793 );
3794 if max_depth.unwrap_or(10) != 10 {
3795 clause.push_str(&format!(" MAX DEPTH {}", max_depth.unwrap_or(10)));
3796 }
3797 if *abort_on_failure {
3798 clause.push_str(" ABORT ON FAILURE");
3799 }
3800 constraints.push(clause);
3801 }
3802 contextdb_parser::ast::AstPropagationRule::VectorExclusion { trigger_state } => {
3803 constraints.push(format!(
3804 "PROPAGATE ON STATE {} EXCLUDE VECTOR",
3805 trigger_state
3806 ));
3807 }
3808 contextdb_parser::ast::AstPropagationRule::FkState { .. } => {}
3809 }
3810 }
3811
3812 constraints
3813}
3814
3815fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
3816 let mut constraints = Vec::new();
3817
3818 if meta.immutable {
3819 constraints.push("IMMUTABLE".to_string());
3820 }
3821
3822 if let Some(sm) = &meta.state_machine {
3823 let states = sm
3824 .transitions
3825 .iter()
3826 .map(|(from, to)| format!("{from} -> [{}]", to.join(", ")))
3827 .collect::<Vec<_>>()
3828 .join(", ");
3829 constraints.push(format!("STATE MACHINE ({}: {})", sm.column, states));
3830 }
3831
3832 if !meta.dag_edge_types.is_empty() {
3833 let edge_types = meta
3834 .dag_edge_types
3835 .iter()
3836 .map(|edge_type| format!("'{edge_type}'"))
3837 .collect::<Vec<_>>()
3838 .join(", ");
3839 constraints.push(format!("DAG({edge_types})"));
3840 }
3841
3842 if let Some(ttl_seconds) = meta.default_ttl_seconds {
3843 let mut clause = format!("RETAIN {}", ttl_seconds_to_sql(ttl_seconds));
3844 if meta.sync_safe {
3845 clause.push_str(" SYNC SAFE");
3846 }
3847 constraints.push(clause);
3848 }
3849
3850 for unique_constraint in &meta.unique_constraints {
3851 constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
3852 }
3853
3854 for rule in &meta.propagation_rules {
3855 match rule {
3856 PropagationRule::Edge {
3857 edge_type,
3858 direction,
3859 trigger_state,
3860 target_state,
3861 max_depth,
3862 abort_on_failure,
3863 } => {
3864 let dir = match direction {
3865 Direction::Incoming => "INCOMING",
3866 Direction::Outgoing => "OUTGOING",
3867 Direction::Both => "BOTH",
3868 };
3869 let mut clause = format!(
3870 "PROPAGATE ON EDGE {} {} STATE {} SET {}",
3871 edge_type, dir, trigger_state, target_state
3872 );
3873 if *max_depth != 10 {
3874 clause.push_str(&format!(" MAX DEPTH {max_depth}"));
3875 }
3876 if *abort_on_failure {
3877 clause.push_str(" ABORT ON FAILURE");
3878 }
3879 constraints.push(clause);
3880 }
3881 PropagationRule::VectorExclusion { trigger_state } => {
3882 constraints.push(format!(
3883 "PROPAGATE ON STATE {} EXCLUDE VECTOR",
3884 trigger_state
3885 ));
3886 }
3887 PropagationRule::ForeignKey { .. } => {}
3888 }
3889 }
3890
3891 constraints
3892}
3893
/// Converts a TTL in seconds to its SQL duration spelling, using the
/// largest unit (days > hours > minutes) that divides the value evenly,
/// falling back to raw seconds.
fn ttl_seconds_to_sql(seconds: u64) -> String {
    const UNITS: [(u64, &str); 3] = [
        (24 * 60 * 60, "DAYS"),
        (60 * 60, "HOURS"),
        (60, "MINUTES"),
    ];
    for (unit_seconds, label) in UNITS {
        if seconds % unit_seconds == 0 {
            return format!("{} {label}", seconds / unit_seconds);
        }
    }
    format!("{seconds} SECONDS")
}