tstorage/partition/
memory.rs

1use crate::metric::{DataPoint, Row};
2use anyhow::Result;
3use std::path::Path;
4
5use super::{disk::flush, Boundary, Partition, PartitionError, PointPartitionOrdering};
6
7#[derive(Debug)]
8pub struct MemoryPartition {
9    pub map: dashmap::DashMap<String, MetricEntry>,
10    partition_boundary: Boundary,
11}
12
13impl Partition for MemoryPartition {
14    fn select(&self, name: &str, start: i64, end: i64) -> Result<Vec<DataPoint>> {
15        Ok(match self.partition_boundary.contains_range(start, end) {
16            true => match self.map.get(name) {
17                Some(entry) => entry.select(start, end),
18                None => vec![],
19            },
20            false => vec![],
21        })
22    }
23
24    fn insert(&self, row: &Row) -> Result<(), PartitionError> {
25        if !self
26            .partition_boundary
27            .contains_point(row.data_point.timestamp)
28        {
29            return Err(PartitionError::OutOfBounds);
30        }
31        match self.map.get_mut(row.metric) {
32            Some(mut m) => {
33                m.insert(row.data_point);
34            }
35            None => {
36                self.map
37                    .insert(row.metric.to_string(), MetricEntry::new(row.data_point));
38            }
39        };
40        Ok(())
41    }
42
43    fn ordering(&self, row: &Row) -> PointPartitionOrdering {
44        self.partition_boundary.ordering(row.data_point.timestamp)
45    }
46
47    fn flush(
48        &self,
49        dir_path: &Path,
50        encode_strategy: crate::EncodeStrategy,
51    ) -> Result<(), PartitionError> {
52        flush(self, dir_path, encode_strategy).map_err(|_| PartitionError::Flush)
53    }
54
55    fn boundary(&self) -> Boundary {
56        self.partition_boundary
57    }
58
59    fn clean(&self) -> Result<(), PartitionError> {
60        // Memory held by this partition gets automatically removed
61        // when the variable is dropped.
62        Ok(())
63    }
64}
65
66impl MemoryPartition {
67    pub fn new(partition_duration: i64, min_timestamp: i64) -> Self {
68        let partition_boundary = Boundary {
69            min_timestamp,
70            max_timestamp: min_timestamp + partition_duration,
71        };
72
73        Self {
74            map: dashmap::DashMap::new(),
75            partition_boundary,
76        }
77    }
78}
79
80#[derive(Debug)]
81pub struct MetricEntry {
82    pub data_points: Vec<DataPoint>,
83}
84
85impl MetricEntry {
86    pub fn new(data_point: DataPoint) -> Self {
87        Self {
88            data_points: vec![data_point],
89        }
90    }
91
92    pub fn min_timestamp(&self) -> i64 {
93        if self.data_points.is_empty() {
94            0
95        } else {
96            self.data_points[0].timestamp
97        }
98    }
99
100    pub fn max_timestamp(&self) -> i64 {
101        if self.data_points.is_empty() {
102            0
103        } else {
104            self.data_points[self.data_points.len() - 1].timestamp
105        }
106    }
107
108    pub fn select(&self, start: i64, end: i64) -> Vec<DataPoint> {
109        if self.data_points.is_empty() {
110            return vec![];
111        }
112
113        let min_timestamp = self.data_points[0].timestamp;
114        let max_timestamp = self.data_points[self.data_points.len() - 1].timestamp;
115        if min_timestamp > end || max_timestamp < start {
116            // Out of range
117            return vec![];
118        }
119
120        let start_idx = if start <= min_timestamp {
121            0
122        } else {
123            self.data_points
124                .binary_search_by(|dp| {
125                    if dp.timestamp >= start {
126                        std::cmp::Ordering::Greater
127                    } else {
128                        std::cmp::Ordering::Less
129                    }
130                })
131                .unwrap_or_else(|i| i)
132        };
133
134        let end_idx = if end >= max_timestamp {
135            self.data_points.len()
136        } else {
137            self.data_points
138                .binary_search_by(|dp| {
139                    if dp.timestamp > end {
140                        std::cmp::Ordering::Greater
141                    } else {
142                        std::cmp::Ordering::Less
143                    }
144                })
145                .unwrap_or_else(|i| i)
146        };
147
148        return self.data_points[start_idx..end_idx].to_vec();
149    }
150
151    pub fn insert(&mut self, data_point: DataPoint) {
152        match self.data_points.is_empty() {
153            true => self.data_points.push(data_point),
154            false => {
155                let max_timestamp = self.data_points[self.data_points.len() - 1].timestamp;
156                if data_point.timestamp >= max_timestamp {
157                    self.data_points.push(data_point)
158                } else {
159                    // Out-of-order insert
160                    let pos = self
161                        .data_points
162                        .binary_search_by_key(&data_point.timestamp, |d| d.timestamp)
163                        .unwrap_or_else(|i| i);
164                    self.data_points.insert(pos, data_point);
165                }
166            }
167        }
168    }
169}
170
#[cfg(test)]
pub mod tests {
    use crate::{
        metric::{DataPoint, Row},
        partition::{memory::Partition, Boundary, PartitionError},
    };

    use super::MemoryPartition;

    /// Shorthand for a data point at `timestamp` whose value is zero.
    fn zero_valued(timestamp: i64) -> DataPoint {
        DataPoint {
            timestamp,
            value: 0.0,
        }
    }

