Skip to main content

ugnos/
storage.rs

1use crate::error::DbError;
2use crate::types::{DataPoint, TimeSeriesChunk};
3use std::collections::HashMap;
4use std::sync::{Arc, RwLock};
5
6/// Simple in-memory storage for time series data.
7/// Data is stored per series in TimeSeriesChunk.
8/// Uses RwLock for concurrent read access during queries and exclusive write access during flushes.
9#[derive(Debug, Default)]
10pub struct InMemoryStorage {
11    series_data: HashMap<String, Arc<RwLock<TimeSeriesChunk>>>,
12}
13
14impl InMemoryStorage {
15    /// Appends a batch of data points (flushed from the buffer) to the corresponding series chunk.
16    /// Creates the series chunk if it doesn't exist.
17    /// **Crucially, sorts the chunk by timestamp after appending.**
18    pub fn append_batch(&mut self, data: HashMap<String, Vec<DataPoint>>) -> Result<(), DbError> {
19        for (series_name, points) in data {
20            if points.is_empty() {
21                continue;
22            }
23
24            let chunk_arc = self
25                .series_data
26                .entry(series_name.clone())
27                .or_insert_with(|| Arc::new(RwLock::new(TimeSeriesChunk::default())));
28
29            let mut chunk_guard = chunk_arc.write()?;
30            chunk_guard.append_batch(points);
31
32            // --- Sort the chunk by timestamp after appending --- //
33            // Combine columns into tuples for sorting
34            let mut combined: Vec<_> = chunk_guard
35                .timestamps
36                .iter()
37                .zip(chunk_guard.values.iter())
38                .zip(chunk_guard.tags.iter())
39                .map(|((&ts, &val), tag)| (ts, val, tag.clone())) // Clone tag for ownership
40                .collect();
41
42            // Sort based on timestamp
43            combined.sort_unstable_by_key(|&(ts, _, _)| ts);
44
45            // Clear existing vectors and push sorted data back
46            chunk_guard.timestamps.clear();
47            chunk_guard.values.clear();
48            chunk_guard.tags.clear();
49
50            chunk_guard.timestamps.reserve(combined.len());
51            chunk_guard.values.reserve(combined.len());
52            chunk_guard.tags.reserve(combined.len());
53
54            for (ts, val, tag) in combined {
55                chunk_guard.timestamps.push(ts);
56                chunk_guard.values.push(val);
57                chunk_guard.tags.push(tag);
58            }
59            // --- End sorting --- //
60        }
61        Ok(())
62    }
63
64    /// Appends points directly to a series. Used for recovery.
65    pub fn append_points(&mut self, series: &str, points: Vec<DataPoint>) -> Result<(), DbError> {
66        if points.is_empty() {
67            return Ok(());
68        }
69
70        let chunk_arc = self
71            .series_data
72            .entry(series.to_string())
73            .or_insert_with(|| Arc::new(RwLock::new(TimeSeriesChunk::default())));
74
75        let mut chunk_guard = chunk_arc.write()?;
76        chunk_guard.append_batch(points);
77
78        // Sort the chunk by timestamp
79        let mut combined: Vec<_> = chunk_guard
80            .timestamps
81            .iter()
82            .zip(chunk_guard.values.iter())
83            .zip(chunk_guard.tags.iter())
84            .map(|((&ts, &val), tag)| (ts, val, tag.clone()))
85            .collect();
86
87        combined.sort_unstable_by_key(|&(ts, _, _)| ts);
88
89        chunk_guard.timestamps.clear();
90        chunk_guard.values.clear();
91        chunk_guard.tags.clear();
92
93        chunk_guard.timestamps.reserve(combined.len());
94        chunk_guard.values.reserve(combined.len());
95        chunk_guard.tags.reserve(combined.len());
96
97        for (ts, val, tag) in combined {
98            chunk_guard.timestamps.push(ts);
99            chunk_guard.values.push(val);
100            chunk_guard.tags.push(tag);
101        }
102
103        Ok(())
104    }
105
106    /// Retrieves a read-only reference (via Arc clone) to the chunk for a given series.
107    pub fn get_chunk_for_query(&self, series: &str) -> Option<Arc<RwLock<TimeSeriesChunk>>> {
108        self.series_data.get(series).cloned()
109    }
110
111    /// Returns a reference to all series for snapshot creation
112    pub fn get_all_series(&self) -> &HashMap<String, Arc<RwLock<TimeSeriesChunk>>> {
113        &self.series_data
114    }
115}
116
#[cfg(test)]
mod tests {
    //! Unit tests for `InMemoryStorage`.
    //!
    //! Timestamps are either small literals or one wall-clock reading plus fixed
    //! offsets. No test relies on two clock readings being distinct or increasing,
    //! because the system clock is neither monotonic nor guaranteed to be
    //! nanosecond-precise.

