// Source path: dbx_core/storage/realtime_sync.rs

/// Policy that decides when a row-count check reports that a flush is due.
///
/// The variants are interpreted by `RealtimeSyncConfig::should_flush`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncMode {
    /// Always due: `should_flush` returns `true` for every row count,
    /// including zero.
    Immediate,
    /// Due once the row count reaches the wrapped value (the comparison is
    /// inclusive: `row_count >= n`).
    Threshold(usize),
    /// Never due from the row-count check: `should_flush` always returns
    /// `false` in this mode.
    ///
    /// NOTE(review): presumably a separate, time-driven path performs the
    /// flush for this mode; that code is not in this file — confirm.
    AsyncBatch {
        /// Latency bound in milliseconds, going by the field name; nothing in
        /// this file reads it — confirm how the consumer interprets it.
        max_latency_ms: u64,
    },
}
22
23#[derive(Debug, Clone)]
25pub struct RealtimeSyncConfig {
26 pub mode: SyncMode,
28 pub batch_size: usize,
30}
31
32impl Default for RealtimeSyncConfig {
33 fn default() -> Self {
34 Self {
35 mode: SyncMode::Threshold(10_000),
36 batch_size: 1_000,
37 }
38 }
39}
40
41impl RealtimeSyncConfig {
42 pub fn immediate() -> Self {
44 Self {
45 mode: SyncMode::Immediate,
46 batch_size: 1,
47 }
48 }
49
50 pub fn threshold(n: usize) -> Self {
52 Self {
53 mode: SyncMode::Threshold(n),
54 batch_size: n.min(10_000),
55 }
56 }
57
58 pub fn async_batch(max_latency_ms: u64, batch_size: usize) -> Self {
60 Self {
61 mode: SyncMode::AsyncBatch { max_latency_ms },
62 batch_size,
63 }
64 }
65
66 pub fn is_immediate(&self) -> bool {
68 matches!(self.mode, SyncMode::Immediate)
69 }
70
71 pub fn should_flush(&self, row_count: usize) -> bool {
77 match &self.mode {
78 SyncMode::Immediate => true,
79 SyncMode::Threshold(n) => row_count >= *n,
80 SyncMode::AsyncBatch { .. } => false,
81 }
82 }
83}
84
/// Unit tests covering the constructors and predicates of
/// `RealtimeSyncConfig` and the default configuration.
#[cfg(test)]
mod tests {
    use super::*;

    /// The default is a 10_000-row threshold with a batch size of 1_000.
    #[test]
    fn test_default_config() {
        let config = RealtimeSyncConfig::default();
        assert_eq!(config.mode, SyncMode::Threshold(10_000));
        assert_eq!(config.batch_size, 1_000);
    }

    /// Immediate mode reports a flush as due for any row count.
    #[test]
    fn test_immediate_mode() {
        let config = RealtimeSyncConfig::immediate();
        assert!(config.is_immediate());
        assert_eq!(config.batch_size, 1);
        assert!(config.should_flush(0));
        assert!(config.should_flush(1_000_000));
    }

    /// Threshold mode flips to `true` exactly at the configured row count.
    #[test]
    fn test_threshold_should_flush() {
        let config = RealtimeSyncConfig::threshold(100);
        assert_eq!(config.batch_size, 100);
        assert!(!config.should_flush(99));
        assert!(config.should_flush(100));
        assert!(config.should_flush(500));
    }

    /// Async-batch mode never reports a flush from the row-count check.
    #[test]
    fn test_async_batch_never_auto_flush() {
        let config = RealtimeSyncConfig::async_batch(50, 1_000);
        assert_eq!(config.batch_size, 1_000);
        assert!(!config.should_flush(0));
        assert!(!config.should_flush(1_000_000));
        assert!(!config.is_immediate());
        assert_eq!(config.mode, SyncMode::AsyncBatch { max_latency_ms: 50 });
    }

    /// `Clone` keeps the mode, and `Debug` formatting does not panic.
    #[test]
    fn test_clone_and_debug() {
        let config = RealtimeSyncConfig::immediate();
        let cloned = config.clone();
        assert!(cloned.is_immediate());
        let _ = format!("{:?}", cloned);
    }
}