// ugnos/storage.rs

1use crate::error::DbError;
2use crate::types::{DataPoint, TagSet, TimeSeriesChunk, Timestamp, Value};
3use std::collections::HashMap;
4use std::sync::{Arc, RwLock};
5
6/// Simple in-memory storage for time series data.
7/// Data is stored per series in TimeSeriesChunk.
8/// Uses RwLock for concurrent read access during queries and exclusive write access during flushes.
9#[derive(Debug, Default)]
10pub struct InMemoryStorage {
11    series_data: HashMap<String, Arc<RwLock<TimeSeriesChunk>>>,
12}
13
14impl InMemoryStorage {
15    /// Appends a batch of data points (flushed from the buffer) to the corresponding series chunk.
16    /// Creates the series chunk if it doesn't exist.
17    /// **Crucially, sorts the chunk by timestamp after appending.**
18    pub fn append_batch(&mut self, data: HashMap<String, Vec<DataPoint>>) -> Result<(), DbError> {
19        for (series_name, points) in data {
20            if points.is_empty() {
21                continue;
22            }
23
24            let chunk_arc = self
25                .series_data
26                .entry(series_name.clone())
27                .or_insert_with(|| Arc::new(RwLock::new(TimeSeriesChunk::default())));
28
29            let mut chunk_guard = chunk_arc.write()?;
30            chunk_guard.append_batch(points);
31
32            // --- Sort the chunk by timestamp after appending --- //
33            // Combine columns into tuples for sorting
34            let mut combined: Vec<_> = chunk_guard
35                .timestamps
36                .iter()
37                .zip(chunk_guard.values.iter())
38                .zip(chunk_guard.tags.iter())
39                .map(|((&ts, &val), tag)| (ts, val, tag.clone())) // Clone tag for ownership
40                .collect();
41
42            // Sort based on timestamp
43            combined.sort_unstable_by_key(|&(ts, _, _)| ts);
44
45            // Clear existing vectors and push sorted data back
46            chunk_guard.timestamps.clear();
47            chunk_guard.values.clear();
48            chunk_guard.tags.clear();
49
50            chunk_guard.timestamps.reserve(combined.len());
51            chunk_guard.values.reserve(combined.len());
52            chunk_guard.tags.reserve(combined.len());
53
54            for (ts, val, tag) in combined {
55                chunk_guard.timestamps.push(ts);
56                chunk_guard.values.push(val);
57                chunk_guard.tags.push(tag);
58            }
59            // --- End sorting --- //
60        }
61        Ok(())
62    }
63
64    /// Appends points directly to a series. Used for recovery.
65    pub fn append_points(&mut self, series: &str, points: Vec<DataPoint>) -> Result<(), DbError> {
66        if points.is_empty() {
67            return Ok(());
68        }
69
70        let chunk_arc = self
71            .series_data
72            .entry(series.to_string())
73            .or_insert_with(|| Arc::new(RwLock::new(TimeSeriesChunk::default())));
74
75        let mut chunk_guard = chunk_arc.write()?;
76        chunk_guard.append_batch(points);
77
78        // Sort the chunk by timestamp
79        let mut combined: Vec<_> = chunk_guard
80            .timestamps
81            .iter()
82            .zip(chunk_guard.values.iter())
83            .zip(chunk_guard.tags.iter())
84            .map(|((&ts, &val), tag)| (ts, val, tag.clone()))
85            .collect();
86
87        combined.sort_unstable_by_key(|&(ts, _, _)| ts);
88
89        chunk_guard.timestamps.clear();
90        chunk_guard.values.clear();
91        chunk_guard.tags.clear();
92
93        chunk_guard.timestamps.reserve(combined.len());
94        chunk_guard.values.reserve(combined.len());
95        chunk_guard.tags.reserve(combined.len());
96
97        for (ts, val, tag) in combined {
98            chunk_guard.timestamps.push(ts);
99            chunk_guard.values.push(val);
100            chunk_guard.tags.push(tag);
101        }
102
103        Ok(())
104    }
105
106    /// Retrieves a read-only reference (via Arc clone) to the chunk for a given series.
107    pub fn get_chunk_for_query(
108        &self,
109        series: &str,
110    ) -> Option<Arc<RwLock<TimeSeriesChunk>>> {
111        self.series_data.get(series).cloned()
112    }
113    
114    /// Returns a reference to all series for snapshot creation
115    pub fn get_all_series(&self) -> &HashMap<String, Arc<RwLock<TimeSeriesChunk>>> {
116        &self.series_data
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::TagSet;

    /// Fixed base timestamp at a realistic nanoseconds-since-epoch magnitude.
    ///
    /// The tests derive every timestamp from this constant instead of reading the
    /// wall clock: the system clock is not monotonic and may return equal values on
    /// back-to-back reads, which would make the ordering assertions flaky.
    const BASE_TS: Timestamp = 1_700_000_000_000_000_000;

    /// Builds an untagged data point.
    fn create_point(ts: Timestamp, val: Value) -> DataPoint {
        create_point_with_tags(ts, val, TagSet::new())
    }

    /// Builds a data point carrying the given tags.
    fn create_point_with_tags(ts: Timestamp, val: Value, tags: TagSet) -> DataPoint {
        DataPoint {
            timestamp: ts,
            value: val,
            tags,
        }
    }

    /// Builds a tag set from `(key, value)` string pairs.
    fn create_tags(pairs: &[(&str, &str)]) -> TagSet {
        pairs
            .iter()
            .map(|&(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// Builds a batch containing a single series.
    fn single_series_batch(series: &str, points: Vec<DataPoint>) -> HashMap<String, Vec<DataPoint>> {
        let mut batch = HashMap::new();
        batch.insert(series.to_string(), points);
        batch
    }

    /// Asserts that `series` exists and holds exactly the given columns, in order.
    fn assert_series(
        storage: &InMemoryStorage,
        series: &str,
        expected_ts: Vec<Timestamp>,
        expected_values: Vec<Value>,
    ) {
        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk = chunk_arc.read().unwrap();

        assert_eq!(chunk.len(), expected_ts.len());
        assert_eq!(chunk.timestamps, expected_ts);
        assert_eq!(chunk.values, expected_values);
    }

    #[test]
    fn test_append_and_sort() {
        let mut storage = InMemoryStorage::default();
        let series = "test_sort";

        let first = vec![create_point(100, 1.0), create_point(300, 3.0)];
        let second = vec![create_point(50, 0.5), create_point(200, 2.0)];

        storage.append_batch(single_series_batch(series, first)).unwrap();
        storage.append_batch(single_series_batch(series, second)).unwrap();

        assert_series(
            &storage,
            series,
            vec![50, 100, 200, 300],
            vec![0.5, 1.0, 2.0, 3.0],
        );
    }

    #[test]
    fn test_append_batch_multiple_series() {
        let mut storage = InMemoryStorage::default();

        let ts1 = BASE_TS;
        let ts2 = BASE_TS + 100;
        let ts3 = BASE_TS + 200;

        let mut batch = HashMap::new();
        batch.insert("series1".to_string(), vec![create_point(ts1, 1.0), create_point(ts2, 1.1)]);
        batch.insert("series2".to_string(), vec![create_point(ts3, 2.0)]);
        // Deliberately out of order to exercise the per-series sort.
        batch.insert("series3".to_string(), vec![create_point(ts2, 3.0), create_point(ts1, 3.1)]);

        storage.append_batch(batch).unwrap();

        assert_series(&storage, "series1", vec![ts1, ts2], vec![1.0, 1.1]);
        assert_series(&storage, "series2", vec![ts3], vec![2.0]);
        // Sorted by timestamp, with values following their timestamps.
        assert_series(&storage, "series3", vec![ts1, ts2], vec![3.1, 3.0]);
    }

    #[test]
    fn test_append_points() {
        let mut storage = InMemoryStorage::default();
        let series = "test_append_points";

        // Strictly increasing by construction; no clock reads or sleeps needed.
        let ts1 = BASE_TS;
        let ts2 = BASE_TS + 1;
        let ts3 = BASE_TS + 2;

        // Out-of-order input to test sorting.
        let points = vec![
            create_point(ts2, 2.0),
            create_point(ts1, 1.0),
            create_point(ts3, 3.0),
        ];

        storage.append_points(series, points).unwrap();

        assert_series(&storage, series, vec![ts1, ts2, ts3], vec![1.0, 2.0, 3.0]);
    }

    #[test]
    fn test_append_points_empty_is_noop() {
        let mut storage = InMemoryStorage::default();

        storage.append_points("empty_series", Vec::new()).unwrap();

        // No series may be created for an empty append.
        assert!(storage.get_chunk_for_query("empty_series").is_none());
        assert!(storage.get_all_series().is_empty());
    }

    #[test]
    fn test_get_chunk_for_query() {
        let mut storage = InMemoryStorage::default();
        let series = "test_get_chunk";
        let ts = BASE_TS;

        storage
            .append_batch(single_series_batch(series, vec![create_point(ts, 42.0)]))
            .unwrap();

        // Existing series: a handle is returned and exposes the stored point.
        let chunk_opt = storage.get_chunk_for_query(series);
        assert!(chunk_opt.is_some());

        let chunk_arc = chunk_opt.unwrap();
        let chunk = chunk_arc.read().unwrap();
        assert_eq!(chunk.len(), 1);
        assert_eq!(chunk.timestamps[0], ts);
        assert_eq!(chunk.values[0], 42.0);

        // Unknown series: nothing is returned.
        assert!(storage.get_chunk_for_query("non_existent").is_none());
    }

    #[test]
    fn test_get_all_series() {
        let mut storage = InMemoryStorage::default();

        let ts1 = BASE_TS;
        let ts2 = BASE_TS + 100;

        let mut batch = HashMap::new();
        batch.insert("series1".to_string(), vec![create_point(ts1, 1.0)]);
        batch.insert("series2".to_string(), vec![create_point(ts2, 2.0)]);
        storage.append_batch(batch).unwrap();

        let all_series = storage.get_all_series();
        assert_eq!(all_series.len(), 2);

        // Each series must be present and hold its own point.
        for &(name, expected_ts) in &[("series1", ts1), ("series2", ts2)] {
            assert!(all_series.contains_key(name));
            let chunk = all_series.get(name).unwrap().read().unwrap();
            assert_eq!(chunk.timestamps[0], expected_ts);
        }
    }

    #[test]
    fn test_append_with_tags() {
        let mut storage = InMemoryStorage::default();
        let series = "test_tags";

        // Strictly increasing by construction, so the stored order is known.
        let ts1 = BASE_TS;
        let ts2 = BASE_TS + 1;

        let tags1 = create_tags(&[("region", "us-east"), ("host", "server1")]);
        let tags2 = create_tags(&[("region", "us-west"), ("host", "server2")]);

        let points = vec![
            create_point_with_tags(ts1, 1.0, tags1.clone()),
            create_point_with_tags(ts2, 2.0, tags2.clone()),
        ];
        storage.append_batch(single_series_batch(series, points)).unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk = chunk_arc.read().unwrap();

        assert_eq!(chunk.len(), 2);

        // Tags of the first point.
        assert_eq!(chunk.tags[0], tags1);
        assert_eq!(chunk.tags[0].get("region"), Some(&"us-east".to_string()));

        // Tags of the second point.
        assert_eq!(chunk.tags[1], tags2);
        assert_eq!(chunk.tags[1].get("host"), Some(&"server2".to_string()));
    }

    #[test]
    fn test_empty_batch() {
        let mut storage = InMemoryStorage::default();

        storage
            .append_batch(single_series_batch("empty_series", Vec::new()))
            .unwrap();

        // The series should not be created.
        assert!(storage.get_chunk_for_query("empty_series").is_none());
    }

    #[test]
    fn test_append_out_of_order_points() {
        let mut storage = InMemoryStorage::default();
        let series = "out_of_order";

        let ts1 = BASE_TS;
        let ts2 = ts1 + 1000;
        let ts3 = ts1 + 2000;
        let ts4 = ts1 + 3000;
        let ts5 = ts1 + 4000;

        // First batch: already in order.
        let in_order = vec![
            create_point(ts1, 1.0),
            create_point(ts2, 2.0),
            create_point(ts3, 3.0),
        ];
        storage.append_batch(single_series_batch(series, in_order)).unwrap();

        // Second batch: points before, between and after the existing ones.
        let mixed = vec![
            create_point(ts5, 5.0),       // after existing points
            create_point(ts2 - 500, 1.5), // between existing points
            create_point(ts1 - 500, 0.5), // before all existing points
            create_point(ts4, 4.0),       // after existing points
        ];
        storage.append_batch(single_series_batch(series, mixed)).unwrap();

        assert_series(
            &storage,
            series,
            vec![ts1 - 500, ts1, ts2 - 500, ts2, ts3, ts4, ts5],
            vec![0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0],
        );
    }
}
408