use crate::sync_types::{DdlChange, NaturalKey};
use contextdb_core::{EdgeType, NodeId, Result, RowId, TableName, Value};
use contextdb_graph::GraphStore;
use contextdb_relational::RelationalStore;
use contextdb_tx::{WriteSet, WriteSetApplicator};
use contextdb_vector::VectorStore;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeLogEntry {
RowInsert {
table: TableName,
row_id: RowId,
lsn: u64,
},
RowDelete {
table: TableName,
row_id: RowId,
natural_key: NaturalKey,
lsn: u64,
},
EdgeInsert {
source: NodeId,
target: NodeId,
edge_type: EdgeType,
lsn: u64,
},
EdgeDelete {
source: NodeId,
target: NodeId,
edge_type: EdgeType,
lsn: u64,
},
VectorInsert {
row_id: RowId,
lsn: u64,
},
VectorDelete {
row_id: RowId,
lsn: u64,
},
}
impl ChangeLogEntry {
pub fn lsn(&self) -> u64 {
match self {
ChangeLogEntry::RowInsert { lsn, .. }
| ChangeLogEntry::RowDelete { lsn, .. }
| ChangeLogEntry::EdgeInsert { lsn, .. }
| ChangeLogEntry::EdgeDelete { lsn, .. }
| ChangeLogEntry::VectorInsert { lsn, .. }
| ChangeLogEntry::VectorDelete { lsn, .. } => *lsn,
}
}
}
pub struct CompositeStore {
pub relational: Arc<RelationalStore>,
pub graph: Arc<GraphStore>,
pub vector: Arc<VectorStore>,
pub change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
pub ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
}
impl CompositeStore {
pub fn new(
relational: Arc<RelationalStore>,
graph: Arc<GraphStore>,
vector: Arc<VectorStore>,
change_log: Arc<RwLock<Vec<ChangeLogEntry>>>,
ddl_log: Arc<RwLock<Vec<(u64, DdlChange)>>>,
) -> Self {
Self {
relational,
graph,
vector,
change_log,
ddl_log,
}
}
pub(crate) fn build_change_log_entries(&self, ws: &WriteSet) -> Vec<ChangeLogEntry> {
let lsn = ws.commit_lsn.unwrap_or(0);
let mut log_entries = Vec::new();
for (table, row) in &ws.relational_inserts {
log_entries.push(ChangeLogEntry::RowInsert {
table: table.clone(),
row_id: row.row_id,
lsn,
});
}
for (table, row_id, _) in &ws.relational_deletes {
let natural_key = self
.relational
.tables
.read()
.get(table)
.and_then(|rows| rows.iter().find(|r| r.row_id == *row_id))
.and_then(|row| {
row.values.get("id").cloned().map(|value| NaturalKey {
column: "id".to_string(),
value,
})
})
.unwrap_or_else(|| NaturalKey {
column: "id".to_string(),
value: Value::Int64(*row_id as i64),
});
log_entries.push(ChangeLogEntry::RowDelete {
table: table.clone(),
row_id: *row_id,
natural_key,
lsn,
});
}
for entry in &ws.adj_inserts {
log_entries.push(ChangeLogEntry::EdgeInsert {
source: entry.source,
target: entry.target,
edge_type: entry.edge_type.clone(),
lsn,
});
}
for (source, edge_type, target, _) in &ws.adj_deletes {
log_entries.push(ChangeLogEntry::EdgeDelete {
source: *source,
target: *target,
edge_type: edge_type.clone(),
lsn,
});
}
for entry in &ws.vector_inserts {
log_entries.push(ChangeLogEntry::VectorInsert {
row_id: entry.row_id,
lsn,
});
}
for (row_id, _) in &ws.vector_deletes {
log_entries.push(ChangeLogEntry::VectorDelete {
row_id: *row_id,
lsn,
});
}
log_entries
}
}
impl WriteSetApplicator for CompositeStore {
fn apply(&self, ws: WriteSet) -> Result<()> {
let log_entries = self.build_change_log_entries(&ws);
self.relational.apply_inserts(ws.relational_inserts);
self.relational.apply_deletes(ws.relational_deletes);
self.graph.apply_inserts(ws.adj_inserts);
self.graph.apply_deletes(ws.adj_deletes);
self.vector.apply_inserts(ws.vector_inserts);
self.vector.apply_deletes(ws.vector_deletes);
self.change_log.write().extend(log_entries);
Ok(())
}
fn new_row_id(&self) -> RowId {
self.relational.new_row_id()
}
}