Skip to main content

contextdb_engine/
composite_store.rs

1use crate::sync_types::{DdlChange, NaturalKey};
2use contextdb_core::{EdgeType, Lsn, NodeId, Result, RowId, TableName, Value};
3use contextdb_graph::GraphStore;
4use contextdb_relational::RelationalStore;
5use contextdb_tx::{WriteSet, WriteSetApplicator};
6use contextdb_vector::VectorStore;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10
/// A single change-log record: one row, edge, or vector mutation, tagged with an LSN.
///
/// Every variant carries an `lsn` field, which [`ChangeLogEntry::lsn`] exposes uniformly.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ChangeLogEntry {
    /// A row was inserted into a relational table.
    RowInsert {
        /// Table that received the row.
        table: TableName,
        /// Identifier of the inserted row.
        row_id: RowId,
        /// LSN this entry is stamped with.
        lsn: Lsn,
    },
    /// A row was deleted from a relational table.
    RowDelete {
        /// Table the row was removed from.
        table: TableName,
        /// Identifier of the deleted row.
        row_id: RowId,
        /// Column/value pair identifying the deleted row independently of `row_id`.
        natural_key: NaturalKey,
        /// LSN this entry is stamped with.
        lsn: Lsn,
    },
    /// A graph edge was inserted.
    EdgeInsert {
        /// Node the edge starts at.
        source: NodeId,
        /// Node the edge points to.
        target: NodeId,
        /// Type label of the edge.
        edge_type: EdgeType,
        /// LSN this entry is stamped with.
        lsn: Lsn,
    },
    /// A graph edge was deleted.
    EdgeDelete {
        /// Node the edge started at.
        source: NodeId,
        /// Node the edge pointed to.
        target: NodeId,
        /// Type label of the edge.
        edge_type: EdgeType,
        /// LSN this entry is stamped with.
        lsn: Lsn,
    },
    /// A vector was inserted for a row.
    VectorInsert {
        /// Row the vector is attached to.
        row_id: RowId,
        /// LSN this entry is stamped with.
        lsn: Lsn,
    },
    /// A vector was deleted for a row.
    VectorDelete {
        /// Row the vector was attached to.
        row_id: RowId,
        /// LSN this entry is stamped with.
        lsn: Lsn,
    },
}
45
46impl ChangeLogEntry {
47    pub fn lsn(&self) -> Lsn {
48        match self {
49            ChangeLogEntry::RowInsert { lsn, .. }
50            | ChangeLogEntry::RowDelete { lsn, .. }
51            | ChangeLogEntry::EdgeInsert { lsn, .. }
52            | ChangeLogEntry::EdgeDelete { lsn, .. }
53            | ChangeLogEntry::VectorInsert { lsn, .. }
54            | ChangeLogEntry::VectorDelete { lsn, .. } => *lsn,
55        }
56    }
57}
58
/// Bundles the relational, graph, and vector stores together with the shared logs.
///
/// All members are reference-counted handles, so the same underlying stores and
/// logs can be held elsewhere at the same time.
pub struct CompositeStore {
    /// Relational (table/row) store.
    pub relational: Arc<RelationalStore>,
    /// Graph (adjacency/edge) store.
    pub graph: Arc<GraphStore>,
    /// Vector store.
    pub vector: Arc<VectorStore>,
    /// Append-only list of data changes; extended each time a write set is applied.
    pub change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
    /// DDL changes paired with an LSN.
    /// NOTE(review): nothing in this file writes to it — presumably maintained by the caller.
    pub ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
}
66
67impl CompositeStore {
68    pub fn new(
69        relational: Arc<RelationalStore>,
70        graph: Arc<GraphStore>,
71        vector: Arc<VectorStore>,
72        change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
73        ddl_log: Arc<RwLock<Vec<(Lsn, DdlChange)>>>,
74    ) -> Self {
75        Self {
76            relational,
77            graph,
78            vector,
79            change_log,
80            ddl_log,
81        }
82    }
83
84    pub(crate) fn build_change_log_entries(&self, ws: &WriteSet) -> Vec<ChangeLogEntry> {
85        let lsn = ws.commit_lsn.unwrap_or(Lsn(0));
86        let mut log_entries = Vec::new();
87
88        for (table, row) in &ws.relational_inserts {
89            log_entries.push(ChangeLogEntry::RowInsert {
90                table: table.clone(),
91                row_id: row.row_id,
92                lsn,
93            });
94        }
95
96        for (table, row_id, _) in &ws.relational_deletes {
97            let natural_key = self
98                .relational
99                .tables
100                .read()
101                .get(table)
102                .and_then(|rows| rows.iter().find(|r| r.row_id == *row_id))
103                .and_then(|row| {
104                    row.values.get("id").cloned().map(|value| NaturalKey {
105                        column: "id".to_string(),
106                        value,
107                    })
108                })
109                .unwrap_or_else(|| NaturalKey {
110                    column: "id".to_string(),
111                    value: Value::Int64(row_id.0 as i64),
112                });
113
114            log_entries.push(ChangeLogEntry::RowDelete {
115                table: table.clone(),
116                row_id: *row_id,
117                natural_key,
118                lsn,
119            });
120        }
121
122        for entry in &ws.adj_inserts {
123            log_entries.push(ChangeLogEntry::EdgeInsert {
124                source: entry.source,
125                target: entry.target,
126                edge_type: entry.edge_type.clone(),
127                lsn,
128            });
129        }
130
131        for (source, edge_type, target, _) in &ws.adj_deletes {
132            log_entries.push(ChangeLogEntry::EdgeDelete {
133                source: *source,
134                target: *target,
135                edge_type: edge_type.clone(),
136                lsn,
137            });
138        }
139
140        for entry in &ws.vector_inserts {
141            log_entries.push(ChangeLogEntry::VectorInsert {
142                row_id: entry.row_id,
143                lsn,
144            });
145        }
146
147        for (row_id, _) in &ws.vector_deletes {
148            log_entries.push(ChangeLogEntry::VectorDelete {
149                row_id: *row_id,
150                lsn,
151            });
152        }
153
154        log_entries
155    }
156}
157
158impl WriteSetApplicator for CompositeStore {
159    fn apply(&self, ws: WriteSet) -> Result<()> {
160        let log_entries = self.build_change_log_entries(&ws);
161
162        self.relational.apply_inserts(ws.relational_inserts);
163        self.relational.apply_deletes(ws.relational_deletes);
164        self.graph.apply_inserts(ws.adj_inserts);
165        self.graph.apply_deletes(ws.adj_deletes);
166        self.vector.apply_inserts(ws.vector_inserts);
167        self.vector.apply_deletes(ws.vector_deletes);
168        self.change_log.write().extend(log_entries);
169        Ok(())
170    }
171
172    fn new_row_id(&self) -> RowId {
173        self.relational.new_row_id()
174    }
175}