// contextdb_engine/composite_store.rs

1use crate::sync_types::{DdlChange, NaturalKey};
2use contextdb_core::{EdgeType, Lsn, NodeId, Result, RowId, TableName, Value, VectorIndexRef};
3use contextdb_graph::GraphStore;
4use contextdb_relational::RelationalStore;
5use contextdb_tx::{WriteSet, WriteSetApplicator};
6use contextdb_vector::VectorStore;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10
/// One record of the change log kept by [`CompositeStore`].
///
/// Each variant describes a single insert or delete against one of the three
/// sub-stores (relational rows, graph edges, vector entries) and carries the
/// LSN it was recorded under.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ChangeLogEntry {
    /// A row was inserted into `table`.
    RowInsert {
        table: TableName,
        row_id: RowId,
        lsn: Lsn,
    },
    /// A row was deleted from `table`.
    ///
    /// Besides the `row_id`, the entry keeps a `natural_key` (a column name
    /// plus the value held in that column) identifying the removed row.
    RowDelete {
        table: TableName,
        row_id: RowId,
        natural_key: NaturalKey,
        lsn: Lsn,
    },
    /// An edge of `edge_type` from `source` to `target` was inserted.
    EdgeInsert {
        source: NodeId,
        target: NodeId,
        edge_type: EdgeType,
        lsn: Lsn,
    },
    /// An edge of `edge_type` from `source` to `target` was deleted.
    EdgeDelete {
        source: NodeId,
        target: NodeId,
        edge_type: EdgeType,
        lsn: Lsn,
    },
    /// A vector entry for `row_id` was inserted into `index`.
    VectorInsert {
        index: VectorIndexRef,
        row_id: RowId,
        lsn: Lsn,
    },
    /// The vector entry for `row_id` was deleted from `index`.
    VectorDelete {
        index: VectorIndexRef,
        row_id: RowId,
        lsn: Lsn,
    },
}
47
48impl ChangeLogEntry {
49    pub fn lsn(&self) -> Lsn {
50        match self {
51            ChangeLogEntry::RowInsert { lsn, .. }
52            | ChangeLogEntry::RowDelete { lsn, .. }
53            | ChangeLogEntry::EdgeInsert { lsn, .. }
54            | ChangeLogEntry::EdgeDelete { lsn, .. }
55            | ChangeLogEntry::VectorInsert { lsn, .. }
56            | ChangeLogEntry::VectorDelete { lsn, .. } => *lsn,
57        }
58    }
59}
60
/// Groups the relational, graph and vector stores with the logs that track
/// changes made to them.
///
/// Every field is a shared (`Arc`) handle, so the same underlying stores and
/// logs may also be referenced from elsewhere.
pub struct CompositeStore {
    /// Row storage; also the source of new row ids.
    pub relational: Arc<RelationalStore>,
    /// Edge (adjacency) storage.
    pub graph: Arc<GraphStore>,
    /// Vector index storage.
    pub vector: Arc<VectorStore>,
    /// Log of data changes; extended each time a write set is applied.
    pub change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    /// Log of schema (DDL) changes paired with their LSN. Held here but not
    /// appended to by the code visible in this part of the file.
    pub ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
}
68
69impl CompositeStore {
70    pub fn new(
71        relational: Arc<RelationalStore>,
72        graph: Arc<GraphStore>,
73        vector: Arc<VectorStore>,
74        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
75        ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
76    ) -> Self {
77        Self {
78            relational,
79            graph,
80            vector,
81            change_log,
82            ddl_log,
83        }
84    }
85
86    pub(crate) fn build_change_log_entries(&self, ws: &WriteSet) -> Vec<ChangeLogEntry> {
87        let lsn = ws.commit_lsn.unwrap_or(Lsn(0));
88        let mut log_entries = Vec::new();
89
90        for (table, row) in &ws.relational_inserts {
91            log_entries.push(ChangeLogEntry::RowInsert {
92                table: table.clone(),
93                row_id: row.row_id,
94                lsn,
95            });
96        }
97
98        for (table, row_id, _) in &ws.relational_deletes {
99            let natural_key = self
100                .relational
101                .tables
102                .read()
103                .get(table)
104                .and_then(|rows| rows.iter().find(|r| r.row_id == *row_id))
105                .and_then(|row| {
106                    row.values.get("id").cloned().map(|value| NaturalKey {
107                        column: "id".to_string(),
108                        value,
109                    })
110                })
111                .unwrap_or_else(|| NaturalKey {
112                    column: "id".to_string(),
113                    value: Value::Int64(row_id.0 as i64),
114                });
115
116            log_entries.push(ChangeLogEntry::RowDelete {
117                table: table.clone(),
118                row_id: *row_id,
119                natural_key,
120                lsn,
121            });
122        }
123
124        for entry in &ws.adj_inserts {
125            log_entries.push(ChangeLogEntry::EdgeInsert {
126                source: entry.source,
127                target: entry.target,
128                edge_type: entry.edge_type.clone(),
129                lsn,
130            });
131        }
132
133        for (source, edge_type, target, _) in &ws.adj_deletes {
134            log_entries.push(ChangeLogEntry::EdgeDelete {
135                source: *source,
136                target: *target,
137                edge_type: edge_type.clone(),
138                lsn,
139            });
140        }
141
142        for entry in &ws.vector_inserts {
143            log_entries.push(ChangeLogEntry::VectorInsert {
144                index: entry.index.clone(),
145                row_id: entry.row_id,
146                lsn,
147            });
148        }
149
150        for (index, row_id, _) in &ws.vector_deletes {
151            log_entries.push(ChangeLogEntry::VectorDelete {
152                index: index.clone(),
153                row_id: *row_id,
154                lsn,
155            });
156        }
157
158        log_entries
159    }
160}
161
162impl WriteSetApplicator for CompositeStore {
163    fn apply(&self, ws: WriteSet) -> Result<()> {
164        let log_entries = self.build_change_log_entries(&ws);
165
166        self.relational.apply_inserts(ws.relational_inserts);
167        self.relational.apply_deletes(ws.relational_deletes);
168        self.graph.apply_inserts(ws.adj_inserts);
169        self.graph.apply_deletes(ws.adj_deletes);
170        self.vector.apply_inserts(ws.vector_inserts);
171        self.vector.apply_deletes(ws.vector_deletes);
172        self.vector
173            .apply_moves(ws.vector_moves, ws.commit_lsn.unwrap_or(Lsn(0)));
174        self.change_log.write().extend(log_entries);
175        Ok(())
176    }
177
178    fn new_row_id(&self) -> RowId {
179        self.relational.new_row_id()
180    }
181}