    use super::*;
    use crate::types::{TagSet, Timestamp, Value};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds an untagged data point.
    fn create_point(ts: Timestamp, val: Value) -> DataPoint {
        DataPoint {
            timestamp: ts,
            value: val,
            tags: TagSet::new(),
        }
    }

    /// Builds a data point carrying the given tag set.
    fn create_point_with_tags(ts: Timestamp, val: Value, tags: TagSet) -> DataPoint {
        DataPoint {
            timestamp: ts,
            value: val,
            tags,
        }
    }

    /// Returns the current wall-clock time as nanoseconds since the Unix epoch.
    fn get_current_timestamp() -> Timestamp {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch")
            .as_nanos();
        u64::try_from(nanos).expect("nanoseconds since the Unix epoch do not fit in u64")
    }

    /// Builds a tag set from `(key, value)` string pairs.
    fn create_tags(pairs: &[(&str, &str)]) -> TagSet {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn test_append_and_sort() {
        let mut storage = InMemoryStorage::default();
        let series = "test_sort";
        let points1 = vec![create_point(100, 1.0), create_point(300, 3.0)];
        let points2 = vec![create_point(50, 0.5), create_point(200, 2.0)];

        let mut batch1 = HashMap::new();
        batch1.insert(series.to_string(), points1);
        storage.append_batch(batch1).unwrap();

        let mut batch2 = HashMap::new();
        batch2.insert(series.to_string(), points2);
        storage.append_batch(batch2).unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 4);
        assert_eq!(chunk_guard.timestamps, vec![50, 100, 200, 300]);
        assert_eq!(chunk_guard.values, vec![0.5, 1.0, 2.0, 3.0]);
    }

    #[test]
    fn test_append_batch_multiple_series() {
        let mut storage = InMemoryStorage::default();

        // Three series sharing one base timestamp with fixed offsets
        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 100;
        let ts3 = ts1 + 200;

        let mut batch = HashMap::new();
        batch.insert(
            "series1".to_string(),
            vec![create_point(ts1, 1.0), create_point(ts2, 1.1)],
        );
        batch.insert("series2".to_string(), vec![create_point(ts3, 2.0)]);
        batch.insert(
            "series3".to_string(),
            vec![create_point(ts2, 3.0), create_point(ts1, 3.1)],
        );

        storage.append_batch(batch).unwrap();

        // Verify series1
        let chunk_arc = storage.get_chunk_for_query("series1").unwrap();
        let chunk_guard = chunk_arc.read().unwrap();
        assert_eq!(chunk_guard.len(), 2);
        assert_eq!(chunk_guard.timestamps, vec![ts1, ts2]);
        assert_eq!(chunk_guard.values, vec![1.0, 1.1]);

        // Verify series2
        let chunk_arc = storage.get_chunk_for_query("series2").unwrap();
        let chunk_guard = chunk_arc.read().unwrap();
        assert_eq!(chunk_guard.len(), 1);
        assert_eq!(chunk_guard.timestamps, vec![ts3]);
        assert_eq!(chunk_guard.values, vec![2.0]);

        // Verify series3 (should be sorted by timestamp)
        let chunk_arc = storage.get_chunk_for_query("series3").unwrap();
        let chunk_guard = chunk_arc.read().unwrap();
        assert_eq!(chunk_guard.len(), 2);
        assert_eq!(chunk_guard.timestamps, vec![ts1, ts2]); // Sorted
        assert_eq!(chunk_guard.values, vec![3.1, 3.0]); // Values match sorted order
    }

