Skip to main content

dbx_core/engine/
stream_ingester.rs

1//! 스트리밍 수집 파이프라인 — CDC 스타일 INSERT/UPDATE/DELETE 이벤트 처리
2//!
3//! # 개요
4//!
5//! `StreamIngester`는 채널 기반의 스트리밍 수집 파이프라인을 제공합니다.
6//! Kafka, Kinesis 등 메시지 스트림에서 수신하는 CDC 이벤트(INSERT/UPDATE/DELETE)를
7//! `StreamEvent` enum으로 표현하여 background 스레드가 DBX에 일괄 반영합니다.
8//!
9//! # 사용 예
10//!
11//! ```rust,no_run
12//! use dbx_core::{Database, engine::stream_ingester::{StreamIngester, StreamEvent}};
13//! use std::{sync::Arc, time::Duration};
14//!
15//! let db = Arc::new(Database::open_in_memory().unwrap());
16//! let ingester = StreamIngester::new(Arc::clone(&db), "orders", 1000, Duration::from_millis(100));
17//! let tx = ingester.sender();
18//!
19//! tx.send(vec![
20//!     StreamEvent::Insert { key: "order:1".into(), value: b"[1, \"pending\"]".to_vec() },
21//!     StreamEvent::Update { key: "order:2".into(), value: b"[2, \"shipped\"]".to_vec() },
22//!     StreamEvent::Delete { key: "order:3".into() },
23//! ]).unwrap();
24//!
25//! ingester.flush().unwrap();
26//! ```
27
28use crate::engine::Database;
29use crate::error::DbxResult;
30use std::sync::{Arc, mpsc};
31use std::thread;
32use std::time::Duration;
33
34/// 스트림 이벤트 — INSERT / UPDATE / DELETE를 통합 표현
35///
36/// CDC (Change Data Capture) 및 이벤트 소싱 패턴에서
37/// 스트림으로 전달되는 모든 DML 연산을 지원합니다.
38///
39/// # 주의
40///
41/// `Update`는 DBX의 key 기반 upsert 시맨틱을 활용하므로
42/// 내부적으로 `db.insert()`를 재사용합니다.
43#[derive(Debug, Clone)]
44pub enum StreamEvent {
45    /// 새 레코드 삽입
46    Insert { key: String, value: Vec<u8> },
47    /// 기존 레코드 갱신 (키 기반 upsert)
48    Update { key: String, value: Vec<u8> },
49    /// 레코드 삭제 (키 기반)
50    Delete { key: String },
51}
52
53/// 채널 기반 스트리밍 수집 파이프라인
54///
55/// 백그라운드 스레드가 채널에서 `StreamEvent` 배치를 수신하여
56/// `batch_size` 또는 `max_latency` 조건 충족 시 DBX에 일괄 반영합니다.
57pub struct StreamIngester {
58    sender: mpsc::SyncSender<Vec<StreamEvent>>,
59    _handle: thread::JoinHandle<()>,
60}
61
62impl StreamIngester {
63    /// 스트림 인제스터 생성
64    ///
65    /// # 인수
66    ///
67    /// * `db` - 대상 데이터베이스 (Arc 공유)
68    /// * `table` - 이벤트를 적용할 테이블 이름
69    /// * `batch_size` - 이 이벤트 수에 도달하면 즉시 flush
70    /// * `max_latency` - 버퍼가 차지 않아도 이 시간마다 강제 flush
71    pub fn new(db: Arc<Database>, table: &str, batch_size: usize, max_latency: Duration) -> Self {
72        let (tx, rx) = mpsc::sync_channel::<Vec<StreamEvent>>(batch_size * 4);
73        let table = table.to_string();
74
75        let handle = thread::spawn(move || {
76            let mut buffer: Vec<StreamEvent> = Vec::with_capacity(batch_size);
77            let mut deadline = std::time::Instant::now() + max_latency;
78
79            loop {
80                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
81                match rx.recv_timeout(remaining) {
82                    Ok(events) => buffer.extend(events),
83                    Err(mpsc::RecvTimeoutError::Timeout) => {}
84                    Err(mpsc::RecvTimeoutError::Disconnected) => {
85                        // 채널 닫힘: 남은 버퍼 모두 flush 후 종료
86                        Self::flush_buffer(&db, &table, &mut buffer);
87                        break;
88                    }
89                }
90
91                if buffer.len() >= batch_size || std::time::Instant::now() >= deadline {
92                    Self::flush_buffer(&db, &table, &mut buffer);
93                    deadline = std::time::Instant::now() + max_latency;
94                }
95            }
96        });
97
98        Self {
99            sender: tx,
100            _handle: handle,
101        }
102    }
103
104    /// 버퍼의 이벤트를 DB에 적용
105    ///
106    /// DML 종류에 따라 insert / delete를 선택합니다.
107    fn flush_buffer(db: &Database, table: &str, buffer: &mut Vec<StreamEvent>) {
108        for event in buffer.drain(..) {
109            match event {
110                StreamEvent::Insert { key, value } => {
111                    let _ = db.insert(table, key.as_bytes(), &value);
112                }
113                StreamEvent::Update { key, value } => {
114                    // UPDATE = key 기반 upsert (DBX insert는 덮어쓰기 시맨틱)
115                    let _ = db.insert(table, key.as_bytes(), &value);
116                }
117                StreamEvent::Delete { key } => {
118                    let _ = db.delete(table, key.as_bytes());
119                }
120            }
121        }
122    }
123
124    /// 이벤트 배치 송신용 sender 클론 반환
125    ///
126    /// 여러 스레드에서 동시에 이벤트를 전송할 수 있습니다.
127    pub fn sender(&self) -> mpsc::SyncSender<Vec<StreamEvent>> {
128        self.sender.clone()
129    }
130
131    /// 채널을 닫고, 백그라운드 스레드가 남은 이벤트를 모두 flush한 뒤 종료할 때까지 대기
132    ///
133    /// 이 메서드 호출 후에는 `sender`를 통한 이벤트 전송이 불가능합니다.
134    pub fn flush(self) -> DbxResult<()> {
135        // sender를 drop하면 채널이 닫힘 → 백그라운드 스레드 Disconnected 감지
136        // → 남은 이벤트 flush → 스레드 종료
137        drop(self.sender);
138        self._handle.join().ok();
139        Ok(())
140    }
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146    use std::time::Duration;
147
148    fn make_db() -> Arc<Database> {
149        Arc::new(Database::open_in_memory().unwrap())
150    }
151
152    #[test]
153    fn test_stream_ingester_insert_update_delete() {
154        let db = make_db();
155        let ingester = StreamIngester::new(Arc::clone(&db), "kv", 100, Duration::from_millis(50));
156        let tx = ingester.sender();
157
158        // INSERT 2건
159        tx.send(vec![
160            StreamEvent::Insert {
161                key: "k1".into(),
162                value: b"v1".to_vec(),
163            },
164            StreamEvent::Insert {
165                key: "k2".into(),
166                value: b"v2".to_vec(),
167            },
168        ])
169        .unwrap();
170
171        // UPDATE k1
172        tx.send(vec![StreamEvent::Update {
173            key: "k1".into(),
174            value: b"v1-updated".to_vec(),
175        }])
176        .unwrap();
177
178        // DELETE k2
179        tx.send(vec![StreamEvent::Delete { key: "k2".into() }])
180            .unwrap();
181
182        // ⚠️ [핵심 원인 해결]
183        // `tx`가 아직 살아있으면 백그라운드 스레드는 채널이 닫혔다고 판단하지 않고
184        // recv_timeout 상태에서 무한 대기(Deadlock)에 빠집니다.
185        // `ingester.flush()`가 내부적으로 자신의 sender를 버리지만,
186        // 로컬 변수인 `tx`가 여전히 활성화되어 있기 때문입니다.
187        drop(tx);
188
189        ingester.flush().unwrap();
190
191        // k1은 갱신된 값, k2는 삭제됨
192        let v1 = db.get("kv", b"k1").unwrap();
193        assert_eq!(v1, Some(b"v1-updated".to_vec()));
194        let v2 = db.get("kv", b"k2").unwrap();
195        assert!(v2.is_none());
196    }
197
198    #[test]
199    fn test_stream_ingester_high_throughput() {
200        let db = make_db();
201        let ingester = StreamIngester::new(
202            Arc::clone(&db),
203            "telemetry",
204            500,
205            Duration::from_millis(100),
206        );
207        let tx = ingester.sender();
208
209        // 1,000건 INSERT (10 배치 × 100건)
210        for i in 0u32..10 {
211            let batch: Vec<StreamEvent> = (0..100)
212                .map(|j| StreamEvent::Insert {
213                    key: format!("key_{}", i * 100 + j),
214                    value: format!("val_{}", i * 100 + j).into_bytes(),
215                })
216                .collect();
217            tx.send(batch).unwrap();
218        }
219
220        drop(tx); // Deadlock 방지
221        ingester.flush().unwrap();
222
223        // 샘플링 검증 (key_0 ~ key_9 존재 확인)
224        for i in 0..10u32 {
225            let key = format!("key_{}", i);
226            assert!(
227                db.get("telemetry", key.as_bytes()).unwrap().is_some(),
228                "key_{} should exist",
229                i
230            );
231        }
232    }
233
234    #[test]
235    fn test_stream_ingester_mixed_dml() {
236        let db = make_db();
237        let ingester =
238            StreamIngester::new(Arc::clone(&db), "inventory", 500, Duration::from_millis(50));
239        let tx = ingester.sender();
240
241        // 100건 INSERT
242        let inserts: Vec<StreamEvent> = (0u32..100)
243            .map(|i| StreamEvent::Insert {
244                key: format!("item_{}", i),
245                value: format!("qty={}", 10).into_bytes(),
246            })
247            .collect();
248        tx.send(inserts).unwrap();
249
250        // 짝수 key DELETE (0,2,4,6,8 → 5건)
251        let deletes: Vec<StreamEvent> = (0u32..100)
252            .filter(|i| i % 2 == 0)
253            .map(|i| StreamEvent::Delete {
254                key: format!("item_{}", i),
255            })
256            .collect();
257        tx.send(deletes).unwrap();
258
259        drop(tx); // Deadlock 방지
260        ingester.flush().unwrap();
261
262        // 홀수 key는 존재, 짝수 key는 없어야 함
263        for i in 0u32..100 {
264            let key = format!("item_{}", i);
265            let val = db.get("inventory", key.as_bytes()).unwrap();
266            if i % 2 == 1 {
267                assert!(val.is_some(), "item_{} should exist", i);
268            } else {
269                assert!(val.is_none(), "item_{} should be deleted", i);
270            }
271        }
272    }
273}