oxirs_tsdb/write/
buffer.rs1use crate::error::TsdbResult;
7use crate::series::DataPoint;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11use tokio::sync::RwLock;
12
13#[derive(Debug, Clone)]
15pub struct BufferConfig {
16 pub max_points: usize,
18 pub max_age_secs: u64,
20 pub auto_flush: bool,
22}
23
24impl Default for BufferConfig {
25 fn default() -> Self {
26 Self {
27 max_points: 100_000, max_age_secs: 60, auto_flush: true,
30 }
31 }
32}
33
34#[derive(Debug, Clone)]
36struct SeriesBuffer {
37 points: Vec<DataPoint>,
39 first_insert: Instant,
41}
42
43impl SeriesBuffer {
44 fn new() -> Self {
45 Self {
46 points: Vec::new(),
47 first_insert: Instant::now(),
48 }
49 }
50
51 fn push(&mut self, point: DataPoint) {
52 if self.points.is_empty() {
53 self.first_insert = Instant::now();
54 }
55 self.points.push(point);
56 }
57
58 fn age_secs(&self) -> u64 {
59 self.first_insert.elapsed().as_secs()
60 }
61
62 fn len(&self) -> usize {
63 self.points.len()
64 }
65
66 fn is_empty(&self) -> bool {
67 self.points.is_empty()
68 }
69
70 fn drain(&mut self) -> Vec<DataPoint> {
71 let points = std::mem::take(&mut self.points);
72 self.first_insert = Instant::now();
73 points
74 }
75}
76
77pub struct WriteBuffer {
81 config: BufferConfig,
83 buffers: Arc<RwLock<HashMap<u64, SeriesBuffer>>>,
85}
86
87impl WriteBuffer {
88 pub fn new(config: BufferConfig) -> Self {
90 Self {
91 config,
92 buffers: Arc::new(RwLock::new(HashMap::new())),
93 }
94 }
95
96 pub fn with_defaults() -> Self {
98 Self::new(BufferConfig::default())
99 }
100
101 pub async fn insert(&self, series_id: u64, point: DataPoint) -> TsdbResult<bool> {
105 let mut buffers = self.buffers.write().await;
106 let buffer = buffers.entry(series_id).or_insert_with(SeriesBuffer::new);
107
108 buffer.push(point);
109
110 let should_flush =
112 buffer.len() >= self.config.max_points || buffer.age_secs() >= self.config.max_age_secs;
113
114 Ok(should_flush)
115 }
116
117 pub async fn insert_batch(&self, entries: &[(u64, DataPoint)]) -> TsdbResult<bool> {
119 let mut buffers = self.buffers.write().await;
120 let mut should_flush = false;
121
122 for (series_id, point) in entries {
123 let buffer = buffers.entry(*series_id).or_insert_with(SeriesBuffer::new);
124
125 buffer.push(*point);
126
127 if buffer.len() >= self.config.max_points
128 || buffer.age_secs() >= self.config.max_age_secs
129 {
130 should_flush = true;
131 }
132 }
133
134 Ok(should_flush)
135 }
136
137 pub async fn flush_series(&self, series_id: u64) -> TsdbResult<Vec<DataPoint>> {
139 let mut buffers = self.buffers.write().await;
140
141 if let Some(buffer) = buffers.get_mut(&series_id) {
142 Ok(buffer.drain())
143 } else {
144 Ok(Vec::new())
145 }
146 }
147
148 pub async fn flush_all(&self) -> TsdbResult<HashMap<u64, Vec<DataPoint>>> {
152 let mut buffers = self.buffers.write().await;
153 let mut result = HashMap::new();
154
155 for (series_id, buffer) in buffers.iter_mut() {
156 if !buffer.is_empty() {
157 result.insert(*series_id, buffer.drain());
158 }
159 }
160
161 Ok(result)
162 }
163
164 pub async fn stats(&self) -> BufferStats {
166 let buffers = self.buffers.read().await;
167
168 let total_points: usize = buffers.values().map(|b| b.len()).sum();
169 let num_series = buffers.len();
170 let oldest_age = buffers
171 .values()
172 .filter(|b| !b.is_empty())
173 .map(|b| b.age_secs())
174 .max()
175 .unwrap_or(0);
176
177 BufferStats {
178 total_points,
179 num_series,
180 oldest_age_secs: oldest_age,
181 }
182 }
183
184 pub async fn series_needing_flush(&self) -> Vec<u64> {
186 let buffers = self.buffers.read().await;
187
188 buffers
189 .iter()
190 .filter(|(_, buffer)| {
191 buffer.len() >= self.config.max_points
192 || buffer.age_secs() >= self.config.max_age_secs
193 })
194 .map(|(series_id, _)| *series_id)
195 .collect()
196 }
197}
198
199#[derive(Debug, Clone)]
201pub struct BufferStats {
202 pub total_points: usize,
204 pub num_series: usize,
206 pub oldest_age_secs: u64,
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213 use chrono::Utc;
214
215 #[tokio::test]
216 async fn test_buffer_insert() {
217 let buffer = WriteBuffer::with_defaults();
218
219 let point = DataPoint {
220 timestamp: Utc::now(),
221 value: 22.5,
222 };
223
224 let should_flush = buffer.insert(1, point).await.unwrap();
225 assert!(!should_flush); let stats = buffer.stats().await;
228 assert_eq!(stats.total_points, 1);
229 assert_eq!(stats.num_series, 1);
230 }
231
232 #[tokio::test]
233 async fn test_buffer_flush_on_size() {
234 let config = BufferConfig {
235 max_points: 10,
236 max_age_secs: 3600,
237 auto_flush: false,
238 };
239 let buffer = WriteBuffer::new(config);
240
241 let base_time = Utc::now();
242
243 for i in 0..9 {
245 let point = DataPoint {
246 timestamp: base_time + chrono::Duration::seconds(i),
247 value: i as f64,
248 };
249 let should_flush = buffer.insert(1, point).await.unwrap();
250 assert!(!should_flush);
251 }
252
253 let point = DataPoint {
255 timestamp: base_time + chrono::Duration::seconds(9),
256 value: 9.0,
257 };
258 let should_flush = buffer.insert(1, point).await.unwrap();
259 assert!(should_flush);
260 }
261
262 #[tokio::test]
263 async fn test_buffer_flush_series() {
264 let buffer = WriteBuffer::with_defaults();
265
266 let base_time = Utc::now();
267
268 for i in 0..5 {
270 let point = DataPoint {
271 timestamp: base_time + chrono::Duration::seconds(i),
272 value: i as f64,
273 };
274 buffer.insert(1, point).await.unwrap();
275 }
276
277 for i in 0..3 {
279 let point = DataPoint {
280 timestamp: base_time + chrono::Duration::seconds(i),
281 value: (i + 100) as f64,
282 };
283 buffer.insert(2, point).await.unwrap();
284 }
285
286 let points = buffer.flush_series(1).await.unwrap();
288 assert_eq!(points.len(), 5);
289
290 let stats = buffer.stats().await;
292 assert_eq!(stats.total_points, 3);
293 }
294
295 #[tokio::test]
296 async fn test_buffer_flush_all() {
297 let buffer = WriteBuffer::with_defaults();
298
299 let base_time = Utc::now();
300
301 for series_id in 1_u64..=5_u64 {
303 for i in 0..10_i64 {
304 let point = DataPoint {
305 timestamp: base_time + chrono::Duration::seconds(i),
306 value: (series_id * 100 + i as u64) as f64,
307 };
308 buffer.insert(series_id, point).await.unwrap();
309 }
310 }
311
312 let stats = buffer.stats().await;
313 assert_eq!(stats.total_points, 50); assert_eq!(stats.num_series, 5);
315
316 let flushed = buffer.flush_all().await.unwrap();
318 assert_eq!(flushed.len(), 5);
319 assert_eq!(flushed.get(&1).unwrap().len(), 10);
320
321 let stats = buffer.stats().await;
323 assert_eq!(stats.total_points, 0);
324 }
325
326 #[tokio::test]
327 async fn test_batch_insert() {
328 let buffer = WriteBuffer::with_defaults();
329
330 let base_time = Utc::now();
331 let mut batch = Vec::new();
332
333 for i in 0..100 {
334 let point = DataPoint {
335 timestamp: base_time + chrono::Duration::seconds(i),
336 value: i as f64,
337 };
338 batch.push((1, point));
339 }
340
341 buffer.insert_batch(&batch).await.unwrap();
342
343 let stats = buffer.stats().await;
344 assert_eq!(stats.total_points, 100);
345 }
346
347 #[tokio::test]
348 async fn test_series_needing_flush() {
349 let config = BufferConfig {
350 max_points: 10,
351 max_age_secs: 3600,
352 auto_flush: false,
353 };
354 let buffer = WriteBuffer::new(config);
355
356 let base_time = Utc::now();
357
358 for i in 0..5 {
360 let point = DataPoint {
361 timestamp: base_time + chrono::Duration::seconds(i),
362 value: i as f64,
363 };
364 buffer.insert(1, point).await.unwrap();
365 }
366
367 for i in 0..10_i64 {
369 let point = DataPoint {
370 timestamp: base_time + chrono::Duration::seconds(i),
371 value: i as f64,
372 };
373 buffer.insert(2, point).await.unwrap();
374 }
375
376 let to_flush = buffer.series_needing_flush().await;
377 assert_eq!(to_flush.len(), 1);
378 assert_eq!(to_flush[0], 2);
379 }
380}