1use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use reifydb_core::interface::catalog::series::{Series, SeriesKey, SeriesMetadata, TimestampPrecision};
7
/// Identifier of a bucket.
///
/// Wraps a single `u64`; `Bucket::id` fills it with the bucket's `start` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BucketId(pub u64);
10
/// A half-open key range `[start, end)`.
///
/// `bucket_for` builds buckets whose `start` is a multiple of `width`. All
/// fields are public, so values constructed by hand carry no such guarantee.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Bucket {
    /// Inclusive lower bound of the range.
    pub start: u64,
    /// Exclusive upper bound of the range.
    pub end: u64,
    /// Bucket width; `bucket_for` stores the width it was called with.
    pub width: u64,
}
17
18impl Bucket {
19 pub fn id(&self) -> BucketId {
20 BucketId(self.start)
21 }
22
23 pub fn contains(&self, key: u64) -> bool {
24 key >= self.start && key < self.end
25 }
26
27 pub fn len(&self) -> u64 {
28 self.end - self.start
29 }
30
31 pub fn is_empty(&self) -> bool {
32 self.end == self.start
33 }
34}
35
36pub fn bucket_for(key: u64, width: u64) -> Bucket {
37 assert!(width > 0, "bucket_for: width must be > 0");
38 let start = (key / width) * width;
39 Bucket {
40 start,
41 end: start + width,
42 width,
43 }
44}
45
46pub fn is_closed(
47 bucket: &Bucket,
48 series: &Series,
49 metadata: &SeriesMetadata,
50 now: SystemTime,
51 grace: Duration,
52) -> bool {
53 match &series.key {
54 SeriesKey::DateTime {
55 precision,
56 ..
57 } => {
58 let bucket_end_wall = to_systemtime(bucket.end, *precision);
59 now.duration_since(bucket_end_wall).map(|d| d > grace).unwrap_or(false)
60 }
61 SeriesKey::Integer {
62 ..
63 } => metadata.newest_key >= bucket.end,
64 }
65}
66
67fn to_systemtime(key: u64, precision: TimestampPrecision) -> SystemTime {
68 let nanos: u128 = match precision {
69 TimestampPrecision::Second => (key as u128) * 1_000_000_000,
70 TimestampPrecision::Millisecond => (key as u128) * 1_000_000,
71 TimestampPrecision::Microsecond => (key as u128) * 1_000,
72 TimestampPrecision::Nanosecond => key as u128,
73 };
74 UNIX_EPOCH + Duration::from_nanos(nanos as u64)
75}
76
#[cfg(test)]
mod tests {
    use reifydb_core::interface::catalog::id::{NamespaceId, SeriesId};

    use super::*;

    /// Builds a series around `key`; every other field gets a fixed filler
    /// value.
    fn series_with(key: SeriesKey) -> Series {
        Series {
            id: SeriesId(1),
            namespace: NamespaceId(1),
            name: "s".into(),
            columns: vec![],
            tag: None,
            key,
            primary_key: None,
            underlying: false,
        }
    }

    #[test]
    fn bucket_for_aligns_to_width() {
        let bucket = bucket_for(137, 100);

        assert_eq!(bucket.start, 100);
        assert_eq!(bucket.end, 200);
        assert_eq!(bucket.width, 100);

        // The end of the range is exclusive.
        assert!(bucket.contains(137));
        assert!(!bucket.contains(200));

        assert_eq!(bucket.id(), BucketId(100));
    }

    #[test]
    fn integer_bucket_closed_when_newest_key_advances() {
        let series = series_with(SeriesKey::Integer {
            column: "k".into(),
        });
        let bucket = Bucket {
            start: 0,
            end: 100,
            width: 100,
        };
        let mut metadata = SeriesMetadata::new(series.id);

        // Newest key is still inside the bucket: open.
        metadata.newest_key = 99;
        assert!(!is_closed(
            &bucket,
            &series,
            &metadata,
            SystemTime::now(),
            Duration::ZERO
        ));

        // Newest key has reached the exclusive end: closed.
        metadata.newest_key = 100;
        assert!(is_closed(
            &bucket,
            &series,
            &metadata,
            SystemTime::now(),
            Duration::ZERO
        ));
    }

    #[test]
    fn datetime_bucket_closes_after_grace_elapses() {
        let series = series_with(SeriesKey::DateTime {
            column: "ts".into(),
            precision: TimestampPrecision::Millisecond,
        });
        let bucket = Bucket {
            start: 0,
            end: 1000,
            width: 1000,
        };
        let metadata = SeriesMetadata::new(series.id);
        let grace = Duration::from_millis(100);

        // Exactly at the bucket end no grace time has elapsed yet.
        let at_bucket_end = UNIX_EPOCH + Duration::from_millis(1000);
        assert!(!is_closed(&bucket, &series, &metadata, at_bucket_end, grace));

        // 250 ms after the bucket end exceeds the 100 ms grace period.
        let after_grace = at_bucket_end + Duration::from_millis(250);
        assert!(is_closed(&bucket, &series, &metadata, after_grace, grace));
    }
}