Skip to main content

dbx_core/storage/
realtime_sync.rs

1//! HTAP 실시간 동기화 설정
2//!
3//! Delta Store에 insert 후 Columnar Cache로 즉시 또는 비동기 배치로 전파하는
4//! 실시간 동기화 모드를 정의합니다.
5//!
6//! # 동기화 모드
7//!
8//! - [`SyncMode::Immediate`] — 모든 insert 후 즉시 동기화
9//! - [`SyncMode::Threshold(n)`] — n행 쌓이면 자동 flush (기존 방식)
10//! - [`SyncMode::AsyncBatch`] — 백그라운드 스레드, max_latency_ms 주기 flush
11
12/// 실시간 동기화 모드
13#[derive(Debug, Clone, PartialEq)]
14pub enum SyncMode {
15    /// 모든 insert 직후 즉시 columnar cache에 반영
16    Immediate,
17    /// Delta 항목 수 >= threshold 이면 자동 flush (기존 방식)
18    Threshold(usize),
19    /// 백그라운드 스레드가 max_latency_ms마다 flush
20    AsyncBatch { max_latency_ms: u64 },
21}
22
23/// HTAP 실시간 동기화 설정
24#[derive(Debug, Clone)]
25pub struct RealtimeSyncConfig {
26    /// 동기화 모드
27    pub mode: SyncMode,
28    /// 배치 크기 (flush 당 최대 처리 행 수)
29    pub batch_size: usize,
30}
31
32impl Default for RealtimeSyncConfig {
33    fn default() -> Self {
34        Self {
35            mode: SyncMode::Threshold(10_000),
36            batch_size: 1_000,
37        }
38    }
39}
40
41impl RealtimeSyncConfig {
42    /// Immediate 동기화 설정 생성
43    pub fn immediate() -> Self {
44        Self {
45            mode: SyncMode::Immediate,
46            batch_size: 1,
47        }
48    }
49
50    /// Threshold 기반 동기화 설정 생성
51    pub fn threshold(n: usize) -> Self {
52        Self {
53            mode: SyncMode::Threshold(n),
54            batch_size: n.min(10_000),
55        }
56    }
57
58    /// 비동기 배치 동기화 설정 생성
59    pub fn async_batch(max_latency_ms: u64, batch_size: usize) -> Self {
60        Self {
61            mode: SyncMode::AsyncBatch { max_latency_ms },
62            batch_size,
63        }
64    }
65
66    /// 현재 설정이 즉시 동기화 모드인지 확인
67    pub fn is_immediate(&self) -> bool {
68        matches!(self.mode, SyncMode::Immediate)
69    }
70
71    /// Delta 행 수가 flush 기준에 도달했는지 확인
72    ///
73    /// `Immediate` → 항상 true  
74    /// `Threshold(n)` → row_count >= n  
75    /// `AsyncBatch` → false (백그라운드 스레드가 결정)  
76    pub fn should_flush(&self, row_count: usize) -> bool {
77        match &self.mode {
78            SyncMode::Immediate => true,
79            SyncMode::Threshold(n) => row_count >= *n,
80            SyncMode::AsyncBatch { .. } => false,
81        }
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88
89    #[test]
90    fn test_default_config() {
91        let config = RealtimeSyncConfig::default();
92        assert_eq!(config.mode, SyncMode::Threshold(10_000));
93        assert_eq!(config.batch_size, 1_000);
94    }
95
96    #[test]
97    fn test_immediate_mode() {
98        let config = RealtimeSyncConfig::immediate();
99        assert!(config.is_immediate());
100        assert!(config.should_flush(0));
101        assert!(config.should_flush(1_000_000));
102    }
103
104    #[test]
105    fn test_threshold_should_flush() {
106        let config = RealtimeSyncConfig::threshold(100);
107        assert!(!config.should_flush(99));
108        assert!(config.should_flush(100));
109        assert!(config.should_flush(500));
110    }
111
112    #[test]
113    fn test_async_batch_never_auto_flush() {
114        let config = RealtimeSyncConfig::async_batch(50, 1_000);
115        assert!(!config.should_flush(0));
116        assert!(!config.should_flush(1_000_000));
117        assert!(!config.is_immediate());
118        assert_eq!(config.mode, SyncMode::AsyncBatch { max_latency_ms: 50 });
119    }
120
121    #[test]
122    fn test_clone_and_debug() {
123        let config = RealtimeSyncConfig::immediate();
124        let cloned = config.clone();
125        assert!(cloned.is_immediate());
126        // Debug 출력 가능 확인
127        let _ = format!("{:?}", cloned);
128    }
129}