use std::{
collections::HashMap,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::{mpsc, RwLock};
use crate::{
error::Result,
store::Store,
streaming::{CDCConfig, RDFEvent, StreamingManager},
};
use oxirs_core::{Quad, Triple};
#[async_trait::async_trait]
pub trait CDCListener: Send + Sync {
async fn on_triple_added(&self, triple: &Triple, graph: Option<&str>);
async fn on_triple_removed(&self, triple: &Triple, graph: Option<&str>);
async fn on_quad_added(&self, quad: &Quad);
async fn on_quad_removed(&self, quad: &Quad);
async fn on_graph_cleared(&self, graph: &str);
async fn on_transaction_start(&self, tx_id: &str);
async fn on_transaction_commit(&self, tx_id: &str);
async fn on_transaction_rollback(&self, tx_id: &str);
}
pub struct CDCManager {
config: CDCConfig,
streaming_manager: Arc<StreamingManager>,
transaction_buffer: Arc<RwLock<HashMap<String, Vec<RDFEvent>>>>,
event_channel: (
mpsc::Sender<CDCEvent>,
Arc<RwLock<mpsc::Receiver<CDCEvent>>>,
),
}
#[derive(Debug, Clone)]
enum CDCEvent {
TripleAdded {
triple: Triple,
graph: Option<String>,
},
TripleRemoved {
triple: Triple,
graph: Option<String>,
},
QuadAdded {
quad: Quad,
},
QuadRemoved {
quad: Quad,
},
GraphCleared {
graph: String,
},
TransactionStart {
tx_id: String,
},
TransactionCommit {
tx_id: String,
},
TransactionRollback {
tx_id: String,
},
}
impl CDCManager {
pub fn new(config: CDCConfig, streaming_manager: Arc<StreamingManager>) -> Self {
let (tx, rx) = mpsc::channel(10000);
Self {
config,
streaming_manager,
transaction_buffer: Arc::new(RwLock::new(HashMap::new())),
event_channel: (tx, Arc::new(RwLock::new(rx))),
}
}
pub async fn start(&self) {
let config = self.config.clone();
let streaming_manager = self.streaming_manager.clone();
let transaction_buffer = self.transaction_buffer.clone();
let receiver = self.event_channel.1.clone();
tokio::spawn(async move {
let mut batch = Vec::new();
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
loop {
tokio::select! {
_ = interval.tick() => {
if !batch.is_empty() && batch.len() >= config.batch_size {
Self::process_batch(&streaming_manager, &batch).await;
batch.clear();
}
}
event = async {
let mut rx = receiver.write().await;
rx.recv().await
} => {
if let Some(event) = event {
match event {
CDCEvent::TransactionStart { tx_id } => {
let mut buffer = transaction_buffer.write().await;
buffer.insert(tx_id, Vec::new());
}
CDCEvent::TransactionCommit { tx_id } => {
let mut buffer = transaction_buffer.write().await;
if let Some(events) = buffer.remove(&tx_id) {
let timestamp = Self::current_timestamp();
let tx_event = RDFEvent::Transaction {
id: tx_id,
events,
timestamp,
};
let _ = streaming_manager.send_event(tx_event).await;
}
}
CDCEvent::TransactionRollback { tx_id } => {
let mut buffer = transaction_buffer.write().await;
buffer.remove(&tx_id);
}
_ => {
if let Some(rdf_event) = Self::convert_to_rdf_event(&event, &config) {
let buffer = transaction_buffer.read().await;
let in_transaction = buffer.keys().next().cloned();
drop(buffer);
if let Some(tx_id) = in_transaction {
let mut buffer = transaction_buffer.write().await;
if let Some(tx_events) = buffer.get_mut(&tx_id) {
tx_events.push(rdf_event);
}
} else {
batch.push(rdf_event);
}
}
}
}
} else {
if !batch.is_empty() {
Self::process_batch(&streaming_manager, &batch).await;
}
break;
}
}
}
}
});
}
async fn process_batch(streaming_manager: &StreamingManager, batch: &[RDFEvent]) {
for event in batch {
if let Err(e) = streaming_manager.send_event(event.clone()).await {
tracing::error!("Failed to send CDC event: {}", e);
}
}
}
fn convert_to_rdf_event(event: &CDCEvent, config: &CDCConfig) -> Option<RDFEvent> {
let timestamp = Self::current_timestamp();
match event {
CDCEvent::TripleAdded { triple, graph } if config.capture_inserts => {
Some(RDFEvent::TripleAdded {
triple: triple.clone(),
graph: graph.clone(),
timestamp,
})
}
CDCEvent::TripleRemoved { triple, graph } if config.capture_deletes => {
Some(RDFEvent::TripleRemoved {
triple: triple.clone(),
graph: graph.clone(),
timestamp,
})
}
CDCEvent::QuadAdded { quad } if config.capture_inserts => Some(RDFEvent::QuadAdded {
quad: quad.clone(),
timestamp,
}),
CDCEvent::QuadRemoved { quad } if config.capture_deletes => {
Some(RDFEvent::QuadRemoved {
quad: quad.clone(),
timestamp,
})
}
CDCEvent::GraphCleared { graph } if config.capture_deletes => {
Some(RDFEvent::GraphCleared {
graph: graph.clone(),
timestamp,
})
}
_ => None,
}
}
fn current_timestamp() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_millis() as i64
}
}
#[async_trait::async_trait]
impl CDCListener for CDCManager {
async fn on_triple_added(&self, triple: &Triple, graph: Option<&str>) {
if self.config.enabled && self.config.capture_inserts {
let event = CDCEvent::TripleAdded {
triple: triple.clone(),
graph: graph.map(|s| s.to_string()),
};
let _ = self.event_channel.0.send(event).await;
}
}
async fn on_triple_removed(&self, triple: &Triple, graph: Option<&str>) {
if self.config.enabled && self.config.capture_deletes {
let event = CDCEvent::TripleRemoved {
triple: triple.clone(),
graph: graph.map(|s| s.to_string()),
};
let _ = self.event_channel.0.send(event).await;
}
}
async fn on_quad_added(&self, quad: &Quad) {
if self.config.enabled && self.config.capture_inserts {
let event = CDCEvent::QuadAdded { quad: quad.clone() };
let _ = self.event_channel.0.send(event).await;
}
}
async fn on_quad_removed(&self, quad: &Quad) {
if self.config.enabled && self.config.capture_deletes {
let event = CDCEvent::QuadRemoved { quad: quad.clone() };
let _ = self.event_channel.0.send(event).await;
}
}
async fn on_graph_cleared(&self, graph: &str) {
if self.config.enabled && self.config.capture_deletes {
let event = CDCEvent::GraphCleared {
graph: graph.to_string(),
};
let _ = self.event_channel.0.send(event).await;
}
}
async fn on_transaction_start(&self, tx_id: &str) {
if self.config.enabled {
let event = CDCEvent::TransactionStart {
tx_id: tx_id.to_string(),
};
let _ = self.event_channel.0.send(event).await;
}
}
async fn on_transaction_commit(&self, tx_id: &str) {
if self.config.enabled {
let event = CDCEvent::TransactionCommit {
tx_id: tx_id.to_string(),
};
let _ = self.event_channel.0.send(event).await;
}
}
async fn on_transaction_rollback(&self, tx_id: &str) {
if self.config.enabled {
let event = CDCEvent::TransactionRollback {
tx_id: tx_id.to_string(),
};
let _ = self.event_channel.0.send(event).await;
}
}
}
pub struct CDCStore {
inner: Store,
cdc_listeners: Arc<RwLock<Vec<Arc<dyn CDCListener>>>>,
}
impl CDCStore {
pub fn new(store: Store) -> Self {
Self {
inner: store,
cdc_listeners: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn add_listener(&self, listener: Arc<dyn CDCListener>) {
let mut listeners = self.cdc_listeners.write().await;
listeners.push(listener);
}
pub async fn clear_listeners(&self) {
let mut listeners = self.cdc_listeners.write().await;
listeners.clear();
}
async fn notify_listeners<F, Fut>(&self, f: F)
where
F: Fn(Arc<dyn CDCListener>) -> Fut,
Fut: std::future::Future<Output = ()>,
{
let listeners = self.cdc_listeners.read().await;
for listener in listeners.iter() {
f(listener.clone()).await;
}
}
pub async fn add_triple(&self, triple: Triple, graph: Option<String>) -> Result<()> {
let triple_clone = triple.clone();
let graph_ref = graph.as_deref();
self.notify_listeners(|listener| {
let triple_ref = &triple_clone;
async move {
listener.on_triple_added(triple_ref, graph_ref).await;
}
})
.await;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_cdc_config() {
let config = CDCConfig::default();
assert!(config.enabled);
assert!(config.capture_inserts);
assert!(config.capture_deletes);
assert!(config.capture_updates);
assert_eq!(config.batch_size, 100);
}
#[test]
fn test_timestamp_generation() {
let ts1 = CDCManager::current_timestamp();
std::thread::sleep(std::time::Duration::from_millis(10));
let ts2 = CDCManager::current_timestamp();
assert!(ts2 > ts1);
}
}