reifydb_core/key/
subscription_row.rs1use std::collections::Bound;
5
6use reifydb_type::value::row_number::RowNumber;
7
8use super::{EncodableKey, EncodableKeyRange, KeyKind};
9use crate::{
10 encoded::key::{EncodedKey, EncodedKeyRange},
11 interface::catalog::id::SubscriptionId,
12 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
13};
14
/// Layout version of the keys defined in this file: every encoder here writes it as the first
/// byte, and every decoder checks it before reading anything else.
const VERSION: u8 = 1;
16
17#[derive(Debug, Clone, PartialEq)]
18pub struct SubscriptionRowKey {
19 pub subscription: SubscriptionId,
20 pub row: RowNumber,
21}
22
23impl EncodableKey for SubscriptionRowKey {
24 const KIND: KeyKind = KeyKind::SubscriptionRow;
25
26 fn encode(&self) -> EncodedKey {
27 let mut serializer = KeySerializer::with_capacity(18); serializer
29 .extend_u8(VERSION)
30 .extend_u8(Self::KIND as u8)
31 .extend_u64(self.subscription.0)
32 .extend_u64(self.row.0);
33 serializer.to_encoded_key()
34 }
35
36 fn decode(key: &EncodedKey) -> Option<Self> {
37 let mut de = KeyDeserializer::from_bytes(key.as_slice());
38
39 let version = de.read_u8().ok()?;
40 if version != VERSION {
41 return None;
42 }
43
44 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
45 if kind != Self::KIND {
46 return None;
47 }
48
49 let subscription_id = de.read_u64().ok()?;
50 let subscription = SubscriptionId(subscription_id);
51 let row = de.read_row_number().ok()?;
52
53 Some(Self {
54 subscription,
55 row,
56 })
57 }
58}
59
60#[derive(Debug, Clone, PartialEq)]
61pub struct SubscriptionRowKeyRange {
62 pub subscription: SubscriptionId,
63}
64
65impl SubscriptionRowKeyRange {
66 fn decode_key(key: &EncodedKey) -> Option<Self> {
67 let mut de = KeyDeserializer::from_bytes(key.as_slice());
68
69 let version = de.read_u8().ok()?;
70 if version != VERSION {
71 return None;
72 }
73
74 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
75 if kind != Self::KIND {
76 return None;
77 }
78
79 let subscription_id = de.read_u64().ok()?;
80 let subscription = SubscriptionId(subscription_id);
81
82 Some(SubscriptionRowKeyRange {
83 subscription,
84 })
85 }
86
87 pub fn scan_range(subscription: SubscriptionId, last_key: Option<&EncodedKey>) -> EncodedKeyRange {
94 let range = SubscriptionRowKeyRange {
95 subscription,
96 };
97
98 if let Some(last_key) = last_key {
99 EncodedKeyRange::new(Bound::Excluded(last_key.clone()), Bound::Included(range.end().unwrap()))
100 } else {
101 EncodedKeyRange::new(
102 Bound::Included(range.start().unwrap()),
103 Bound::Included(range.end().unwrap()),
104 )
105 }
106 }
107}
108
109impl EncodableKeyRange for SubscriptionRowKeyRange {
110 const KIND: KeyKind = KeyKind::SubscriptionRow;
111
112 fn start(&self) -> Option<EncodedKey> {
113 let mut serializer = KeySerializer::with_capacity(10); serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.subscription.0);
115 Some(serializer.to_encoded_key())
116 }
117
118 fn end(&self) -> Option<EncodedKey> {
119 let mut serializer = KeySerializer::with_capacity(18); serializer
121 .extend_u8(VERSION)
122 .extend_u8(Self::KIND as u8)
123 .extend_u64(self.subscription.0)
124 .extend_u64(0u64);
125 Some(serializer.to_encoded_key())
126 }
127
128 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
129 where
130 Self: Sized,
131 {
132 let start_key = match &range.start {
133 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
134 Bound::Unbounded => None,
135 };
136
137 let end_key = match &range.end {
138 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
139 Bound::Unbounded => None,
140 };
141
142 (start_key, end_key)
143 }
144}
145
146impl SubscriptionRowKey {
147 pub fn encoded(subscription: SubscriptionId, row: impl Into<RowNumber>) -> EncodedKey {
148 Self {
149 subscription,
150 row: row.into(),
151 }
152 .encode()
153 }
154
155 pub fn full_scan(subscription: SubscriptionId) -> EncodedKeyRange {
156 EncodedKeyRange::start_end(Some(Self::start(subscription)), Some(Self::end(subscription)))
157 }
158
159 fn start(subscription: SubscriptionId) -> EncodedKey {
160 let mut serializer = KeySerializer::with_capacity(18); serializer
162 .extend_u8(VERSION)
163 .extend_u8(Self::KIND as u8)
164 .extend_u64(subscription.0)
165 .extend_u64(u64::MAX);
166 serializer.to_encoded_key()
167 }
168
169 fn end(subscription: SubscriptionId) -> EncodedKey {
170 let mut serializer = KeySerializer::with_capacity(18); serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(subscription.0).extend_u64(0u64);
172 serializer.to_encoded_key()
173 }
174}
175
#[cfg(test)]
pub mod tests {
    use reifydb_type::value::row_number::RowNumber;

    use super::{EncodableKey, SubscriptionRowKey};
    use crate::interface::catalog::id::SubscriptionId;

    /// A key survives an encode/decode round trip with both fields intact.
    #[test]
    fn test_encode_decode() {
        let subscription = SubscriptionId(12345);
        let original = SubscriptionRowKey {
            subscription,
            row: RowNumber(0x123456789ABCDEF0),
        };

        let bytes = original.encode();
        let decoded = SubscriptionRowKey::decode(&bytes).unwrap();

        assert_eq!(decoded.subscription, subscription);
        assert_eq!(decoded.row, RowNumber(0x123456789ABCDEF0));
    }

    /// Encoded keys compare in the reverse of the numeric order of their fields: the smaller row
    /// number, and the smaller subscription id, must produce the greater encoded key.
    #[test]
    fn test_order_preserving() {
        let encode = |subscription: SubscriptionId, row: RowNumber| {
            SubscriptionRowKey { subscription, row }.encode()
        };

        let sub1 = SubscriptionId(100);
        let sub2 = SubscriptionId(101);

        let encoded1 = encode(sub1, RowNumber(100));
        let encoded2 = encode(sub1, RowNumber(200));
        let encoded3 = encode(sub2, RowNumber(1));

        assert!(encoded1 > encoded2, "row ordering not preserved");
        assert!(encoded1 > encoded3, "subscription ordering not preserved");
    }
}