Skip to main content

reifydb_column/
bucket.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use reifydb_core::interface::catalog::series::{Series, SeriesKey, SeriesMetadata, TimestampPrecision};
7
/// Identifier of a bucket.
///
/// [`Bucket::id`] builds this from the bucket's `start` key, so identifiers
/// order the same way the bucket start keys do.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BucketId(pub u64);
10
/// A half-open range of keys, `[start, end)`, as tested by [`Bucket::contains`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Bucket {
	/// Inclusive lower bound of the range.
	pub start: u64,
	/// Exclusive upper bound of the range.
	pub end: u64,
	/// Width the bucket was created with; `bucket_for` aligns `start` to a
	/// multiple of this value.
	pub width: u64,
}
17
18impl Bucket {
19	pub fn id(&self) -> BucketId {
20		BucketId(self.start)
21	}
22
23	pub fn contains(&self, key: u64) -> bool {
24		key >= self.start && key < self.end
25	}
26
27	pub fn len(&self) -> u64 {
28		self.end - self.start
29	}
30
31	pub fn is_empty(&self) -> bool {
32		self.end == self.start
33	}
34}
35
36pub fn bucket_for(key: u64, width: u64) -> Bucket {
37	assert!(width > 0, "bucket_for: width must be > 0");
38	let start = (key / width) * width;
39	Bucket {
40		start,
41		end: start + width,
42		width,
43	}
44}
45
46pub fn is_closed(
47	bucket: &Bucket,
48	series: &Series,
49	metadata: &SeriesMetadata,
50	now: SystemTime,
51	grace: Duration,
52) -> bool {
53	match &series.key {
54		SeriesKey::DateTime {
55			precision,
56			..
57		} => {
58			let bucket_end_wall = to_systemtime(bucket.end, *precision);
59			now.duration_since(bucket_end_wall).map(|d| d > grace).unwrap_or(false)
60		}
61		SeriesKey::Integer {
62			..
63		} => metadata.newest_key >= bucket.end,
64	}
65}
66
67fn to_systemtime(key: u64, precision: TimestampPrecision) -> SystemTime {
68	let nanos: u128 = match precision {
69		TimestampPrecision::Second => (key as u128) * 1_000_000_000,
70		TimestampPrecision::Millisecond => (key as u128) * 1_000_000,
71		TimestampPrecision::Microsecond => (key as u128) * 1_000,
72		TimestampPrecision::Nanosecond => key as u128,
73	};
74	UNIX_EPOCH + Duration::from_nanos(nanos as u64)
75}
76
#[cfg(test)]
mod tests {
	use reifydb_core::interface::catalog::id::{NamespaceId, SeriesId};

	use super::*;

	/// Builds a bare series fixture around the given key definition.
	fn series_with(key: SeriesKey) -> Series {
		Series {
			id: SeriesId(1),
			namespace: NamespaceId(1),
			name: "s".into(),
			columns: vec![],
			tag: None,
			key,
			primary_key: None,
			underlying: false,
		}
	}

	#[test]
	fn bucket_for_aligns_to_width() {
		let bucket = bucket_for(137, 100);
		let expected = Bucket {
			start: 100,
			end: 200,
			width: 100,
		};
		assert_eq!(bucket, expected);
		assert_eq!(bucket.id(), BucketId(100));
		// the range is half-open: the key itself is inside, `end` is not
		assert!(bucket.contains(137));
		assert!(!bucket.contains(200));
	}

	#[test]
	fn integer_bucket_closed_when_newest_key_advances() {
		let series = series_with(SeriesKey::Integer {
			column: "k".into(),
		});
		let bucket = Bucket {
			start: 0,
			end: 100,
			width: 100,
		};
		// evaluates `is_closed` for metadata reporting the given newest key
		let closed_with = |newest_key: u64| {
			let mut metadata = SeriesMetadata::new(series.id);
			metadata.newest_key = newest_key;
			is_closed(&bucket, &series, &metadata, SystemTime::now(), Duration::ZERO)
		};

		// one key short of the bucket end keeps the bucket open
		assert!(!closed_with(99));
		// reaching the bucket end closes it
		assert!(closed_with(100));
	}

	#[test]
	fn datetime_bucket_closes_after_grace_elapses() {
		let series = series_with(SeriesKey::DateTime {
			column: "ts".into(),
			precision: TimestampPrecision::Millisecond,
		});
		// with millisecond precision this bucket ends 1000ms after the epoch
		let bucket = Bucket {
			start: 0,
			end: 1000,
			width: 1000,
		};
		let metadata = SeriesMetadata::new(series.id);
		let grace = Duration::from_millis(100);
		let bucket_end = UNIX_EPOCH + Duration::from_millis(1000);

		// exactly at the bucket end none of the grace period has elapsed
		assert!(!is_closed(&bucket, &series, &metadata, bucket_end, grace));

		// 250ms later the 100ms grace period is over
		let past_grace = bucket_end + Duration::from_millis(250);
		assert!(is_closed(&bucket, &series, &metadata, past_grace, grace));
	}
}