reifydb_core/key/
flow_node_internal_state.rs1use std::ops::Bound;
5
6use super::{EncodableKey, EncodableKeyRange, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::flow::FlowNodeId,
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct FlowNodeInternalStateKey {
15 pub node: FlowNodeId,
16 pub key: Vec<u8>,
17}
18
19impl EncodableKey for FlowNodeInternalStateKey {
20 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
21
22 fn encode(&self) -> EncodedKey {
23 let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24 serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25 serializer.to_encoded_key()
26 }
27
28 fn decode(key: &EncodedKey) -> Option<Self> {
29 let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
32 if kind != Self::KIND {
33 return None;
34 }
35
36 let node_id = de.read_u64().ok()?;
37 let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
38
39 Some(Self {
40 node: FlowNodeId(node_id),
41 key: key_bytes,
42 })
43 }
44}
45
46impl FlowNodeInternalStateKey {
47 pub const ROW_NUMBER_COUNTER_TAG: u8 = b'C';
48
49 pub const ROW_NUMBER_MAPPING_TAG: u8 = b'M';
50
51 pub const WINDOW_META_TAG: u8 = b'W';
52
53 pub const GATE_VISIBILITY_TAG: u8 = b'G';
54
55 pub fn is_row_number_counter(&self) -> bool {
56 self.key.as_slice() == [Self::ROW_NUMBER_COUNTER_TAG]
57 }
58
59 pub fn is_row_number_mapping(&self) -> bool {
60 self.key.first() == Some(&Self::ROW_NUMBER_MAPPING_TAG)
61 }
62
63 pub fn is_window_meta(&self) -> bool {
64 self.key.first() == Some(&Self::WINDOW_META_TAG)
65 }
66
67 pub fn is_gate_visibility(&self) -> bool {
68 self.key.first() == Some(&Self::GATE_VISIBILITY_TAG)
69 }
70
71 pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
72 Self {
73 node,
74 key,
75 }
76 }
77
78 pub fn new_empty(node: FlowNodeId) -> Self {
79 Self {
80 node,
81 key: Vec::new(),
82 }
83 }
84
85 pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
86 Self::new(node.into(), key.into()).encode()
87 }
88
89 pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
90 let range = FlowNodeInternalStateKeyRange::new(node);
91 EncodedKeyRange::start_end(range.start(), range.end())
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct FlowNodeInternalStateKeyRange {
97 pub node: FlowNodeId,
98}
99
100impl FlowNodeInternalStateKeyRange {
101 pub fn new(node: FlowNodeId) -> Self {
102 Self {
103 node,
104 }
105 }
106
107 fn decode_key(key: &EncodedKey) -> Option<Self> {
108 let mut de = KeyDeserializer::from_bytes(key.as_slice());
109
110 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
111 if kind != FlowNodeInternalStateKey::KIND {
112 return None;
113 }
114
115 let node_id = de.read_u64().ok()?;
116
117 Some(Self {
118 node: FlowNodeId(node_id),
119 })
120 }
121}
122
123impl EncodableKeyRange for FlowNodeInternalStateKeyRange {
124 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
125
126 fn start(&self) -> Option<EncodedKey> {
127 let mut serializer = KeySerializer::with_capacity(9);
128 serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0);
129 Some(serializer.to_encoded_key())
130 }
131
132 fn end(&self) -> Option<EncodedKey> {
133 let mut serializer = KeySerializer::with_capacity(9);
134 serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
135 Some(serializer.to_encoded_key())
136 }
137
138 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
139 where
140 Self: Sized,
141 {
142 let start_key = match &range.start {
143 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
144 Bound::Unbounded => None,
145 };
146
147 let end_key = match &range.end {
148 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
149 Bound::Unbounded => None,
150 };
151
152 (start_key, end_key)
153 }
154}
155
156#[cfg(test)]
157pub mod tests {
158 use super::{EncodableKey, EncodableKeyRange, FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
159 use crate::{
160 encoded::key::{EncodedKey, EncodedKeyRange},
161 interface::catalog::flow::FlowNodeId,
162 };
163
164 #[test]
165 fn test_encode_decode() {
166 let key = FlowNodeInternalStateKey {
167 node: FlowNodeId(0xDEADBEEF),
168 key: vec![1, 2, 3, 4],
169 };
170 let encoded = key.encode();
171
172 assert_eq!(encoded[0], 0xE0);
173
174 let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
175 assert_eq!(decoded.node.0, 0xDEADBEEF);
176 assert_eq!(decoded.key, vec![1, 2, 3, 4]);
177 }
178
179 #[test]
180 fn test_encode_decode_empty_key() {
181 let key = FlowNodeInternalStateKey {
182 node: FlowNodeId(0xDEADBEEF),
183 key: vec![],
184 };
185 let encoded = key.encode();
186
187 let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
188 assert_eq!(decoded.node.0, 0xDEADBEEF);
189 assert_eq!(decoded.key, Vec::<u8>::new());
190 }
191
192 #[test]
193 fn test_new() {
194 let key = FlowNodeInternalStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
195 assert_eq!(key.node.0, 42);
196 assert_eq!(key.key, vec![5, 6, 7]);
197 }
198
199 #[test]
200 fn test_new_empty() {
201 let key = FlowNodeInternalStateKey::new_empty(FlowNodeId(42));
202 assert_eq!(key.node.0, 42);
203 assert_eq!(key.key, Vec::<u8>::new());
204 }
205
206 #[test]
207 fn test_roundtrip() {
208 let original = FlowNodeInternalStateKey {
209 node: FlowNodeId(999_999_999),
210 key: vec![10, 20, 30, 40, 50],
211 };
212 let encoded = original.encode();
213 let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
214 assert_eq!(original, decoded);
215 }
216
217 #[test]
218 fn test_decode_invalid_version() {
219 let mut encoded = Vec::new();
220 encoded.push(0xFF);
221 encoded.push(0xE5);
222 encoded.extend(&999u64.to_be_bytes());
223 let key = EncodedKey::new(encoded);
224 assert!(FlowNodeInternalStateKey::decode(&key).is_none());
225 }
226
227 #[test]
228 fn test_decode_invalid_kind() {
229 let mut encoded = Vec::new();
230 encoded.push(0xFE);
231 encoded.push(0xFF);
232 encoded.extend(&999u64.to_be_bytes());
233 let key = EncodedKey::new(encoded);
234 assert!(FlowNodeInternalStateKey::decode(&key).is_none());
235 }
236
237 #[test]
238 fn test_decode_too_short() {
239 let mut encoded = Vec::new();
240 encoded.push(0xFE);
241 encoded.push(0xE5);
242 encoded.extend(&999u32.to_be_bytes());
243 let key = EncodedKey::new(encoded);
244 assert!(FlowNodeInternalStateKey::decode(&key).is_none());
245 }
246
247 #[test]
248 fn test_flow_node_internal_state_key_range() {
249 let node = FlowNodeId(42);
250 let range = FlowNodeInternalStateKeyRange::new(node);
251
252 let start = range.start().unwrap();
253 let decoded_start = FlowNodeInternalStateKey::decode(&start).unwrap();
254 assert_eq!(decoded_start.node, node);
255 assert_eq!(decoded_start.key, Vec::<u8>::new());
256
257 let end = range.end().unwrap();
258 let decoded_end = FlowNodeInternalStateKey::decode(&end).unwrap();
259 assert_eq!(decoded_end.node.0, 41);
260 assert_eq!(decoded_end.key, Vec::<u8>::new());
261 }
262
263 #[test]
264 fn test_flow_node_internal_state_key_range_decode() {
265 let node = FlowNodeId(100);
266 let range = FlowNodeInternalStateKeyRange::new(node);
267
268 let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());
269
270 let (start_decoded, end_decoded) = FlowNodeInternalStateKeyRange::decode(&encoded_range);
271
272 assert!(start_decoded.is_some());
273 assert_eq!(start_decoded.unwrap().node, node);
274
275 assert!(end_decoded.is_some());
276 assert_eq!(end_decoded.unwrap().node.0, 99);
277 }
278}