    #[test]
    fn test_append_points() {
        let mut storage = InMemoryStorage::default();
        let series = "test_append_points";

        // Derive strictly increasing timestamps from a single clock reading; separate
        // readings are not guaranteed to be distinct or ordered.
        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 1;
        let ts3 = ts1 + 2;

        // Create points with out-of-order timestamps to test sorting
        let points = vec![
            create_point(ts2, 2.0),
            create_point(ts1, 1.0),
            create_point(ts3, 3.0),
        ];

        storage.append_points(series, points).unwrap();

        // Verify points were added and sorted
        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 3);
        assert_eq!(chunk_guard.timestamps, vec![ts1, ts2, ts3]); // Should be sorted
        assert_eq!(chunk_guard.values, vec![1.0, 2.0, 3.0]); // Values should match the sorted order
    }

    #[test]
    fn test_append_points_merges_into_existing_series() {
        let mut storage = InMemoryStorage::default();
        let series = "test_append_points_merge";

        storage
            .append_points(series, vec![create_point(100, 1.0), create_point(300, 3.0)])
            .unwrap();
        storage
            .append_points(series, vec![create_point(200, 2.0)])
            .unwrap();
        // An empty list must leave the series untouched
        storage.append_points(series, Vec::new()).unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 3);
        assert_eq!(chunk_guard.timestamps, vec![100, 200, 300]);
        assert_eq!(chunk_guard.values, vec![1.0, 2.0, 3.0]);
    }

    #[test]
    fn test_get_chunk_for_query() {
        let mut storage = InMemoryStorage::default();
        let series = "test_get_chunk";

        // Create and add a point
        let ts = get_current_timestamp();
        let points = vec![create_point(ts, 42.0)];

        let mut batch = HashMap::new();
        batch.insert(series.to_string(), points);
        storage.append_batch(batch).unwrap();

        // Test getting an existing chunk
        let chunk_opt = storage.get_chunk_for_query(series);
        assert!(chunk_opt.is_some());

        let chunk_arc = chunk_opt.unwrap();
        let chunk_guard = chunk_arc.read().unwrap();
        assert_eq!(chunk_guard.len(), 1);
        assert_eq!(chunk_guard.timestamps[0], ts);
        assert_eq!(chunk_guard.values[0], 42.0);

        // Test getting a non-existent chunk
        let non_existent = storage.get_chunk_for_query("non_existent");
        assert!(non_existent.is_none());
    }

    #[test]
    fn test_get_all_series() {
        let mut storage = InMemoryStorage::default();

        // Two series sharing one base timestamp with a fixed offset
        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 100;

        let mut batch = HashMap::new();
        batch.insert("series1".to_string(), vec![create_point(ts1, 1.0)]);
        batch.insert("series2".to_string(), vec![create_point(ts2, 2.0)]);

        storage.append_batch(batch).unwrap();

        // Get all series
        let all_series = storage.get_all_series();

        // Verify the number of series
        assert_eq!(all_series.len(), 2);

        // Verify series names
        assert!(all_series.contains_key("series1"));
        assert!(all_series.contains_key("series2"));

        // Verify series contents
        let series1_arc = all_series.get("series1").unwrap();
        let series1_guard = series1_arc.read().unwrap();
        assert_eq!(series1_guard.timestamps[0], ts1);

        let series2_arc = all_series.get("series2").unwrap();
        let series2_guard = series2_arc.read().unwrap();
        assert_eq!(series2_guard.timestamps[0], ts2);
    }

    #[test]
    fn test_append_with_tags() {
        let mut storage = InMemoryStorage::default();
        let series = "test_tags";

        // Two strictly increasing timestamps derived from a single clock reading
        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 1;

        // Create tags
        let tags1 = create_tags(&[("region", "us-east"), ("host", "server1")]);
        let tags2 = create_tags(&[("region", "us-west"), ("host", "server2")]);

        // Create points with tags
        let points = vec![
            create_point_with_tags(ts1, 1.0, tags1.clone()),
            create_point_with_tags(ts2, 2.0, tags2.clone()),
        ];

        let mut batch = HashMap::new();
        batch.insert(series.to_string(), points);
        storage.append_batch(batch).unwrap();

        // Verify points and tags
        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 2);

        // Verify tags for first point
        assert_eq!(chunk_guard.tags[0], tags1);
        assert_eq!(
            chunk_guard.tags[0].get("region"),
            Some(&"us-east".to_string())
        );

        // Verify tags for second point
        assert_eq!(chunk_guard.tags[1], tags2);
        assert_eq!(
            chunk_guard.tags[1].get("host"),
            Some(&"server2".to_string())
        );
    }

    #[test]
    fn test_sort_keeps_tags_with_points() {
        let mut storage = InMemoryStorage::default();
        let series = "test_tags_sorted";

        let early_tags = create_tags(&[("host", "server1")]);
        let late_tags = create_tags(&[("host", "server2")]);

        // The later point is appended first, so sorting has to move the tags as well
        let points = vec![
            create_point_with_tags(200, 2.0, late_tags.clone()),
            create_point_with_tags(100, 1.0, early_tags.clone()),
        ];
        storage.append_points(series, points).unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 2);
        assert_eq!(chunk_guard.timestamps, vec![100, 200]);
        assert_eq!(chunk_guard.values, vec![1.0, 2.0]);
        assert_eq!(chunk_guard.tags[0], early_tags);
        assert_eq!(chunk_guard.tags[1], late_tags);
    }

    #[test]
    fn test_empty_batch() {
        let mut storage = InMemoryStorage::default();

        // Try to append an empty batch
        let mut empty_batch = HashMap::new();
        empty_batch.insert("empty_series".to_string(), Vec::new());

        storage.append_batch(empty_batch).unwrap();

        // The series should not be created
        assert!(storage.get_chunk_for_query("empty_series").is_none());
    }

    #[test]
    fn test_append_out_of_order_points() {
        let mut storage = InMemoryStorage::default();
        let series = "out_of_order";

        // Create timestamps with guaranteed ordering
        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 1000;
        let ts3 = ts1 + 2000;
        let ts4 = ts1 + 3000;
        let ts5 = ts1 + 4000;

        // First batch - in order
        let mut batch1 = HashMap::new();
        batch1.insert(
            series.to_string(),
            vec![
                create_point(ts1, 1.0),
                create_point(ts2, 2.0),
                create_point(ts3, 3.0),
            ],
        );
        storage.append_batch(batch1).unwrap();

        // Second batch - mixed order, some before, some after, some between existing points
        let mut batch2 = HashMap::new();
        batch2.insert(
            series.to_string(),
            vec![
                create_point(ts5, 5.0),       // after existing points
                create_point(ts2 - 500, 1.5), // between existing points
                create_point(ts1 - 500, 0.5), // before all existing points
                create_point(ts4, 4.0),       // after existing points
            ],
        );
        storage.append_batch(batch2).unwrap();

        // Verify all points are stored in correct order
        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 7);

        // Expected timestamps in sorted order
        let expected_ts = vec![ts1 - 500, ts1, ts2 - 500, ts2, ts3, ts4, ts5];
        let expected_values = vec![0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0];

        assert_eq!(chunk_guard.timestamps, expected_ts);
        assert_eq!(chunk_guard.values, expected_values);
    }
}