use crate::engine::Database;
use crate::error::DbxResult;
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::Duration;
/// A single change event consumed by a `StreamIngester`.
///
/// `Insert` and `Update` both carry a full replacement value; the worker
/// thread applies them identically via `db.insert` (upsert-style — see
/// `flush_buffer`), so the distinction exists only for callers' bookkeeping.
#[derive(Debug, Clone)]
pub enum StreamEvent {
    /// Store `value` under `key`.
    Insert { key: String, value: Vec<u8> },
    /// Replace the value stored under `key`.
    Update { key: String, value: Vec<u8> },
    /// Remove `key` from the table.
    Delete { key: String },
}
/// Batching ingestion pipeline: producers push `Vec<StreamEvent>` batches
/// through a bounded channel, and a dedicated worker thread buffers them,
/// writing to the database once `batch_size` events accumulate or
/// `max_latency` elapses since the last flush (whichever comes first).
pub struct StreamIngester {
    /// Bounded producer handle; cloned out via `sender()`. The worker only
    /// shuts down when this and every clone have been dropped.
    sender: mpsc::SyncSender<Vec<StreamEvent>>,
    /// Worker-thread handle, joined in `flush` so all buffered events are
    /// written before `flush` returns.
    _handle: thread::JoinHandle<()>,
}
impl StreamIngester {
    /// Spawns the background writer thread and returns the ingester.
    ///
    /// * `db` — shared database handle the worker writes into.
    /// * `table` — target table for every event.
    /// * `batch_size` — flush threshold; the channel is sized at four
    ///   batches so producers can run ahead of the writer.
    /// * `max_latency` — upper bound on how long a buffered event waits
    ///   before being flushed, even when the batch is not full.
    pub fn new(db: Arc<Database>, table: &str, batch_size: usize, max_latency: Duration) -> Self {
        use std::time::Instant;

        let (tx, rx) = mpsc::sync_channel::<Vec<StreamEvent>>(batch_size * 4);
        let table = table.to_string();
        let handle = thread::spawn(move || {
            let mut buffer: Vec<StreamEvent> = Vec::with_capacity(batch_size);
            let mut deadline = Instant::now() + max_latency;
            loop {
                // Wait for the next batch, but never past the flush deadline.
                let remaining = deadline.saturating_duration_since(Instant::now());
                match rx.recv_timeout(remaining) {
                    Ok(events) => buffer.extend(events),
                    // Deadline reached with no traffic; fall through to the
                    // flush check below.
                    Err(mpsc::RecvTimeoutError::Timeout) => {}
                    // All senders dropped: final flush, then shut down.
                    Err(mpsc::RecvTimeoutError::Disconnected) => {
                        Self::flush_buffer(&db, &table, &mut buffer);
                        break;
                    }
                }
                if buffer.len() >= batch_size || Instant::now() >= deadline {
                    // Skip the no-op flush when nothing is buffered, but
                    // still push the deadline forward so an idle stream does
                    // not flush the instant traffic resumes.
                    if !buffer.is_empty() {
                        Self::flush_buffer(&db, &table, &mut buffer);
                    }
                    deadline = Instant::now() + max_latency;
                }
            }
        });
        Self {
            sender: tx,
            _handle: handle,
        }
    }

    /// Applies every buffered event to `table` in arrival order, draining
    /// `buffer` in place (its capacity is retained for reuse).
    ///
    /// Per-row write errors are discarded (best-effort ingestion).
    /// NOTE(review): consider surfacing these failures through a metrics or
    /// dead-letter channel instead of dropping them silently.
    fn flush_buffer(db: &Database, table: &str, buffer: &mut Vec<StreamEvent>) {
        for event in buffer.drain(..) {
            match event {
                // Insert and Update are both upserts at the storage layer,
                // so the two arms are merged.
                StreamEvent::Insert { key, value } | StreamEvent::Update { key, value } => {
                    let _ = db.insert(table, key.as_bytes(), &value);
                }
                StreamEvent::Delete { key } => {
                    let _ = db.delete(table, key.as_bytes());
                }
            }
        }
    }

    /// Returns a producer handle. Every clone must be dropped before
    /// `flush` can complete, because the worker only exits when the channel
    /// disconnects.
    pub fn sender(&self) -> mpsc::SyncSender<Vec<StreamEvent>> {
        self.sender.clone()
    }

