// oxirs_tsdb/write/buffer.rs

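//! In-memory write buffer for batching data points.
//!
//! Buffers incoming data points per series before they are compressed and
//! written to disk, and reports when a series should be flushed based on its
//! buffered point count or on how long its current batch has been waiting.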
5
6use crate::error::TsdbResult;
7use crate::series::DataPoint;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11use tokio::sync::RwLock;
12
/// Tuning knobs for [`WriteBuffer`].
///
/// A series is reported as ready to flush once it holds `max_points` points,
/// or once its current batch has been buffered for `max_age_secs` seconds,
/// whichever comes first.
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Point-count threshold at which a series should be flushed.
    pub max_points: usize,
    /// Age threshold, in whole seconds, at which a series should be flushed.
    pub max_age_secs: u64,
    /// Whether automatic background flushing is enabled.
    ///
    /// NOTE(review): nothing in this module reads this flag; confirm where
    /// (or whether) it is honoured.
    pub auto_flush: bool,
}
23
24impl Default for BufferConfig {
25    fn default() -> Self {
26        Self {
27            max_points: 100_000, // 100K points default
28            max_age_secs: 60,    // 1 minute default
29            auto_flush: true,
30        }
31    }
32}
33
34/// In-memory buffer for a single time series
35#[derive(Debug, Clone)]
36struct SeriesBuffer {
37    /// Buffered data points
38    points: Vec<DataPoint>,
39    /// Time when first point was added
40    first_insert: Instant,
41}
42
43impl SeriesBuffer {
44    fn new() -> Self {
45        Self {
46            points: Vec::new(),
47            first_insert: Instant::now(),
48        }
49    }
50
51    fn push(&mut self, point: DataPoint) {
52        if self.points.is_empty() {
53            self.first_insert = Instant::now();
54        }
55        self.points.push(point);
56    }
57
58    fn age_secs(&self) -> u64 {
59        self.first_insert.elapsed().as_secs()
60    }
61
62    fn len(&self) -> usize {
63        self.points.len()
64    }
65
66    fn is_empty(&self) -> bool {
67        self.points.is_empty()
68    }
69
70    fn drain(&mut self) -> Vec<DataPoint> {
71        let points = std::mem::take(&mut self.points);
72        self.first_insert = Instant::now();
73        points
74    }
75}
76
77/// Write buffer for time-series data
78///
79/// Provides thread-safe buffering with automatic flush triggers.
80pub struct WriteBuffer {
81    /// Configuration
82    config: BufferConfig,
83    /// Per-series buffers (series_id → buffer)
84    buffers: Arc<RwLock<HashMap<u64, SeriesBuffer>>>,
85}
86
87impl WriteBuffer {
88    /// Create new write buffer
89    pub fn new(config: BufferConfig) -> Self {
90        Self {
91            config,
92            buffers: Arc::new(RwLock::new(HashMap::new())),
93        }
94    }
95
96    /// Create with default configuration
97    pub fn with_defaults() -> Self {
98        Self::new(BufferConfig::default())
99    }
100
101    /// Insert a data point into the buffer
102    ///
103    /// Returns true if buffer should be flushed
104    pub async fn insert(&self, series_id: u64, point: DataPoint) -> TsdbResult<bool> {
105        let mut buffers = self.buffers.write().await;
106        let buffer = buffers.entry(series_id).or_insert_with(SeriesBuffer::new);
107
108        buffer.push(point);
109
110        // Check if flush is needed
111        let should_flush =
112            buffer.len() >= self.config.max_points || buffer.age_secs() >= self.config.max_age_secs;
113
114        Ok(should_flush)
115    }
116
117    /// Insert multiple points in a batch
118    pub async fn insert_batch(&self, entries: &[(u64, DataPoint)]) -> TsdbResult<bool> {
119        let mut buffers = self.buffers.write().await;
120        let mut should_flush = false;
121
122        for (series_id, point) in entries {
123            let buffer = buffers.entry(*series_id).or_insert_with(SeriesBuffer::new);
124
125            buffer.push(*point);
126
127            if buffer.len() >= self.config.max_points
128                || buffer.age_secs() >= self.config.max_age_secs
129            {
130                should_flush = true;
131            }
132        }
133
134        Ok(should_flush)
135    }
136
137    /// Flush all buffers for a specific series
138    pub async fn flush_series(&self, series_id: u64) -> TsdbResult<Vec<DataPoint>> {
139        let mut buffers = self.buffers.write().await;
140
141        if let Some(buffer) = buffers.get_mut(&series_id) {
142            Ok(buffer.drain())
143        } else {
144            Ok(Vec::new())
145        }
146    }
147
148    /// Flush all buffers
149    ///
150    /// Returns map of series_id → data points
151    pub async fn flush_all(&self) -> TsdbResult<HashMap<u64, Vec<DataPoint>>> {
152        let mut buffers = self.buffers.write().await;
153        let mut result = HashMap::new();
154
155        for (series_id, buffer) in buffers.iter_mut() {
156            if !buffer.is_empty() {
157                result.insert(*series_id, buffer.drain());
158            }
159        }
160
161        Ok(result)
162    }
163
164    /// Get buffer statistics
165    pub async fn stats(&self) -> BufferStats {
166        let buffers = self.buffers.read().await;
167
168        let total_points: usize = buffers.values().map(|b| b.len()).sum();
169        let num_series = buffers.len();
170        let oldest_age = buffers
171            .values()
172            .filter(|b| !b.is_empty())
173            .map(|b| b.age_secs())
174            .max()
175            .unwrap_or(0);
176
177        BufferStats {
178            total_points,
179            num_series,
180            oldest_age_secs: oldest_age,
181        }
182    }
183
184    /// Check which series need flushing
185    pub async fn series_needing_flush(&self) -> Vec<u64> {
186        let buffers = self.buffers.read().await;
187
188        buffers
189            .iter()
190            .filter(|(_, buffer)| {
191                buffer.len() >= self.config.max_points
192                    || buffer.age_secs() >= self.config.max_age_secs
193            })
194            .map(|(series_id, _)| *series_id)
195            .collect()
196    }
197}
198
/// Point-in-time summary of what a [`WriteBuffer`] currently holds.
#[derive(Debug, Clone)]
pub struct BufferStats {
    /// Sum of buffered points across every series.
    pub total_points: usize,
    /// Number of series with buffered data.
    pub num_series: usize,
    /// Age, in whole seconds, of the oldest non-empty series buffer
    /// (0 when nothing is buffered).
    pub oldest_age_secs: u64,
}
209
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};

    /// Builds a data point `offset_secs` seconds after `base`.
    fn point_at(base: DateTime<Utc>, offset_secs: i64, value: f64) -> DataPoint {
        DataPoint {
            timestamp: base + chrono::Duration::seconds(offset_secs),
            value,
        }
    }

    /// Small point limit plus an age limit far beyond test runtime, so only
    /// the size trigger can fire.
    fn size_limited_config() -> BufferConfig {
        BufferConfig {
            max_points: 10,
            max_age_secs: 3600,
            auto_flush: false,
        }
    }

    #[tokio::test]
    async fn test_buffer_insert() {
        let buffer = WriteBuffer::with_defaults();
        let point = DataPoint {
            timestamp: Utc::now(),
            value: 22.5,
        };

        // A single point is far below the default size limit.
        let should_flush = buffer.insert(1, point).await.unwrap();
        assert!(!should_flush);

        let stats = buffer.stats().await;
        assert_eq!(stats.total_points, 1);
        assert_eq!(stats.num_series, 1);
    }

    #[tokio::test]
    async fn test_buffer_flush_on_size() {
        let buffer = WriteBuffer::new(size_limited_config());
        let base_time = Utc::now();

        for i in 0..10_i64 {
            let should_flush = buffer
                .insert(1, point_at(base_time, i, i as f64))
                .await
                .unwrap();
            // Only the 10th point (i == 9) reaches `max_points`.
            assert_eq!(
                should_flush,
                i == 9,
                "unexpected flush signal for point index {}",
                i
            );
        }
    }

    #[tokio::test]
    async fn test_buffer_flush_series() {
        let buffer = WriteBuffer::with_defaults();
        let base_time = Utc::now();

        for i in 0..5_i64 {
            buffer
                .insert(1, point_at(base_time, i, i as f64))
                .await
                .unwrap();
        }
        for i in 0..3_i64 {
            buffer
                .insert(2, point_at(base_time, i, (i + 100) as f64))
                .await
                .unwrap();
        }

        // Flushing series 1 must leave series 2 untouched.
        let flushed = buffer.flush_series(1).await.unwrap();
        assert_eq!(flushed.len(), 5);
        assert_eq!(buffer.stats().await.total_points, 3);
    }

    #[tokio::test]
    async fn test_buffer_flush_all() {
        let buffer = WriteBuffer::with_defaults();
        let base_time = Utc::now();

        for series_id in 1_u64..=5 {
            for i in 0..10_i64 {
                let value = (series_id * 100 + i as u64) as f64;
                buffer
                    .insert(series_id, point_at(base_time, i, value))
                    .await
                    .unwrap();
            }
        }

        let before = buffer.stats().await;
        assert_eq!(before.total_points, 50); // 5 series × 10 points
        assert_eq!(before.num_series, 5);

        let flushed = buffer.flush_all().await.unwrap();
        assert_eq!(flushed.len(), 5);
        assert_eq!(flushed.get(&1).map(Vec::len), Some(10));

        // Everything was handed out, so nothing remains buffered.
        assert_eq!(buffer.stats().await.total_points, 0);
    }

    #[tokio::test]
    async fn test_batch_insert() {
        let buffer = WriteBuffer::with_defaults();
        let base_time = Utc::now();

        let batch: Vec<(u64, DataPoint)> = (0..100_i64)
            .map(|i| (1, point_at(base_time, i, i as f64)))
            .collect();

        buffer.insert_batch(&batch).await.unwrap();
        assert_eq!(buffer.stats().await.total_points, 100);
    }

    #[tokio::test]
    async fn test_series_needing_flush() {
        let buffer = WriteBuffer::new(size_limited_config());
        let base_time = Utc::now();

        // Series 1 stays below the limit; series 2 reaches it exactly.
        for (series_id, count) in [(1_u64, 5_i64), (2, 10)] {
            for i in 0..count {
                buffer
                    .insert(series_id, point_at(base_time, i, i as f64))
                    .await
                    .unwrap();
            }
        }

        assert_eq!(buffer.series_needing_flush().await, vec![2]);
    }
}