1use crate::metric::{DataPoint, Row};
2use anyhow::Result;
3use std::path::Path;
4
5use super::{disk::flush, Boundary, Partition, PartitionError, PointPartitionOrdering};
6
7#[derive(Debug)]
8pub struct MemoryPartition {
9 pub map: dashmap::DashMap<String, MetricEntry>,
10 partition_boundary: Boundary,
11}
12
13impl Partition for MemoryPartition {
14 fn select(&self, name: &str, start: i64, end: i64) -> Result<Vec<DataPoint>> {
15 Ok(match self.partition_boundary.contains_range(start, end) {
16 true => match self.map.get(name) {
17 Some(entry) => entry.select(start, end),
18 None => vec![],
19 },
20 false => vec![],
21 })
22 }
23
24 fn insert(&self, row: &Row) -> Result<(), PartitionError> {
25 if !self
26 .partition_boundary
27 .contains_point(row.data_point.timestamp)
28 {
29 return Err(PartitionError::OutOfBounds);
30 }
31 match self.map.get_mut(row.metric) {
32 Some(mut m) => {
33 m.insert(row.data_point);
34 }
35 None => {
36 self.map
37 .insert(row.metric.to_string(), MetricEntry::new(row.data_point));
38 }
39 };
40 Ok(())
41 }
42
43 fn ordering(&self, row: &Row) -> PointPartitionOrdering {
44 self.partition_boundary.ordering(row.data_point.timestamp)
45 }
46
47 fn flush(
48 &self,
49 dir_path: &Path,
50 encode_strategy: crate::EncodeStrategy,
51 ) -> Result<(), PartitionError> {
52 flush(self, dir_path, encode_strategy).map_err(|_| PartitionError::Flush)
53 }
54
55 fn boundary(&self) -> Boundary {
56 self.partition_boundary
57 }
58
59 fn clean(&self) -> Result<(), PartitionError> {
60 Ok(())
63 }
64}
65
66impl MemoryPartition {
67 pub fn new(partition_duration: i64, min_timestamp: i64) -> Self {
68 let partition_boundary = Boundary {
69 min_timestamp,
70 max_timestamp: min_timestamp + partition_duration,
71 };
72
73 Self {
74 map: dashmap::DashMap::new(),
75 partition_boundary,
76 }
77 }
78}
79
80#[derive(Debug)]
81pub struct MetricEntry {
82 pub data_points: Vec<DataPoint>,
83}
84
85impl MetricEntry {
86 pub fn new(data_point: DataPoint) -> Self {
87 Self {
88 data_points: vec![data_point],
89 }
90 }
91
92 pub fn min_timestamp(&self) -> i64 {
93 if self.data_points.is_empty() {
94 0
95 } else {
96 self.data_points[0].timestamp
97 }
98 }
99
100 pub fn max_timestamp(&self) -> i64 {
101 if self.data_points.is_empty() {
102 0
103 } else {
104 self.data_points[self.data_points.len() - 1].timestamp
105 }
106 }
107
108 pub fn select(&self, start: i64, end: i64) -> Vec<DataPoint> {
109 if self.data_points.is_empty() {
110 return vec![];
111 }
112
113 let min_timestamp = self.data_points[0].timestamp;
114 let max_timestamp = self.data_points[self.data_points.len() - 1].timestamp;
115 if min_timestamp > end || max_timestamp < start {
116 return vec![];
118 }
119
120 let start_idx = if start <= min_timestamp {
121 0
122 } else {
123 self.data_points
124 .binary_search_by(|dp| {
125 if dp.timestamp >= start {
126 std::cmp::Ordering::Greater
127 } else {
128 std::cmp::Ordering::Less
129 }
130 })
131 .unwrap_or_else(|i| i)
132 };
133
134 let end_idx = if end >= max_timestamp {
135 self.data_points.len()
136 } else {
137 self.data_points
138 .binary_search_by(|dp| {
139 if dp.timestamp > end {
140 std::cmp::Ordering::Greater
141 } else {
142 std::cmp::Ordering::Less
143 }
144 })
145 .unwrap_or_else(|i| i)
146 };
147
148 return self.data_points[start_idx..end_idx].to_vec();
149 }
150
151 pub fn insert(&mut self, data_point: DataPoint) {
152 match self.data_points.is_empty() {
153 true => self.data_points.push(data_point),
154 false => {
155 let max_timestamp = self.data_points[self.data_points.len() - 1].timestamp;
156 if data_point.timestamp >= max_timestamp {
157 self.data_points.push(data_point)
158 } else {
159 let pos = self
161 .data_points
162 .binary_search_by_key(&data_point.timestamp, |d| d.timestamp)
163 .unwrap_or_else(|i| i);
164 self.data_points.insert(pos, data_point);
165 }
166 }
167 }
168 }
169}
170
171#[cfg(test)]
172pub mod tests {
173 use crate::{
174 metric::{DataPoint, Row},
175 partition::{memory::Partition, Boundary, PartitionError},
176 };
177
178 use super::MemoryPartition;
179
180 fn create_partition_with_rows(partition_duration: i64, rows: &[Row]) -> MemoryPartition {
181 assert!(!rows.is_empty());
182 let partition = MemoryPartition::new(partition_duration, rows[0].data_point.timestamp);
183 for row in rows {
184 partition.insert(row).unwrap();
185 }
186 partition
187 }
188
189 #[test]
190 fn test_partition_boundary() {
191 let boundaries = Boundary {
192 min_timestamp: 1000,
193 max_timestamp: 2000,
194 };
195 assert!(boundaries.contains_range(1500, 1800));
196 assert!(boundaries.contains_range(0, 3000));
197 assert!(boundaries.contains_range(1999, 2000));
198
199 assert!(!boundaries.contains_range(2000, 2001));
200 assert!(!boundaries.contains_range(0, 999));
201 }
202
203 #[test]
204 fn test_min_max_timestamp() {
205 let partition = MemoryPartition::new(1000, 1234);
206 assert_eq!(partition.boundary().min_timestamp(), 1234);
207 assert_eq!(partition.boundary().max_timestamp(), 2234);
208 }
209
210 #[test]
211 fn test_simple_select_in_range() {
212 let metric = "hello";
213 let data_point = DataPoint {
214 timestamp: 1234,
215 value: 4.20,
216 };
217 let row = Row { metric, data_point };
218 let partition = MemoryPartition::new(60000, data_point.timestamp);
219 partition.insert(&row).unwrap();
220 let result = partition.select(metric, 1000, 2000).unwrap();
221 assert_eq!(result.len(), 1);
222 assert_eq!(data_point, result[0]);
223 }
224
225 #[test]
226 fn test_multiple_metrics() {
227 let metric_a = "hello";
228 let data_point_a = DataPoint {
229 timestamp: 1000,
230 value: 4.20,
231 };
232 let row_a = &Row {
233 metric: metric_a,
234 data_point: data_point_a,
235 };
236
237 let metric_b = "world";
238 let data_point_b = DataPoint {
239 timestamp: 1001,
240 value: 1.50,
241 };
242 let row_b = &Row {
243 metric: metric_b,
244 data_point: data_point_b,
245 };
246
247 let partition = MemoryPartition::new(60000, row_a.data_point.timestamp);
248 partition.insert(row_a).unwrap();
249 partition.insert(row_b).unwrap();
250
251 let result = partition.select(metric_a, 1000, 2000).unwrap();
252 assert_eq!(result.len(), 1);
253 assert_eq!(data_point_a, result[0]);
254
255 let result = partition.select(metric_b, 1000, 2000).unwrap();
256 assert_eq!(result.len(), 1);
257 assert_eq!(data_point_b, result[0]);
258 }
259
260 #[test]
261 fn test_simple_select_out_of_range() {
262 let metric = "hello";
263 let row = Row {
264 metric,
265 data_point: DataPoint {
266 timestamp: 1234,
267 value: 4.20,
268 },
269 };
270 let partition = MemoryPartition::new(60000, row.data_point.timestamp);
271 partition.insert(&row).unwrap();
272 let result = partition.select(metric, 0, 1000).unwrap();
273 assert_eq!(result.len(), 0);
274 }
275
276 #[test]
277 fn test_select_boundaries() {
278 let metric = "hello";
279 let data_points = [
280 DataPoint {
281 timestamp: 100,
282 value: 0.0,
283 },
284 DataPoint {
285 timestamp: 200,
286 value: 0.0,
287 },
288 DataPoint {
289 timestamp: 200,
290 value: 1.0,
291 },
292 DataPoint {
293 timestamp: 300,
294 value: 0.0,
295 },
296 DataPoint {
297 timestamp: 400,
298 value: 0.0,
299 },
300 DataPoint {
301 timestamp: 400,
302 value: 1.0,
303 },
304 DataPoint {
305 timestamp: 400,
306 value: 2.0,
307 },
308 DataPoint {
309 timestamp: 500,
310 value: 0.0,
311 },
312 ];
313 let rows: Vec<Row> = data_points
314 .iter()
315 .map(|dp| Row {
316 metric,
317 data_point: *dp,
318 })
319 .collect();
320
321 let partition = create_partition_with_rows(60000, &rows);
322
323 let result = partition.select(metric, 200, 400).unwrap();
324 assert_eq!(result, data_points[1..7]);
325 }
326
327 #[test]
328 fn test_complex_select() {
329 let metric = "hello";
330 let data_points = [
331 DataPoint {
332 timestamp: 100,
333 value: 0.0,
334 },
335 DataPoint {
336 timestamp: 200,
337 value: 0.0,
338 },
339 DataPoint {
340 timestamp: 200,
341 value: 1.0,
342 },
343 DataPoint {
344 timestamp: 300,
345 value: 0.0,
346 },
347 ];
348
349 let rows: Vec<Row> = data_points
350 .iter()
351 .map(|dp| Row {
352 metric,
353 data_point: *dp,
354 })
355 .collect();
356
357 let partition = create_partition_with_rows(60000, &rows);
358
359 let result = partition.select(metric, 101, 300).unwrap();
360 assert_eq!(result, data_points[1..]);
361 }
362
363 #[test]
364 fn test_past_writes() {
365 let metric = "hello";
366 let data_points = [
367 DataPoint {
368 timestamp: 200, value: 2.0,
370 },
371 DataPoint {
372 timestamp: 100, value: 1.0,
374 },
375 DataPoint {
376 timestamp: 300,
377 value: 3.0,
378 },
379 ];
380
381 let rows: Vec<Row> = data_points
382 .iter()
383 .map(|dp| Row {
384 metric,
385 data_point: *dp,
386 })
387 .collect();
388
389 let partition = MemoryPartition::new(60000, rows[0].data_point.timestamp);
390 partition.insert(&rows[0]).unwrap();
391 assert_eq!(
392 partition.insert(&rows[1]).err(),
393 Some(PartitionError::OutOfBounds)
394 );
395 partition.insert(&rows[2]).unwrap();
396
397 let result = partition.select(metric, 0, 1000).unwrap();
398 assert_eq!(result, vec![data_points[0], data_points[2]]);
399 }
400
401 #[test]
402 fn test_out_of_order_writes() {
403 let metric = "hello";
404 let mut data_points = [
405 DataPoint {
406 timestamp: 100,
407 value: 0.0,
408 },
409 DataPoint {
410 timestamp: 300,
411 value: 0.0,
412 },
413 DataPoint {
414 timestamp: 200, value: 0.0,
416 },
417 DataPoint {
418 timestamp: 400,
419 value: 0.0,
420 },
421 ];
422
423 let rows: Vec<Row> = data_points
424 .iter()
425 .map(|dp| Row {
426 metric,
427 data_point: *dp,
428 })
429 .collect();
430
431 let partition = create_partition_with_rows(60000, &rows);
432 data_points.sort_by_key(|d| d.timestamp);
433
434 let result = partition.select(metric, 0, 1000).unwrap();
435 assert_eq!(result, data_points);
436 }
437}