1use crate::sync_types::{DdlChange, NaturalKey};
2use contextdb_core::{EdgeType, Lsn, NodeId, Result, RowId, TableName, Value, VectorIndexRef};
3use contextdb_graph::GraphStore;
4use contextdb_relational::RelationalStore;
5use contextdb_tx::{WriteSet, WriteSetApplicator};
6use contextdb_vector::VectorStore;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12pub enum ChangeLogEntry {
13 RowInsert {
14 table: TableName,
15 row_id: RowId,
16 lsn: Lsn,
17 },
18 RowDelete {
19 table: TableName,
20 row_id: RowId,
21 natural_key: NaturalKey,
22 lsn: Lsn,
23 },
24 EdgeInsert {
25 source: NodeId,
26 target: NodeId,
27 edge_type: EdgeType,
28 lsn: Lsn,
29 },
30 EdgeDelete {
31 source: NodeId,
32 target: NodeId,
33 edge_type: EdgeType,
34 lsn: Lsn,
35 },
36 VectorInsert {
37 index: VectorIndexRef,
38 row_id: RowId,
39 lsn: Lsn,
40 },
41 VectorDelete {
42 index: VectorIndexRef,
43 row_id: RowId,
44 lsn: Lsn,
45 },
46}
47
48impl ChangeLogEntry {
49 pub fn lsn(&self) -> Lsn {
50 match self {
51 ChangeLogEntry::RowInsert { lsn, .. }
52 | ChangeLogEntry::RowDelete { lsn, .. }
53 | ChangeLogEntry::EdgeInsert { lsn, .. }
54 | ChangeLogEntry::EdgeDelete { lsn, .. }
55 | ChangeLogEntry::VectorInsert { lsn, .. }
56 | ChangeLogEntry::VectorDelete { lsn, .. } => *lsn,
57 }
58 }
59}
60
61pub struct CompositeStore {
62 pub relational: Arc<RelationalStore>,
63 pub graph: Arc<GraphStore>,
64 pub vector: Arc<VectorStore>,
65 pub change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
66 pub ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
67}
68
69impl CompositeStore {
70 pub fn new(
71 relational: Arc<RelationalStore>,
72 graph: Arc<GraphStore>,
73 vector: Arc<VectorStore>,
74 change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
75 ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
76 ) -> Self {
77 Self {
78 relational,
79 graph,
80 vector,
81 change_log,
82 ddl_log,
83 }
84 }
85
86 pub(crate) fn build_change_log_entries(&self, ws: &WriteSet) -> Vec<ChangeLogEntry> {
87 let lsn = ws.commit_lsn.unwrap_or(Lsn(0));
88 let mut log_entries = Vec::new();
89
90 for (table, row) in &ws.relational_inserts {
91 log_entries.push(ChangeLogEntry::RowInsert {
92 table: table.clone(),
93 row_id: row.row_id,
94 lsn,
95 });
96 }
97
98 for (table, row_id, _) in &ws.relational_deletes {
99 let natural_key = self
100 .relational
101 .tables
102 .read()
103 .get(table)
104 .and_then(|rows| rows.iter().find(|r| r.row_id == *row_id))
105 .and_then(|row| {
106 row.values.get("id").cloned().map(|value| NaturalKey {
107 column: "id".to_string(),
108 value,
109 })
110 })
111 .unwrap_or_else(|| NaturalKey {
112 column: "id".to_string(),
113 value: Value::Int64(row_id.0 as i64),
114 });
115
116 log_entries.push(ChangeLogEntry::RowDelete {
117 table: table.clone(),
118 row_id: *row_id,
119 natural_key,
120 lsn,
121 });
122 }
123
124 for entry in &ws.adj_inserts {
125 log_entries.push(ChangeLogEntry::EdgeInsert {
126 source: entry.source,
127 target: entry.target,
128 edge_type: entry.edge_type.clone(),
129 lsn,
130 });
131 }
132
133 for (source, edge_type, target, _) in &ws.adj_deletes {
134 log_entries.push(ChangeLogEntry::EdgeDelete {
135 source: *source,
136 target: *target,
137 edge_type: edge_type.clone(),
138 lsn,
139 });
140 }
141
142 for entry in &ws.vector_inserts {
143 log_entries.push(ChangeLogEntry::VectorInsert {
144 index: entry.index.clone(),
145 row_id: entry.row_id,
146 lsn,
147 });
148 }
149
150 for (index, row_id, _) in &ws.vector_deletes {
151 log_entries.push(ChangeLogEntry::VectorDelete {
152 index: index.clone(),
153 row_id: *row_id,
154 lsn,
155 });
156 }
157
158 log_entries
159 }
160}
161
162impl WriteSetApplicator for CompositeStore {
163 fn apply(&self, ws: WriteSet) -> Result<()> {
164 let log_entries = self.build_change_log_entries(&ws);
165
166 self.relational.apply_inserts(ws.relational_inserts);
167 self.relational.apply_deletes(ws.relational_deletes);
168 self.graph.apply_inserts(ws.adj_inserts);
169 self.graph.apply_deletes(ws.adj_deletes);
170 self.vector.apply_inserts(ws.vector_inserts);
171 self.vector.apply_deletes(ws.vector_deletes);
172 self.vector
173 .apply_moves(ws.vector_moves, ws.commit_lsn.unwrap_or(Lsn(0)));
174 self.change_log.write().extend(log_entries);
175 Ok(())
176 }
177
178 fn new_row_id(&self) -> RowId {
179 self.relational.new_row_id()
180 }
181}