1use crate::error::DbError;
2use crate::types::{DataPoint, TagSet, TimeSeriesChunk, Timestamp, Value};
3use std::collections::HashMap;
4use std::sync::{Arc, RwLock};
5
6#[derive(Debug, Default)]
10pub struct InMemoryStorage {
11 series_data: HashMap<String, Arc<RwLock<TimeSeriesChunk>>>,
12}
13
14impl InMemoryStorage {
15 pub fn append_batch(&mut self, data: HashMap<String, Vec<DataPoint>>) -> Result<(), DbError> {
19 for (series_name, points) in data {
20 if points.is_empty() {
21 continue;
22 }
23
24 let chunk_arc = self
25 .series_data
26 .entry(series_name.clone())
27 .or_insert_with(|| Arc::new(RwLock::new(TimeSeriesChunk::default())));
28
29 let mut chunk_guard = chunk_arc.write()?;
30 chunk_guard.append_batch(points);
31
32 let mut combined: Vec<_> = chunk_guard
35 .timestamps
36 .iter()
37 .zip(chunk_guard.values.iter())
38 .zip(chunk_guard.tags.iter())
39 .map(|((&ts, &val), tag)| (ts, val, tag.clone())) .collect();
41
42 combined.sort_unstable_by_key(|&(ts, _, _)| ts);
44
45 chunk_guard.timestamps.clear();
47 chunk_guard.values.clear();
48 chunk_guard.tags.clear();
49
50 chunk_guard.timestamps.reserve(combined.len());
51 chunk_guard.values.reserve(combined.len());
52 chunk_guard.tags.reserve(combined.len());
53
54 for (ts, val, tag) in combined {
55 chunk_guard.timestamps.push(ts);
56 chunk_guard.values.push(val);
57 chunk_guard.tags.push(tag);
58 }
59 }
61 Ok(())
62 }
63
64 pub fn append_points(&mut self, series: &str, points: Vec<DataPoint>) -> Result<(), DbError> {
66 if points.is_empty() {
67 return Ok(());
68 }
69
70 let chunk_arc = self
71 .series_data
72 .entry(series.to_string())
73 .or_insert_with(|| Arc::new(RwLock::new(TimeSeriesChunk::default())));
74
75 let mut chunk_guard = chunk_arc.write()?;
76 chunk_guard.append_batch(points);
77
78 let mut combined: Vec<_> = chunk_guard
80 .timestamps
81 .iter()
82 .zip(chunk_guard.values.iter())
83 .zip(chunk_guard.tags.iter())
84 .map(|((&ts, &val), tag)| (ts, val, tag.clone()))
85 .collect();
86
87 combined.sort_unstable_by_key(|&(ts, _, _)| ts);
88
89 chunk_guard.timestamps.clear();
90 chunk_guard.values.clear();
91 chunk_guard.tags.clear();
92
93 chunk_guard.timestamps.reserve(combined.len());
94 chunk_guard.values.reserve(combined.len());
95 chunk_guard.tags.reserve(combined.len());
96
97 for (ts, val, tag) in combined {
98 chunk_guard.timestamps.push(ts);
99 chunk_guard.values.push(val);
100 chunk_guard.tags.push(tag);
101 }
102
103 Ok(())
104 }
105
106 pub fn get_chunk_for_query(
108 &self,
109 series: &str,
110 ) -> Option<Arc<RwLock<TimeSeriesChunk>>> {
111 self.series_data.get(series).cloned()
112 }
113
114 pub fn get_all_series(&self) -> &HashMap<String, Arc<RwLock<TimeSeriesChunk>>> {
116 &self.series_data
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::TagSet;

    /// Fixed base timestamp (nanoseconds since the Unix epoch).
    ///
    /// A constant keeps every test deterministic: expected orderings never
    /// depend on wall-clock resolution or on how long a sleep really took.
    const BASE_TS: Timestamp = 1_700_000_000_000_000_000;

    /// Builds an untagged data point.
    fn create_point(ts: Timestamp, val: Value) -> DataPoint {
        create_point_with_tags(ts, val, TagSet::new())
    }

    /// Builds a data point carrying the given tags.
    fn create_point_with_tags(ts: Timestamp, val: Value, tags: TagSet) -> DataPoint {
        DataPoint {
            timestamp: ts,
            value: val,
            tags,
        }
    }

    /// Builds a tag set from `(key, value)` string pairs.
    fn create_tags(pairs: &[(&str, &str)]) -> TagSet {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// Wraps the points of one series into the map shape `append_batch` takes.
    fn single_series_batch(series: &str, points: Vec<DataPoint>) -> HashMap<String, Vec<DataPoint>> {
        let mut batch = HashMap::new();
        batch.insert(series.to_string(), points);
        batch
    }

    /// Asserts that `series` exists and holds exactly the given columns.
    fn assert_series(
        storage: &InMemoryStorage,
        series: &str,
        expected_ts: &[Timestamp],
        expected_values: &[Value],
    ) {
        let chunk_arc = storage
            .get_chunk_for_query(series)
            .expect("series should exist");
        let chunk = chunk_arc.read().expect("chunk lock should not be poisoned");

        assert_eq!(chunk.len(), expected_ts.len());
        assert_eq!(chunk.timestamps, expected_ts.to_vec());
        assert_eq!(chunk.values, expected_values.to_vec());
    }

    #[test]
    fn test_append_and_sort() {
        let mut storage = InMemoryStorage::default();
        let series = "test_sort";

        let first = vec![create_point(100, 1.0), create_point(300, 3.0)];
        let second = vec![create_point(50, 0.5), create_point(200, 2.0)];

        storage
            .append_batch(single_series_batch(series, first))
            .unwrap();
        storage
            .append_batch(single_series_batch(series, second))
            .unwrap();

        assert_series(&storage, series, &[50, 100, 200, 300], &[0.5, 1.0, 2.0, 3.0]);
    }

    #[test]
    fn test_append_batch_multiple_series() {
        let mut storage = InMemoryStorage::default();

        let ts1 = BASE_TS;
        let ts2 = ts1 + 100;
        let ts3 = ts1 + 200;

        let mut batch = HashMap::new();
        batch.insert(
            "series1".to_string(),
            vec![create_point(ts1, 1.0), create_point(ts2, 1.1)],
        );
        batch.insert("series2".to_string(), vec![create_point(ts3, 2.0)]);
        // Deliberately out of order: the storage must sort it.
        batch.insert(
            "series3".to_string(),
            vec![create_point(ts2, 3.0), create_point(ts1, 3.1)],
        );

        storage.append_batch(batch).unwrap();

        assert_series(&storage, "series1", &[ts1, ts2], &[1.0, 1.1]);
        assert_series(&storage, "series2", &[ts3], &[2.0]);
        assert_series(&storage, "series3", &[ts1, ts2], &[3.1, 3.0]);
    }

    #[test]
    fn test_append_points() {
        let mut storage = InMemoryStorage::default();
        let series = "test_append_points";

        // Strictly increasing by construction; no sleeping required.
        let ts1 = BASE_TS;
        let ts2 = ts1 + 1;
        let ts3 = ts1 + 2;

        let points = vec![
            create_point(ts2, 2.0),
            create_point(ts1, 1.0),
            create_point(ts3, 3.0),
        ];

        storage.append_points(series, points).unwrap();

        assert_series(&storage, series, &[ts1, ts2, ts3], &[1.0, 2.0, 3.0]);
    }

    #[test]
    fn test_get_chunk_for_query() {
        let mut storage = InMemoryStorage::default();
        let series = "test_get_chunk";
        let ts = BASE_TS;

        storage
            .append_batch(single_series_batch(series, vec![create_point(ts, 42.0)]))
            .unwrap();

        assert_series(&storage, series, &[ts], &[42.0]);
        assert!(storage.get_chunk_for_query("non_existent").is_none());
    }

    #[test]
    fn test_get_all_series() {
        let mut storage = InMemoryStorage::default();

        let ts1 = BASE_TS;
        let ts2 = ts1 + 100;

        let mut batch = HashMap::new();
        batch.insert("series1".to_string(), vec![create_point(ts1, 1.0)]);
        batch.insert("series2".to_string(), vec![create_point(ts2, 2.0)]);
        storage.append_batch(batch).unwrap();

        let all_series = storage.get_all_series();
        assert_eq!(all_series.len(), 2);

        let series1_guard = all_series
            .get("series1")
            .expect("series1 should exist")
            .read()
            .unwrap();
        assert_eq!(series1_guard.timestamps[0], ts1);

        let series2_guard = all_series
            .get("series2")
            .expect("series2 should exist")
            .read()
            .unwrap();
        assert_eq!(series2_guard.timestamps[0], ts2);
    }

    #[test]
    fn test_append_with_tags() {
        let mut storage = InMemoryStorage::default();
        let series = "test_tags";

        let ts1 = BASE_TS;
        let ts2 = ts1 + 1;

        let tags1 = create_tags(&[("region", "us-east"), ("host", "server1")]);
        let tags2 = create_tags(&[("region", "us-west"), ("host", "server2")]);

        let points = vec![
            create_point_with_tags(ts1, 1.0, tags1.clone()),
            create_point_with_tags(ts2, 2.0, tags2.clone()),
        ];
        storage
            .append_batch(single_series_batch(series, points))
            .unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 2);

        assert_eq!(chunk_guard.tags[0], tags1);
        assert_eq!(chunk_guard.tags[0].get("region"), Some(&"us-east".to_string()));

        assert_eq!(chunk_guard.tags[1], tags2);
        assert_eq!(chunk_guard.tags[1].get("host"), Some(&"server2".to_string()));
    }

    #[test]
    fn test_empty_batch() {
        let mut storage = InMemoryStorage::default();

        storage
            .append_batch(single_series_batch("empty_series", Vec::new()))
            .unwrap();

        // An empty point list must not create the series.
        assert!(storage.get_chunk_for_query("empty_series").is_none());
    }

    #[test]
    fn test_append_out_of_order_points() {
        let mut storage = InMemoryStorage::default();
        let series = "out_of_order";

        let ts1 = BASE_TS;
        let ts2 = ts1 + 1000;
        let ts3 = ts1 + 2000;
        let ts4 = ts1 + 3000;
        let ts5 = ts1 + 4000;

        let in_order = vec![
            create_point(ts1, 1.0),
            create_point(ts2, 2.0),
            create_point(ts3, 3.0),
        ];
        storage
            .append_batch(single_series_batch(series, in_order))
            .unwrap();

        // Second batch interleaves with, precedes, and follows the first.
        let shuffled = vec![
            create_point(ts5, 5.0),
            create_point(ts2 - 500, 1.5),
            create_point(ts1 - 500, 0.5),
            create_point(ts4, 4.0),
        ];
        storage
            .append_batch(single_series_batch(series, shuffled))
            .unwrap();

        assert_series(
            &storage,
            series,
            &[ts1 - 500, ts1, ts2 - 500, ts2, ts3, ts4, ts5],
            &[0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0],
        );
    }
}
408