1use std::collections::Bound;
5
6use reifydb_type::RowNumber;
7
8use super::{EncodableKey, KeyKind};
9use crate::{
10 EncodedKey, EncodedKeyRange,
11 interface::{EncodableKeyRange, catalog::SourceId},
12 util::encoding::keycode::{KeyDeserializer, KeySerializer},
13};
14
/// Encoding version of the row-key layout.
///
/// It is the first field serialized into every key built in this file, and
/// decoding rejects any key whose leading field differs from it.
const VERSION: u8 = 1;
16
/// Key addressing a single row: the source it belongs to plus its row number.
#[derive(Debug, Clone, PartialEq)]
pub struct RowKey {
    /// Source that owns the row.
    pub source: SourceId,
    /// Number of the row within `source`.
    pub row: RowNumber,
}
22
23impl EncodableKey for RowKey {
24 const KIND: KeyKind = KeyKind::Row;
25
26 fn encode(&self) -> EncodedKey {
27 let mut serializer = KeySerializer::with_capacity(19); serializer
29 .extend_u8(VERSION)
30 .extend_u8(Self::KIND as u8)
31 .extend_source_id(self.source)
32 .extend_u64(self.row.0);
33 serializer.to_encoded_key()
34 }
35
36 fn decode(key: &EncodedKey) -> Option<Self> {
37 let mut de = KeyDeserializer::from_bytes(key.as_slice());
38
39 let version = de.read_u8().ok()?;
40 if version != VERSION {
41 return None;
42 }
43
44 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
45 if kind != Self::KIND {
46 return None;
47 }
48
49 let source = de.read_source_id().ok()?;
50 let row = de.read_row_number().ok()?;
51
52 Some(Self {
53 source,
54 row,
55 })
56 }
57}
58
/// Describes the range of row keys that belong to one source.
#[derive(Debug, Clone, PartialEq)]
pub struct RowKeyRange {
    /// Source whose rows the range covers.
    pub source: SourceId,
}
63
64impl RowKeyRange {
65 fn decode_key(key: &EncodedKey) -> Option<Self> {
66 let mut de = KeyDeserializer::from_bytes(key.as_slice());
67
68 let version = de.read_u8().ok()?;
69 if version != VERSION {
70 return None;
71 }
72
73 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
74 if kind != Self::KIND {
75 return None;
76 }
77
78 let source = de.read_source_id().ok()?;
79
80 Some(RowKeyRange {
81 source,
82 })
83 }
84
85 pub fn scan_range(source: SourceId, last_key: Option<&EncodedKey>) -> EncodedKeyRange {
92 let range = RowKeyRange {
93 source,
94 };
95
96 if let Some(last_key) = last_key {
97 EncodedKeyRange::new(Bound::Excluded(last_key.clone()), Bound::Included(range.end().unwrap()))
98 } else {
99 EncodedKeyRange::new(
100 Bound::Included(range.start().unwrap()),
101 Bound::Included(range.end().unwrap()),
102 )
103 }
104 }
105}
106
107impl EncodableKeyRange for RowKeyRange {
108 const KIND: KeyKind = KeyKind::Row;
109
110 fn start(&self) -> Option<EncodedKey> {
111 let mut serializer = KeySerializer::with_capacity(11); serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(self.source);
113 Some(serializer.to_encoded_key())
114 }
115
116 fn end(&self) -> Option<EncodedKey> {
117 let mut serializer = KeySerializer::with_capacity(11);
118 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(self.source.prev());
119 Some(serializer.to_encoded_key())
120 }
121
122 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
123 where
124 Self: Sized,
125 {
126 let start_key = match &range.start {
127 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
128 Bound::Unbounded => None,
129 };
130
131 let end_key = match &range.end {
132 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
133 Bound::Unbounded => None,
134 };
135
136 (start_key, end_key)
137 }
138}
139
140impl RowKey {
141 pub fn encoded(source: impl Into<SourceId>, row: impl Into<RowNumber>) -> EncodedKey {
142 Self {
143 source: source.into(),
144 row: row.into(),
145 }
146 .encode()
147 }
148
149 pub fn full_scan(source: impl Into<SourceId>) -> EncodedKeyRange {
150 let source = source.into();
151 EncodedKeyRange::start_end(Some(Self::source_start(source)), Some(Self::source_end(source)))
152 }
153
154 pub fn source_start(source: impl Into<SourceId>) -> EncodedKey {
155 let source = source.into();
156 let mut serializer = KeySerializer::with_capacity(11);
157 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(source);
158 serializer.to_encoded_key()
159 }
160
161 pub fn source_end(source: impl Into<SourceId>) -> EncodedKey {
162 let source = source.into();
163 let mut serializer = KeySerializer::with_capacity(11);
164 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(source.prev());
165 serializer.to_encoded_key()
166 }
167}
168
#[cfg(test)]
mod tests {
    use reifydb_type::RowNumber;

    use super::{EncodableKey, RowKey};
    use crate::interface::catalog::SourceId;

    /// Pins the exact encoded bytes of a key and checks that decoding restores it.
    #[test]
    fn test_encode_decode() {
        let original = RowKey {
            source: SourceId::table(0xABCD),
            row: RowNumber(0x123456789ABCDEF0),
        };
        let encoded = original.encode();

        #[rustfmt::skip]
        let expected: Vec<u8> = vec![
            // version field (bitwise complement of 1)
            0xFE,
            // kind field
            0xFC,
            // source id: leading byte (presumably a tag for table sources),
            // then the bitwise complement of 0xABCD as eight big-endian bytes
            0x01,
            0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x54, 0x32,
            // row number: bitwise complement of 0x123456789ABCDEF0
            0xED, 0xCB, 0xA9, 0x87, 0x65, 0x43, 0x21, 0x0F,
        ];

        assert_eq!(encoded.as_slice(), expected);

        // Decoding must give back both the source and the row number.
        assert_eq!(RowKey::decode(&encoded), Some(original));
    }

    /// Checks the relative order of encoded keys: a larger source or row number
    /// must compare smaller once encoded.
    #[test]
    fn test_order_preserving() {
        let source1_row100 = RowKey {
            source: SourceId::table(1),
            row: RowNumber(100),
        }
        .encode();
        let source1_row200 = RowKey {
            source: SourceId::table(1),
            row: RowNumber(200),
        }
        .encode();
        let source2_row1 = RowKey {
            source: SourceId::table(2),
            row: RowNumber(1),
        }
        .encode();

        assert!(source2_row1 < source1_row200, "ordering not preserved");
        assert!(source1_row200 < source1_row100, "ordering not preserved");
    }
}