    /// Shuts the ingester down: drops the internal sender (disconnecting
    /// the channel once all external clones are gone) and waits for the
    /// worker to drain its buffer.
    ///
    /// # Panics
    /// Re-raises the worker thread's panic, if any. Previously a worker
    /// panic was swallowed with `.join().ok()` and `flush` reported success
    /// even though buffered events may never have been written.
    pub fn flush(self) -> DbxResult<()> {
        drop(self.sender);
        if let Err(payload) = self._handle.join() {
            // Propagate the worker's panic instead of masking data loss.
            std::panic::resume_unwind(payload);
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Fresh in-memory database shared between the test and the ingester.
    fn make_db() -> Arc<Database> {
        Arc::new(Database::open_in_memory().unwrap())
    }

    /// End-to-end check covering all three event kinds on one table.
    #[test]
    fn test_stream_ingester_insert_update_delete() {
        let db = make_db();
        let ingester = StreamIngester::new(Arc::clone(&db), "kv", 100, Duration::from_millis(50));
        let producer = ingester.sender();

        // Two inserts, then an update of k1 and a delete of k2.
        let seed = vec![
            StreamEvent::Insert {
                key: "k1".into(),
                value: b"v1".to_vec(),
            },
            StreamEvent::Insert {
                key: "k2".into(),
                value: b"v2".to_vec(),
            },
        ];
        producer.send(seed).unwrap();
        producer
            .send(vec![StreamEvent::Update {
                key: "k1".into(),
                value: b"v1-updated".to_vec(),
            }])
            .unwrap();
        producer
            .send(vec![StreamEvent::Delete { key: "k2".into() }])
            .unwrap();
        drop(producer);
        ingester.flush().unwrap();

        // k1 carries the updated value; k2 is gone.
        assert_eq!(db.get("kv", b"k1").unwrap(), Some(b"v1-updated".to_vec()));
        assert!(db.get("kv", b"k2").unwrap().is_none());
    }

    /// Streams 1000 inserts in ten batches, then spot-checks the first ten keys.
    #[test]
    fn test_stream_ingester_high_throughput() {
        let db = make_db();
        let ingester = StreamIngester::new(
            Arc::clone(&db),
            "telemetry",
            500,
            Duration::from_millis(100),
        );
        let producer = ingester.sender();
        for batch_idx in 0u32..10 {
            let batch: Vec<StreamEvent> = (0..100)
                .map(|offset| {
                    let id = batch_idx * 100 + offset;
                    StreamEvent::Insert {
                        key: format!("key_{}", id),
                        value: format!("val_{}", id).into_bytes(),
                    }
                })
                .collect();
            producer.send(batch).unwrap();
        }
        drop(producer);
        ingester.flush().unwrap();
        for id in 0..10u32 {
            let key = format!("key_{}", id);
            assert!(
                db.get("telemetry", key.as_bytes()).unwrap().is_some(),
                "key_{} should exist",
                id
            );
        }
    }

    /// Inserts 100 items, deletes the even-numbered ones, verifies the split.
    #[test]
    fn test_stream_ingester_mixed_dml() {
        let db = make_db();
        let ingester =
            StreamIngester::new(Arc::clone(&db), "inventory", 500, Duration::from_millis(50));
        let producer = ingester.sender();

        let inserts: Vec<StreamEvent> = (0u32..100)
            .map(|i| StreamEvent::Insert {
                key: format!("item_{}", i),
                value: format!("qty={}", 10).into_bytes(),
            })
            .collect();
        producer.send(inserts).unwrap();

        let deletes: Vec<StreamEvent> = (0u32..100)
            .filter(|i| i % 2 == 0)
            .map(|i| StreamEvent::Delete {
                key: format!("item_{}", i),
            })
            .collect();
        producer.send(deletes).unwrap();

        drop(producer);
        ingester.flush().unwrap();

        for i in 0u32..100 {
            let stored = db.get("inventory", format!("item_{}", i).as_bytes()).unwrap();
            if i % 2 == 1 {
                assert!(stored.is_some(), "item_{} should exist", i);
            } else {
                assert!(stored.is_none(), "item_{} should be deleted", i);
            }
        }
    }
}