Skip to main content

reifydb_core/key/
subscription_row.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::Bound;
5
6use reifydb_type::value::row_number::RowNumber;
7
8use super::{EncodableKey, EncodableKeyRange, KeyKind};
9use crate::{
10	encoded::key::{EncodedKey, EncodedKeyRange},
11	interface::catalog::id::SubscriptionId,
12	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
13};
14
15const VERSION: u8 = 1;
16
17#[derive(Debug, Clone, PartialEq)]
18pub struct SubscriptionRowKey {
19	pub subscription: SubscriptionId,
20	pub row: RowNumber,
21}
22
23impl EncodableKey for SubscriptionRowKey {
24	const KIND: KeyKind = KeyKind::SubscriptionRow;
25
26	fn encode(&self) -> EncodedKey {
27		let mut serializer = KeySerializer::with_capacity(18); // 1 + 1 + 8 (subscription u64) + 8 (row)
28		serializer
29			.extend_u8(VERSION)
30			.extend_u8(Self::KIND as u8)
31			.extend_u64(self.subscription.0)
32			.extend_u64(self.row.0);
33		serializer.to_encoded_key()
34	}
35
36	fn decode(key: &EncodedKey) -> Option<Self> {
37		let mut de = KeyDeserializer::from_bytes(key.as_slice());
38
39		let version = de.read_u8().ok()?;
40		if version != VERSION {
41			return None;
42		}
43
44		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
45		if kind != Self::KIND {
46			return None;
47		}
48
49		let subscription_id = de.read_u64().ok()?;
50		let subscription = SubscriptionId(subscription_id);
51		let row = de.read_row_number().ok()?;
52
53		Some(Self {
54			subscription,
55			row,
56		})
57	}
58}
59
60#[derive(Debug, Clone, PartialEq)]
61pub struct SubscriptionRowKeyRange {
62	pub subscription: SubscriptionId,
63}
64
65impl SubscriptionRowKeyRange {
66	fn decode_key(key: &EncodedKey) -> Option<Self> {
67		let mut de = KeyDeserializer::from_bytes(key.as_slice());
68
69		let version = de.read_u8().ok()?;
70		if version != VERSION {
71			return None;
72		}
73
74		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
75		if kind != Self::KIND {
76			return None;
77		}
78
79		let subscription_id = de.read_u64().ok()?;
80		let subscription = SubscriptionId(subscription_id);
81
82		Some(SubscriptionRowKeyRange {
83			subscription,
84		})
85	}
86
87	/// Create a range for scanning rows from a subscription
88	///
89	/// If `last_key` is provided, creates a range that continues from after that key.
90	/// Otherwise, creates a range that includes all rows for the subscription.
91	///
92	/// The caller is responsible for limiting the number of results returned.
93	pub fn scan_range(subscription: SubscriptionId, last_key: Option<&EncodedKey>) -> EncodedKeyRange {
94		let range = SubscriptionRowKeyRange {
95			subscription,
96		};
97
98		if let Some(last_key) = last_key {
99			EncodedKeyRange::new(Bound::Excluded(last_key.clone()), Bound::Included(range.end().unwrap()))
100		} else {
101			EncodedKeyRange::new(
102				Bound::Included(range.start().unwrap()),
103				Bound::Included(range.end().unwrap()),
104			)
105		}
106	}
107}
108
109impl EncodableKeyRange for SubscriptionRowKeyRange {
110	const KIND: KeyKind = KeyKind::SubscriptionRow;
111
112	fn start(&self) -> Option<EncodedKey> {
113		let mut serializer = KeySerializer::with_capacity(10); // 1 + 1 + 8 (subscription u64)
114		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.subscription.0);
115		Some(serializer.to_encoded_key())
116	}
117
118	fn end(&self) -> Option<EncodedKey> {
119		let mut serializer = KeySerializer::with_capacity(18); // 1 + 1 + 8 + 8
120		serializer
121			.extend_u8(VERSION)
122			.extend_u8(Self::KIND as u8)
123			.extend_u64(self.subscription.0)
124			.extend_u64(0u64);
125		Some(serializer.to_encoded_key())
126	}
127
128	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
129	where
130		Self: Sized,
131	{
132		let start_key = match &range.start {
133			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
134			Bound::Unbounded => None,
135		};
136
137		let end_key = match &range.end {
138			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
139			Bound::Unbounded => None,
140		};
141
142		(start_key, end_key)
143	}
144}
145
146impl SubscriptionRowKey {
147	pub fn encoded(subscription: SubscriptionId, row: impl Into<RowNumber>) -> EncodedKey {
148		Self {
149			subscription,
150			row: row.into(),
151		}
152		.encode()
153	}
154
155	pub fn full_scan(subscription: SubscriptionId) -> EncodedKeyRange {
156		EncodedKeyRange::start_end(Some(Self::start(subscription)), Some(Self::end(subscription)))
157	}
158
159	fn start(subscription: SubscriptionId) -> EncodedKey {
160		let mut serializer = KeySerializer::with_capacity(18); // 1 + 1 + 8 + 8
161		serializer
162			.extend_u8(VERSION)
163			.extend_u8(Self::KIND as u8)
164			.extend_u64(subscription.0)
165			.extend_u64(u64::MAX);
166		serializer.to_encoded_key()
167	}
168
169	fn end(subscription: SubscriptionId) -> EncodedKey {
170		let mut serializer = KeySerializer::with_capacity(18); // 1 + 1 + 8 + 8
171		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(subscription.0).extend_u64(0u64);
172		serializer.to_encoded_key()
173	}
174}
175
176#[cfg(test)]
177pub mod tests {
178	use reifydb_type::value::row_number::RowNumber;
179
180	use super::{EncodableKey, SubscriptionRowKey};
181	use crate::interface::catalog::id::SubscriptionId;
182
183	#[test]
184	fn test_encode_decode() {
185		let subscription = SubscriptionId(12345);
186		let key = SubscriptionRowKey {
187			subscription,
188			row: RowNumber(0x123456789ABCDEF0),
189		};
190		let encoded = key.encode();
191
192		let decoded = SubscriptionRowKey::decode(&encoded).unwrap();
193		assert_eq!(decoded.subscription, subscription);
194		assert_eq!(decoded.row, RowNumber(0x123456789ABCDEF0));
195	}
196
197	#[test]
198	fn test_order_preserving() {
199		let sub1 = SubscriptionId(100);
200		let sub2 = SubscriptionId(101);
201
202		let key1 = SubscriptionRowKey {
203			subscription: sub1,
204			row: RowNumber(100),
205		};
206		let key2 = SubscriptionRowKey {
207			subscription: sub1,
208			row: RowNumber(200),
209		};
210		let key3 = SubscriptionRowKey {
211			subscription: sub2,
212			row: RowNumber(1),
213		};
214
215		let encoded1 = key1.encode();
216		let encoded2 = key2.encode();
217		let encoded3 = key3.encode();
218
219		assert!(encoded1 > encoded2, "row ordering not preserved");
220		assert!(encoded1 > encoded3, "subscription ordering not preserved");
221	}
222}