Skip to main content

reifydb_column/
bucket.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use reifydb_core::interface::catalog::series::{Series, SeriesKey, SeriesMetadata, TimestampPrecision};
7
/// Identifier of a [`Bucket`] inside a single series' bucket map.
///
/// The wrapped value is the bucket's `start` key (see `Bucket::id`).
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct BucketId(pub u64);
10
/// A half-open interval `[start, end)` over the series-key domain.
///
/// Keys are encoded as `u64` by `Series::key_to_u64`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Bucket {
	/// Inclusive lower bound of the interval.
	pub start: u64,
	/// Exclusive upper bound of the interval.
	pub end: u64,
	/// Bucket width; normally equal to `end - start`, cached so bucket
	/// arithmetic stays O(1).
	pub width: u64,
}
20
21impl Bucket {
22	// Stable id for map lookup - the bucket's start, which is unique within a
23	// given `(series_id, width)` combination. The registry scopes buckets by
24	// `SeriesId`, so the start alone is enough to identify.
25	pub fn id(&self) -> BucketId {
26		BucketId(self.start)
27	}
28
29	pub fn contains(&self, key: u64) -> bool {
30		key >= self.start && key < self.end
31	}
32
33	pub fn len(&self) -> u64 {
34		self.end - self.start
35	}
36
37	pub fn is_empty(&self) -> bool {
38		self.end == self.start
39	}
40}
41
42// Compute the bucket that contains `key` given a fixed bucket `width`. Buckets
43// are aligned to multiples of `width`, i.e. `bucket.start = (key / width) * width`.
44// Panics if `width == 0`.
45pub fn bucket_for(key: u64, width: u64) -> Bucket {
46	assert!(width > 0, "bucket_for: width must be > 0");
47	let start = (key / width) * width;
48	Bucket {
49		start,
50		end: start + width,
51		width,
52	}
53}
54
55// A bucket is closed when its end has been passed - no more rows should land
56// in it. Rules differ per series-key kind:
57// - `DateTime`: bucket closed when `now - bucket_end_wall > grace`.
58// - `Integer`: bucket closed when `metadata.newest_key >= bucket.end`; the `grace` parameter is ignored because integer
59//   keys don't have a natural wall-clock correspondence. Late-arrival re-materialization is handled upstream by
60//   comparing `sequence_counter` across ticks.
61pub fn is_closed(
62	bucket: &Bucket,
63	series: &Series,
64	metadata: &SeriesMetadata,
65	now: SystemTime,
66	grace: Duration,
67) -> bool {
68	match &series.key {
69		SeriesKey::DateTime {
70			precision,
71			..
72		} => {
73			let bucket_end_wall = to_systemtime(bucket.end, *precision);
74			now.duration_since(bucket_end_wall).map(|d| d > grace).unwrap_or(false)
75		}
76		SeriesKey::Integer {
77			..
78		} => metadata.newest_key >= bucket.end,
79	}
80}
81
82fn to_systemtime(key: u64, precision: TimestampPrecision) -> SystemTime {
83	let nanos: u128 = match precision {
84		TimestampPrecision::Second => (key as u128) * 1_000_000_000,
85		TimestampPrecision::Millisecond => (key as u128) * 1_000_000,
86		TimestampPrecision::Microsecond => (key as u128) * 1_000,
87		TimestampPrecision::Nanosecond => key as u128,
88	};
89	UNIX_EPOCH + Duration::from_nanos(nanos as u64)
90}
91
#[cfg(test)]
mod tests {
	use reifydb_core::interface::catalog::id::{NamespaceId, SeriesId};

	use super::*;

	// Builds a minimal series with the given key definition; other fields are placeholders.
	fn series_with(key: SeriesKey) -> Series {
		Series {
			id: SeriesId(1),
			namespace: NamespaceId(1),
			name: "s".into(),
			columns: vec![],
			tag: None,
			key,
			primary_key: None,
			underlying: false,
		}
	}

	// Millisecond-precision datetime series shared by the datetime tests.
	fn datetime_ms_series() -> Series {
		series_with(SeriesKey::DateTime {
			column: "ts".into(),
			precision: TimestampPrecision::Millisecond,
		})
	}

	#[test]
	fn bucket_for_aligns_to_width() {
		let b = bucket_for(137, 100);
		assert_eq!(b.start, 100);
		assert_eq!(b.end, 200);
		assert_eq!(b.width, 100);
		assert!(b.contains(137));
		assert!(!b.contains(200));
		assert_eq!(b.id(), BucketId(100));
	}

	#[test]
	fn bucket_len_and_is_empty() {
		let b = bucket_for(0, 10);
		assert_eq!(b.len(), 10);
		assert!(!b.is_empty());
		let empty = Bucket {
			start: 5,
			end: 5,
			width: 0,
		};
		assert_eq!(empty.len(), 0);
		assert!(empty.is_empty());
		assert!(!empty.contains(5));
	}

	#[test]
	fn integer_bucket_closed_when_newest_key_advances() {
		let s = series_with(SeriesKey::Integer {
			column: "k".into(),
		});
		let b = Bucket {
			start: 0,
			end: 100,
			width: 100,
		};
		let mut meta = SeriesMetadata::new(s.id);
		meta.newest_key = 99;
		assert!(!is_closed(&b, &s, &meta, SystemTime::now(), Duration::ZERO));
		meta.newest_key = 100;
		assert!(is_closed(&b, &s, &meta, SystemTime::now(), Duration::ZERO));
	}

	#[test]
	fn datetime_bucket_closes_after_grace_elapses() {
		let s = datetime_ms_series();
		// bucket.end at 1000ms past epoch
		let b = Bucket {
			start: 0,
			end: 1000,
			width: 1000,
		};
		let meta = SeriesMetadata::new(s.id);
		let bucket_end = UNIX_EPOCH + Duration::from_millis(1000);
		assert!(!is_closed(&b, &s, &meta, bucket_end, Duration::from_millis(100)));
		let past_grace = bucket_end + Duration::from_millis(250);
		assert!(is_closed(&b, &s, &meta, past_grace, Duration::from_millis(100)));
	}

	#[test]
	fn datetime_bucket_open_before_end() {
		// `now` precedes bucket.end, so `duration_since` errors and the bucket stays open.
		let s = datetime_ms_series();
		let b = Bucket {
			start: 0,
			end: 1000,
			width: 1000,
		};
		let meta = SeriesMetadata::new(s.id);
		let before_end = UNIX_EPOCH + Duration::from_millis(500);
		assert!(!is_closed(&b, &s, &meta, before_end, Duration::ZERO));
	}

	#[test]
	fn datetime_grace_boundary_is_exclusive() {
		// Closing requires strictly more than `grace` to have elapsed past bucket.end.
		let s = datetime_ms_series();
		let b = Bucket {
			start: 0,
			end: 1000,
			width: 1000,
		};
		let meta = SeriesMetadata::new(s.id);
		let grace = Duration::from_millis(100);
		let at_grace = UNIX_EPOCH + Duration::from_millis(1100);
		assert!(!is_closed(&b, &s, &meta, at_grace, grace));
		let just_past = UNIX_EPOCH + Duration::from_millis(1101);
		assert!(is_closed(&b, &s, &meta, just_past, grace));
	}
}