1use crate::error::DbError;
2use crate::types::{DataPoint, TimeSeriesChunk};
3use std::collections::HashMap;
4use std::sync::{Arc, RwLock};
5
6#[derive(Debug, Default)]
10pub struct InMemoryStorage {
11 series_data: HashMap<String, Arc<RwLock<TimeSeriesChunk>>>,
12}
13
14impl InMemoryStorage {
15 pub fn append_batch(&mut self, data: HashMap<String, Vec<DataPoint>>) -> Result<(), DbError> {
19 for (series_name, points) in data {
20 if points.is_empty() {
21 continue;
22 }
23
24 let chunk_arc = self
25 .series_data
26 .entry(series_name.clone())
27 .or_insert_with(|| Arc::new(RwLock::new(TimeSeriesChunk::default())));
28
29 let mut chunk_guard = chunk_arc.write()?;
30 chunk_guard.append_batch(points);
31
32 let mut combined: Vec<_> = chunk_guard
35 .timestamps
36 .iter()
37 .zip(chunk_guard.values.iter())
38 .zip(chunk_guard.tags.iter())
39 .map(|((&ts, &val), tag)| (ts, val, tag.clone())) .collect();
41
42 combined.sort_unstable_by_key(|&(ts, _, _)| ts);
44
45 chunk_guard.timestamps.clear();
47 chunk_guard.values.clear();
48 chunk_guard.tags.clear();
49
50 chunk_guard.timestamps.reserve(combined.len());
51 chunk_guard.values.reserve(combined.len());
52 chunk_guard.tags.reserve(combined.len());
53
54 for (ts, val, tag) in combined {
55 chunk_guard.timestamps.push(ts);
56 chunk_guard.values.push(val);
57 chunk_guard.tags.push(tag);
58 }
59 }
61 Ok(())
62 }
63
64 pub fn append_points(&mut self, series: &str, points: Vec<DataPoint>) -> Result<(), DbError> {
66 if points.is_empty() {
67 return Ok(());
68 }
69
70 let chunk_arc = self
71 .series_data
72 .entry(series.to_string())
73 .or_insert_with(|| Arc::new(RwLock::new(TimeSeriesChunk::default())));
74
75 let mut chunk_guard = chunk_arc.write()?;
76 chunk_guard.append_batch(points);
77
78 let mut combined: Vec<_> = chunk_guard
80 .timestamps
81 .iter()
82 .zip(chunk_guard.values.iter())
83 .zip(chunk_guard.tags.iter())
84 .map(|((&ts, &val), tag)| (ts, val, tag.clone()))
85 .collect();
86
87 combined.sort_unstable_by_key(|&(ts, _, _)| ts);
88
89 chunk_guard.timestamps.clear();
90 chunk_guard.values.clear();
91 chunk_guard.tags.clear();
92
93 chunk_guard.timestamps.reserve(combined.len());
94 chunk_guard.values.reserve(combined.len());
95 chunk_guard.tags.reserve(combined.len());
96
97 for (ts, val, tag) in combined {
98 chunk_guard.timestamps.push(ts);
99 chunk_guard.values.push(val);
100 chunk_guard.tags.push(tag);
101 }
102
103 Ok(())
104 }
105
106 pub fn get_chunk_for_query(&self, series: &str) -> Option<Arc<RwLock<TimeSeriesChunk>>> {
108 self.series_data.get(series).cloned()
109 }
110
111 pub fn get_all_series(&self) -> &HashMap<String, Arc<RwLock<TimeSeriesChunk>>> {
113 &self.series_data
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{TagSet, Timestamp, Value};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a data point with an empty tag set.
    fn create_point(ts: Timestamp, val: Value) -> DataPoint {
        DataPoint {
            timestamp: ts,
            value: val,
            tags: TagSet::new(),
        }
    }

    /// Builds a data point carrying the given tag set.
    fn create_point_with_tags(ts: Timestamp, val: Value, tags: TagSet) -> DataPoint {
        DataPoint {
            timestamp: ts,
            value: val,
            tags,
        }
    }

    /// Returns the current wall-clock time as nanoseconds since the Unix epoch.
    ///
    /// Tests only use this as a base value and derive further timestamps by
    /// adding fixed offsets, so their ordering never depends on clock
    /// resolution or on the wall clock moving forward between two reads.
    fn get_current_timestamp() -> Timestamp {
        // `as_nanos` yields a u128; narrowing to u64 is lossless until the
        // year 2554, which is acceptable for test data.
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos() as u64
    }

    /// Builds a tag set from `(key, value)` string pairs.
    fn create_tags(pairs: &[(&str, &str)]) -> TagSet {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn test_append_and_sort() {
        let mut storage = InMemoryStorage::default();
        let series = "test_sort";
        let points1 = vec![create_point(100, 1.0), create_point(300, 3.0)];
        let points2 = vec![create_point(50, 0.5), create_point(200, 2.0)];

        let mut batch1 = HashMap::new();
        batch1.insert(series.to_string(), points1);
        storage.append_batch(batch1).unwrap();

        let mut batch2 = HashMap::new();
        batch2.insert(series.to_string(), points2);
        storage.append_batch(batch2).unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 4);
        assert_eq!(chunk_guard.timestamps, vec![50, 100, 200, 300]);
        assert_eq!(chunk_guard.values, vec![0.5, 1.0, 2.0, 3.0]);
    }

    #[test]
    fn test_append_batch_multiple_series() {
        let mut storage = InMemoryStorage::default();

        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 100;
        let ts3 = ts1 + 200;

        let mut batch = HashMap::new();
        batch.insert(
            "series1".to_string(),
            vec![create_point(ts1, 1.0), create_point(ts2, 1.1)],
        );
        batch.insert("series2".to_string(), vec![create_point(ts3, 2.0)]);
        batch.insert(
            "series3".to_string(),
            vec![create_point(ts2, 3.0), create_point(ts1, 3.1)],
        );

        storage.append_batch(batch).unwrap();

        let chunk_arc = storage.get_chunk_for_query("series1").unwrap();
        let chunk_guard = chunk_arc.read().unwrap();
        assert_eq!(chunk_guard.len(), 2);
        assert_eq!(chunk_guard.timestamps, vec![ts1, ts2]);
        assert_eq!(chunk_guard.values, vec![1.0, 1.1]);

        let chunk_arc = storage.get_chunk_for_query("series2").unwrap();
        let chunk_guard = chunk_arc.read().unwrap();
        assert_eq!(chunk_guard.len(), 1);
        assert_eq!(chunk_guard.timestamps, vec![ts3]);
        assert_eq!(chunk_guard.values, vec![2.0]);

        // series3 was supplied out of order and must come back sorted.
        let chunk_arc = storage.get_chunk_for_query("series3").unwrap();
        let chunk_guard = chunk_arc.read().unwrap();
        assert_eq!(chunk_guard.len(), 2);
        assert_eq!(chunk_guard.timestamps, vec![ts1, ts2]);
        assert_eq!(chunk_guard.values, vec![3.1, 3.0]);
    }

    #[test]
    fn test_append_points() {
        let mut storage = InMemoryStorage::default();
        let series = "test_append_points";

        // Strictly increasing by construction; no sleeping or repeated clock
        // reads, which could tie or go backwards.
        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 1;
        let ts3 = ts1 + 2;

        let points = vec![
            create_point(ts2, 2.0),
            create_point(ts1, 1.0),
            create_point(ts3, 3.0),
        ];

        storage.append_points(series, points).unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 3);
        assert_eq!(chunk_guard.timestamps, vec![ts1, ts2, ts3]);
        assert_eq!(chunk_guard.values, vec![1.0, 2.0, 3.0]);
    }

    #[test]
    fn test_append_points_empty_is_noop() {
        let mut storage = InMemoryStorage::default();

        storage.append_points("empty_series", Vec::new()).unwrap();

        // An empty append must not create the series.
        assert!(storage.get_chunk_for_query("empty_series").is_none());
    }

    #[test]
    fn test_get_chunk_for_query() {
        let mut storage = InMemoryStorage::default();
        let series = "test_get_chunk";

        let ts = get_current_timestamp();
        let points = vec![create_point(ts, 42.0)];

        let mut batch = HashMap::new();
        batch.insert(series.to_string(), points);
        storage.append_batch(batch).unwrap();

        let chunk_opt = storage.get_chunk_for_query(series);
        assert!(chunk_opt.is_some());

        let chunk_arc = chunk_opt.unwrap();
        let chunk_guard = chunk_arc.read().unwrap();
        assert_eq!(chunk_guard.len(), 1);
        assert_eq!(chunk_guard.timestamps[0], ts);
        assert_eq!(chunk_guard.values[0], 42.0);

        let non_existent = storage.get_chunk_for_query("non_existent");
        assert!(non_existent.is_none());
    }

    #[test]
    fn test_get_all_series() {
        let mut storage = InMemoryStorage::default();

        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 100;

        let mut batch = HashMap::new();
        batch.insert("series1".to_string(), vec![create_point(ts1, 1.0)]);
        batch.insert("series2".to_string(), vec![create_point(ts2, 2.0)]);

        storage.append_batch(batch).unwrap();

        let all_series = storage.get_all_series();

        assert_eq!(all_series.len(), 2);

        assert!(all_series.contains_key("series1"));
        assert!(all_series.contains_key("series2"));

        let series1_arc = all_series.get("series1").unwrap();
        let series1_guard = series1_arc.read().unwrap();
        assert_eq!(series1_guard.timestamps[0], ts1);

        let series2_arc = all_series.get("series2").unwrap();
        let series2_guard = series2_arc.read().unwrap();
        assert_eq!(series2_guard.timestamps[0], ts2);
    }

    #[test]
    fn test_append_with_tags() {
        let mut storage = InMemoryStorage::default();
        let series = "test_tags";

        // ts2 is derived from ts1 so that the expected order is guaranteed.
        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 1;

        let tags1 = create_tags(&[("region", "us-east"), ("host", "server1")]);
        let tags2 = create_tags(&[("region", "us-west"), ("host", "server2")]);

        let points = vec![
            create_point_with_tags(ts1, 1.0, tags1.clone()),
            create_point_with_tags(ts2, 2.0, tags2.clone()),
        ];

        let mut batch = HashMap::new();
        batch.insert(series.to_string(), points);
        storage.append_batch(batch).unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 2);

        assert_eq!(chunk_guard.tags[0], tags1);
        assert_eq!(
            chunk_guard.tags[0].get("region"),
            Some(&"us-east".to_string())
        );

        assert_eq!(chunk_guard.tags[1], tags2);
        assert_eq!(
            chunk_guard.tags[1].get("host"),
            Some(&"server2".to_string())
        );
    }

    #[test]
    fn test_empty_batch() {
        let mut storage = InMemoryStorage::default();

        let mut empty_batch = HashMap::new();
        empty_batch.insert("empty_series".to_string(), Vec::new());

        storage.append_batch(empty_batch).unwrap();

        // A series with no points must not be created.
        assert!(storage.get_chunk_for_query("empty_series").is_none());
    }

    #[test]
    fn test_append_out_of_order_points() {
        let mut storage = InMemoryStorage::default();
        let series = "out_of_order";

        let ts1 = get_current_timestamp();
        let ts2 = ts1 + 1000;
        let ts3 = ts1 + 2000;
        let ts4 = ts1 + 3000;
        let ts5 = ts1 + 4000;

        // First batch arrives in order.
        let mut batch1 = HashMap::new();
        batch1.insert(
            series.to_string(),
            vec![
                create_point(ts1, 1.0),
                create_point(ts2, 2.0),
                create_point(ts3, 3.0),
            ],
        );
        storage.append_batch(batch1).unwrap();

        // Second batch is shuffled and interleaves with the first one.
        let mut batch2 = HashMap::new();
        batch2.insert(
            series.to_string(),
            vec![
                create_point(ts5, 5.0),
                create_point(ts2 - 500, 1.5),
                create_point(ts1 - 500, 0.5),
                create_point(ts4, 4.0),
            ],
        );
        storage.append_batch(batch2).unwrap();

        let chunk_arc = storage.get_chunk_for_query(series).unwrap();
        let chunk_guard = chunk_arc.read().unwrap();

        assert_eq!(chunk_guard.len(), 7);

        let expected_ts = vec![ts1 - 500, ts1, ts2 - 500, ts2, ts3, ts4, ts5];
        let expected_values = vec![0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0];

        assert_eq!(chunk_guard.timestamps, expected_ts);
        assert_eq!(chunk_guard.values, expected_values);
    }
}