dbx_core/engine/stream_ingester.rs

use crate::engine::Database;
use crate::error::DbxResult;
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::{Duration, Instant};

/// A single change event consumed from the stream.
#[derive(Debug, Clone)]
pub enum StreamEvent {
    Insert { key: String, value: Vec<u8> },
    Update { key: String, value: Vec<u8> },
    Delete { key: String },
}

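/// Background ingester that batches `StreamEvent`s and applies them to a
/// single table, flushing once `batch_size` events are buffered or
/// `max_latency` has elapsed, whichever comes first.
///
/// A minimal usage sketch (`Database::open_in_memory` is taken from the tests
/// below; error handling is elided):
///
/// ```ignore
/// let db = Arc::new(Database::open_in_memory()?);
/// let ingester = StreamIngester::new(Arc::clone(&db), "kv", 256, Duration::from_millis(20));
/// let tx = ingester.sender();
/// tx.send(vec![StreamEvent::Insert { key: "k".into(), value: b"v".to_vec() }])?;
/// drop(tx);          // drop every sender clone first...
/// ingester.flush()?; // ...then drain the buffer and join the worker
/// ```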
pub struct StreamIngester {
    sender: mpsc::SyncSender<Vec<StreamEvent>>,
    _handle: thread::JoinHandle<()>,
}

impl StreamIngester {
    /// Spawn a worker thread that drains the channel and writes batches.
    pub fn new(db: Arc<Database>, table: &str, batch_size: usize, max_latency: Duration) -> Self {
        // Bounded channel: once four batches' worth of events are queued,
        // senders block, giving natural backpressure.
        let (tx, rx) = mpsc::sync_channel::<Vec<StreamEvent>>(batch_size * 4);
        let table = table.to_string();

        let handle = thread::spawn(move || {
            let mut buffer: Vec<StreamEvent> = Vec::with_capacity(batch_size);
            let mut deadline = Instant::now() + max_latency;

            loop {
                // Wait for the next batch, but no longer than the latency deadline.
                let remaining = deadline.saturating_duration_since(Instant::now());
                match rx.recv_timeout(remaining) {
                    Ok(events) => buffer.extend(events),
                    Err(mpsc::RecvTimeoutError::Timeout) => {}
                    Err(mpsc::RecvTimeoutError::Disconnected) => {
                        // Every sender is gone: flush what remains and exit.
                        Self::flush_buffer(&db, &table, &mut buffer);
                        break;
                    }
                }

                // Flush on size or on elapsed latency, whichever comes first.
                if buffer.len() >= batch_size || Instant::now() >= deadline {
                    Self::flush_buffer(&db, &table, &mut buffer);
                    deadline = Instant::now() + max_latency;
                }
            }
        });

        Self {
            sender: tx,
            _handle: handle,
        }
    }

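    // Worked example of the flush policy: with batch_size = 100 and
    // max_latency = 50 ms, five sends of 50 events each flush on size after
    // the second and fourth sends (100 events apiece), while the final 50
    // wait at most ~50 ms for the deadline flush. Note that `flush_buffer`
    // drains everything buffered, so a single oversized send flushes whole.
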
    /// Apply every buffered event to the table and clear the buffer.
    fn flush_buffer(db: &Database, table: &str, buffer: &mut Vec<StreamEvent>) {
        for event in buffer.drain(..) {
            match event {
                StreamEvent::Insert { key, value } => {
                    // Per-event errors are discarded; one failed write does
                    // not stop the rest of the batch.
                    let _ = db.insert(table, key.as_bytes(), &value);
                }
                StreamEvent::Update { key, value } => {
                    // Updates go through the same insert path as an upsert.
                    let _ = db.insert(table, key.as_bytes(), &value);
                }
                StreamEvent::Delete { key } => {
                    let _ = db.delete(table, key.as_bytes());
                }
            }
        }
    }

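    /// Hand out a clone of the internal sender for producers.
    ///
    /// A multi-producer sketch (illustrative only; the thread count and key
    /// scheme are made up for the example):
    ///
    /// ```ignore
    /// let tx = ingester.sender();
    /// let workers: Vec<_> = (0..4)
    ///     .map(|w| {
    ///         let tx = tx.clone();
    ///         thread::spawn(move || {
    ///             tx.send(vec![StreamEvent::Insert {
    ///                 key: format!("w{w}"),
    ///                 value: b"payload".to_vec(),
    ///             }])
    ///             .unwrap()
    ///         })
    ///     })
    ///     .collect();
    /// for w in workers {
    ///     w.join().unwrap();
    /// }
    /// drop(tx); // release this clone so `flush()` can observe disconnect
    /// ```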
    pub fn sender(&self) -> mpsc::SyncSender<Vec<StreamEvent>> {
        self.sender.clone()
    }

    /// Shut down: drop the owned sender and wait for the worker to drain.
    ///
    /// Any sender clones obtained via `sender()` must be dropped first, or
    /// the worker never observes the channel disconnect and this call blocks
    /// on `join`.
    pub fn flush(self) -> DbxResult<()> {
        drop(self.sender);
        // A panicked worker is ignored here; flush still reports success.
        self._handle.join().ok();
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    fn make_db() -> Arc<Database> {
        Arc::new(Database::open_in_memory().unwrap())
    }

    #[test]
    fn test_stream_ingester_insert_update_delete() {
        let db = make_db();
        let ingester = StreamIngester::new(Arc::clone(&db), "kv", 100, Duration::from_millis(50));
        let tx = ingester.sender();

        tx.send(vec![
            StreamEvent::Insert {
                key: "k1".into(),
                value: b"v1".to_vec(),
            },
            StreamEvent::Insert {
                key: "k2".into(),
                value: b"v2".to_vec(),
            },
        ])
        .unwrap();

        tx.send(vec![StreamEvent::Update {
            key: "k1".into(),
            value: b"v1-updated".to_vec(),
        }])
        .unwrap();

        tx.send(vec![StreamEvent::Delete { key: "k2".into() }])
            .unwrap();

        // Drop the clone before flushing so the worker sees the disconnect.
        drop(tx);

        ingester.flush().unwrap();

        let v1 = db.get("kv", b"k1").unwrap();
        assert_eq!(v1, Some(b"v1-updated".to_vec()));
        let v2 = db.get("kv", b"k2").unwrap();
        assert!(v2.is_none());
    }

    #[test]
    fn test_stream_ingester_high_throughput() {
        let db = make_db();
        let ingester = StreamIngester::new(
            Arc::clone(&db),
            "telemetry",
            500,
            Duration::from_millis(100),
        );
        let tx = ingester.sender();

        // Ten batches of 100 events: 1,000 inserts total.
        for i in 0u32..10 {
            let batch: Vec<StreamEvent> = (0..100)
                .map(|j| StreamEvent::Insert {
                    key: format!("key_{}", i * 100 + j),
                    value: format!("val_{}", i * 100 + j).into_bytes(),
                })
                .collect();
            tx.send(batch).unwrap();
        }

        drop(tx);
        ingester.flush().unwrap();

        // Spot-check the first ten keys.
        for i in 0u32..10 {
            let key = format!("key_{}", i);
            assert!(
                db.get("telemetry", key.as_bytes()).unwrap().is_some(),
                "key_{} should exist",
                i
            );
        }
    }

    #[test]
    fn test_stream_ingester_mixed_dml() {
        let db = make_db();
        let ingester =
            StreamIngester::new(Arc::clone(&db), "inventory", 500, Duration::from_millis(50));
        let tx = ingester.sender();

        let inserts: Vec<StreamEvent> = (0u32..100)
            .map(|i| StreamEvent::Insert {
                key: format!("item_{}", i),
                value: b"qty=10".to_vec(),
            })
            .collect();
        tx.send(inserts).unwrap();

        // Delete every even-numbered item.
        let deletes: Vec<StreamEvent> = (0u32..100)
            .filter(|i| i % 2 == 0)
            .map(|i| StreamEvent::Delete {
                key: format!("item_{}", i),
            })
            .collect();
        tx.send(deletes).unwrap();

        drop(tx);
        ingester.flush().unwrap();

        for i in 0u32..100 {
            let key = format!("item_{}", i);
            let val = db.get("inventory", key.as_bytes()).unwrap();
            if i % 2 == 1 {
                assert!(val.is_some(), "item_{} should exist", i);
            } else {
                assert!(val.is_none(), "item_{} should be deleted", i);
            }
        }
    }
}