    /// Builds a partition that starts at the first row's timestamp and
    /// inserts every row into it, panicking if any insert is rejected.
    fn create_partition_with_rows(partition_duration: i64, rows: &[Row]) -> MemoryPartition {
        assert!(!rows.is_empty());
        let first_timestamp = rows[0].data_point.timestamp;
        let partition = MemoryPartition::new(partition_duration, first_timestamp);
        rows.iter()
            .for_each(|row| partition.insert(row).unwrap());
        partition
    }

    #[test]
    fn test_partition_boundary() {
        let boundaries = Boundary {
            min_timestamp: 1000,
            max_timestamp: 2000,
        };

        // Ranges that touch the boundary.
        assert!(boundaries.contains_range(1500, 1800));
        assert!(boundaries.contains_range(0, 3000));
        assert!(boundaries.contains_range(1999, 2000));

        // Ranges that lie entirely outside of it.
        assert!(!boundaries.contains_range(2000, 2001));
        assert!(!boundaries.contains_range(0, 999));
    }

    #[test]
    fn test_min_max_timestamp() {
        let boundary = MemoryPartition::new(1000, 1234).boundary();
        assert_eq!(boundary.min_timestamp(), 1234);
        assert_eq!(boundary.max_timestamp(), 2234);
    }

    #[test]
    fn test_simple_select_in_range() {
        let metric = "hello";
        let data_point = DataPoint {
            timestamp: 1234,
            value: 4.20,
        };
        let partition = MemoryPartition::new(60000, data_point.timestamp);
        partition.insert(&Row { metric, data_point }).unwrap();

        let result = partition.select(metric, 1000, 2000).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(data_point, result[0]);
    }

    #[test]
    fn test_multiple_metrics() {
        let metric_a = "hello";
        let data_point_a = DataPoint {
            timestamp: 1000,
            value: 4.20,
        };
        let metric_b = "world";
        let data_point_b = DataPoint {
            timestamp: 1001,
            value: 1.50,
        };

        let partition = MemoryPartition::new(60000, data_point_a.timestamp);
        partition
            .insert(&Row {
                metric: metric_a,
                data_point: data_point_a,
            })
            .unwrap();
        partition
            .insert(&Row {
                metric: metric_b,
                data_point: data_point_b,
            })
            .unwrap();

        // Each metric must only see its own point.
        let result = partition.select(metric_a, 1000, 2000).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(data_point_a, result[0]);

        let result = partition.select(metric_b, 1000, 2000).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(data_point_b, result[0]);
    }

    #[test]
    fn test_simple_select_out_of_range() {
        let metric = "hello";
        let data_point = DataPoint {
            timestamp: 1234,
            value: 4.20,
        };
        let partition = MemoryPartition::new(60000, data_point.timestamp);
        partition.insert(&Row { metric, data_point }).unwrap();

        let result = partition.select(metric, 0, 1000).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_select_boundaries() {
        let metric = "hello";
        let data_points = [
            zero_valued(100),
            zero_valued(200),
            DataPoint {
                timestamp: 200,
                value: 1.0,
            },
            zero_valued(300),
            zero_valued(400),
            DataPoint {
                timestamp: 400,
                value: 1.0,
            },
            DataPoint {
                timestamp: 400,
                value: 2.0,
            },
            zero_valued(500),
        ];
        let rows: Vec<Row> = data_points
            .iter()
            .copied()
            .map(|data_point| Row { metric, data_point })
            .collect();

        let partition = create_partition_with_rows(60000, &rows);

        // Both range ends are inclusive, so every 200 and every 400 is kept.
        let result = partition.select(metric, 200, 400).unwrap();
        assert_eq!(result, data_points[1..7]);
    }

    #[test]
    fn test_complex_select() {
        let metric = "hello";
        let data_points = [
            zero_valued(100),
            zero_valued(200),
            DataPoint {
                timestamp: 200,
                value: 1.0,
            },
            zero_valued(300),
        ];
        let rows: Vec<Row> = data_points
            .iter()
            .copied()
            .map(|data_point| Row { metric, data_point })
            .collect();

        let partition = create_partition_with_rows(60000, &rows);

        let result = partition.select(metric, 101, 300).unwrap();
        assert_eq!(result, data_points[1..]);
    }

    #[test]
    fn test_past_writes() {
        let metric = "hello";
        let data_points = [
            // Sets the timestamp baseline for the partition.
            DataPoint {
                timestamp: 200,
                value: 2.0,
            },
            // Should belong to a previous partition.
            DataPoint {
                timestamp: 100,
                value: 1.0,
            },
            DataPoint {
                timestamp: 300,
                value: 3.0,
            },
        ];
        let rows: Vec<Row> = data_points
            .iter()
            .copied()
            .map(|data_point| Row { metric, data_point })
            .collect();

        let partition = MemoryPartition::new(60000, rows[0].data_point.timestamp);
        partition.insert(&rows[0]).unwrap();
        assert_eq!(
            partition.insert(&rows[1]),
            Err(PartitionError::OutOfBounds)
        );
        partition.insert(&rows[2]).unwrap();

        // The rejected point must not show up in the result.
        let result = partition.select(metric, 0, 1000).unwrap();
        assert_eq!(result, vec![data_points[0], data_points[2]]);
    }

    #[test]
    fn test_out_of_order_writes() {
        let metric = "hello";
        // The third point arrives out of order.
        let mut data_points = [
            zero_valued(100),
            zero_valued(300),
            zero_valued(200),
            zero_valued(400),
        ];
        let rows: Vec<Row> = data_points
            .iter()
            .copied()
            .map(|data_point| Row { metric, data_point })
            .collect();

        let partition = create_partition_with_rows(60000, &rows);
        data_points.sort_by_key(|d| d.timestamp);

        let result = partition.select(metric, 0, 1000).unwrap();
        assert_eq!(result, data_points);
    }
}