1use crate::sync_types::{DdlChange, NaturalKey};
2use contextdb_core::{EdgeType, NodeId, Result, RowId, TableName, Value};
3use contextdb_graph::GraphStore;
4use contextdb_relational::RelationalStore;
5use contextdb_tx::{WriteSet, WriteSetApplicator};
6use contextdb_vector::VectorStore;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub enum ChangeLogEntry {
13 RowInsert {
14 table: TableName,
15 row_id: RowId,
16 lsn: u64,
17 },
18 RowDelete {
19 table: TableName,
20 row_id: RowId,
21 natural_key: NaturalKey,
22 lsn: u64,
23 },
24 EdgeInsert {
25 source: NodeId,
26 target: NodeId,
27 edge_type: EdgeType,
28 lsn: u64,
29 },
30 EdgeDelete {
31 source: NodeId,
32 target: NodeId,
33 edge_type: EdgeType,
34 lsn: u64,
35 },
36 VectorInsert {
37 row_id: RowId,
38 lsn: u64,
39 },
40 VectorDelete {
41 row_id: RowId,
42 lsn: u64,
43 },
44}
45
46impl ChangeLogEntry {
47 pub fn lsn(&self) -> u64 {
48 match self {
49 ChangeLogEntry::RowInsert { lsn, .. }
50 | ChangeLogEntry::RowDelete { lsn, .. }
51 | ChangeLogEntry::EdgeInsert { lsn, .. }
52 | ChangeLogEntry::EdgeDelete { lsn, .. }
53 | ChangeLogEntry::VectorInsert { lsn, .. }
54 | ChangeLogEntry::VectorDelete { lsn, .. } => *lsn,
55 }
56 }
57}
58
59pub struct CompositeStore {
60 pub relational: Arc<RelationalStore>,
61 pub graph: Arc<GraphStore>,
62 pub vector: Arc<VectorStore>,
63 pub change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
64 pub ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
65}
66
67impl CompositeStore {
68 pub fn new(
69 relational: Arc<RelationalStore>,
70 graph: Arc<GraphStore>,
71 vector: Arc<VectorStore>,
72 change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
73 ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
74 ) -> Self {
75 Self {
76 relational,
77 graph,
78 vector,
79 change_log,
80 ddl_log,
81 }
82 }
83
84 pub(crate) fn build_change_log_entries(&self, ws: &WriteSet) -> Vec<ChangeLogEntry> {
85 let lsn = ws.commit_lsn.unwrap_or(0);
86 let mut log_entries = Vec::new();
87
88 for (table, row) in &ws.relational_inserts {
89 log_entries.push(ChangeLogEntry::RowInsert {
90 table: table.clone(),
91 row_id: row.row_id,
92 lsn,
93 });
94 }
95
96 for (table, row_id, _) in &ws.relational_deletes {
97 let natural_key = self
98 .relational
99 .tables
100 .read()
101 .get(table)
102 .and_then(|rows| rows.iter().find(|r| r.row_id == *row_id))
103 .and_then(|row| {
104 row.values.get("id").cloned().map(|value| NaturalKey {
105 column: "id".to_string(),
106 value,
107 })
108 })
109 .unwrap_or_else(|| NaturalKey {
110 column: "id".to_string(),
111 value: Value::Int64(*row_id as i64),
112 });
113
114 log_entries.push(ChangeLogEntry::RowDelete {
115 table: table.clone(),
116 row_id: *row_id,
117 natural_key,
118 lsn,
119 });
120 }
121
122 for entry in &ws.adj_inserts {
123 log_entries.push(ChangeLogEntry::EdgeInsert {
124 source: entry.source,
125 target: entry.target,
126 edge_type: entry.edge_type.clone(),
127 lsn,
128 });
129 }
130
131 for (source, edge_type, target, _) in &ws.adj_deletes {
132 log_entries.push(ChangeLogEntry::EdgeDelete {
133 source: *source,
134 target: *target,
135 edge_type: edge_type.clone(),
136 lsn,
137 });
138 }
139
140 for entry in &ws.vector_inserts {
141 log_entries.push(ChangeLogEntry::VectorInsert {
142 row_id: entry.row_id,
143 lsn,
144 });
145 }
146
147 for (row_id, _) in &ws.vector_deletes {
148 log_entries.push(ChangeLogEntry::VectorDelete {
149 row_id: *row_id,
150 lsn,
151 });
152 }
153
154 log_entries
155 }
156}
157
158impl WriteSetApplicator for CompositeStore {
159 fn apply(&self, ws: WriteSet) -> Result<()> {
160 let log_entries = self.build_change_log_entries(&ws);
161
162 self.relational.apply_inserts(ws.relational_inserts);
163 self.relational.apply_deletes(ws.relational_deletes);
164 self.graph.apply_inserts(ws.adj_inserts);
165 self.graph.apply_deletes(ws.adj_deletes);
166 self.vector.apply_inserts(ws.vector_inserts);
167 self.vector.apply_deletes(ws.vector_deletes);
168 self.change_log.write().extend(log_entries);
169 Ok(())
170 }
171
172 fn new_row_id(&self) -> RowId {
173 self.relational.new_row_id()
174 }
175}