1use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use reifydb_core::interface::catalog::series::{Series, SeriesKey, SeriesMetadata, TimestampPrecision};
7
8#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
9pub struct BucketId(pub u64);
10
11#[derive(Clone, Copy, Debug, PartialEq, Eq)]
15pub struct Bucket {
16 pub start: u64,
17 pub end: u64,
18 pub width: u64,
19}
20
21impl Bucket {
22 pub fn id(&self) -> BucketId {
26 BucketId(self.start)
27 }
28
29 pub fn contains(&self, key: u64) -> bool {
30 key >= self.start && key < self.end
31 }
32
33 pub fn len(&self) -> u64 {
34 self.end - self.start
35 }
36
37 pub fn is_empty(&self) -> bool {
38 self.end == self.start
39 }
40}
41
42pub fn bucket_for(key: u64, width: u64) -> Bucket {
46 assert!(width > 0, "bucket_for: width must be > 0");
47 let start = (key / width) * width;
48 Bucket {
49 start,
50 end: start + width,
51 width,
52 }
53}
54
55pub fn is_closed(
62 bucket: &Bucket,
63 series: &Series,
64 metadata: &SeriesMetadata,
65 now: SystemTime,
66 grace: Duration,
67) -> bool {
68 match &series.key {
69 SeriesKey::DateTime {
70 precision,
71 ..
72 } => {
73 let bucket_end_wall = to_systemtime(bucket.end, *precision);
74 now.duration_since(bucket_end_wall).map(|d| d > grace).unwrap_or(false)
75 }
76 SeriesKey::Integer {
77 ..
78 } => metadata.newest_key >= bucket.end,
79 }
80}
81
82fn to_systemtime(key: u64, precision: TimestampPrecision) -> SystemTime {
83 let nanos: u128 = match precision {
84 TimestampPrecision::Second => (key as u128) * 1_000_000_000,
85 TimestampPrecision::Millisecond => (key as u128) * 1_000_000,
86 TimestampPrecision::Microsecond => (key as u128) * 1_000,
87 TimestampPrecision::Nanosecond => key as u128,
88 };
89 UNIX_EPOCH + Duration::from_nanos(nanos as u64)
90}
91
92#[cfg(test)]
93mod tests {
94 use reifydb_core::interface::catalog::id::{NamespaceId, SeriesId};
95
96 use super::*;
97
98 #[test]
99 fn bucket_for_aligns_to_width() {
100 let b = bucket_for(137, 100);
101 assert_eq!(b.start, 100);
102 assert_eq!(b.end, 200);
103 assert_eq!(b.width, 100);
104 assert!(b.contains(137));
105 assert!(!b.contains(200));
106 assert_eq!(b.id(), BucketId(100));
107 }
108
109 fn series_with(key: SeriesKey) -> Series {
110 Series {
111 id: SeriesId(1),
112 namespace: NamespaceId(1),
113 name: "s".into(),
114 columns: vec![],
115 tag: None,
116 key,
117 primary_key: None,
118 underlying: false,
119 }
120 }
121
122 #[test]
123 fn integer_bucket_closed_when_newest_key_advances() {
124 let s = series_with(SeriesKey::Integer {
125 column: "k".into(),
126 });
127 let b = Bucket {
128 start: 0,
129 end: 100,
130 width: 100,
131 };
132 let mut meta = SeriesMetadata::new(s.id);
133 meta.newest_key = 99;
134 assert!(!is_closed(&b, &s, &meta, SystemTime::now(), Duration::ZERO));
135 meta.newest_key = 100;
136 assert!(is_closed(&b, &s, &meta, SystemTime::now(), Duration::ZERO));
137 }
138
139 #[test]
140 fn datetime_bucket_closes_after_grace_elapses() {
141 let s = series_with(SeriesKey::DateTime {
142 column: "ts".into(),
143 precision: TimestampPrecision::Millisecond,
144 });
145 let b = Bucket {
147 start: 0,
148 end: 1000,
149 width: 1000,
150 };
151 let meta = SeriesMetadata::new(s.id);
152 let bucket_end = UNIX_EPOCH + Duration::from_millis(1000);
153 assert!(!is_closed(&b, &s, &meta, bucket_end, Duration::from_millis(100)));
154 let past_grace = bucket_end + Duration::from_millis(250);
155 assert!(is_closed(&b, &s, &meta, past_grace, Duration::from_millis(100)));
156 }
157}