reifydb_core/key/
flow_node_internal_state.rs1use super::{EncodableKey, EncodableKeyRange, KeyKind};
5use crate::{
6 encoded::key::{EncodedKey, EncodedKeyRange},
7 interface::catalog::flow::FlowNodeId,
8 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
/// Key identifying one internal-state entry of a flow node.
///
/// The encoded form (see the `EncodableKey` impl) is the version byte, the
/// kind byte, the node id and finally the raw `key` bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowNodeInternalStateKey {
	/// Flow node the state entry belongs to.
	pub node: FlowNodeId,
	/// Caller-defined suffix bytes; may be empty.
	pub key: Vec<u8>,
}
16
/// Layout version written as the first serialized component of every key in
/// this module; decoding rejects any other value.
const VERSION: u8 = 1;
18
19impl EncodableKey for FlowNodeInternalStateKey {
20 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
21
22 fn encode(&self) -> EncodedKey {
23 let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25 serializer.to_encoded_key()
26 }
27
28 fn decode(key: &EncodedKey) -> Option<Self> {
29 let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31 let version = de.read_u8().ok()?;
32 if version != VERSION {
33 return None;
34 }
35
36 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
37 if kind != Self::KIND {
38 return None;
39 }
40
41 let node_id = de.read_u64().ok()?;
42 let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
43
44 Some(Self {
45 node: FlowNodeId(node_id),
46 key: key_bytes,
47 })
48 }
49}
50
51impl FlowNodeInternalStateKey {
52 pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
53 Self {
54 node,
55 key,
56 }
57 }
58
59 pub fn new_empty(node: FlowNodeId) -> Self {
60 Self {
61 node,
62 key: Vec::new(),
63 }
64 }
65
66 pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
67 Self::new(node.into(), key.into()).encode()
68 }
69
70 pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
72 let range = FlowNodeInternalStateKeyRange::new(node);
73 EncodedKeyRange::start_end(range.start(), range.end())
74 }
75}
76
/// Range descriptor for the internal-state keys of a single flow node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowNodeInternalStateKeyRange {
	/// Flow node whose keys the range is built for.
	pub node: FlowNodeId,
}
81
82impl FlowNodeInternalStateKeyRange {
83 pub fn new(node: FlowNodeId) -> Self {
84 Self {
85 node,
86 }
87 }
88
89 fn decode_key(key: &EncodedKey) -> Option<Self> {
90 let mut de = KeyDeserializer::from_bytes(key.as_slice());
91
92 let version = de.read_u8().ok()?;
93 if version != VERSION {
94 return None;
95 }
96
97 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
98 if kind != FlowNodeInternalStateKey::KIND {
99 return None;
100 }
101
102 let node_id = de.read_u64().ok()?;
103
104 Some(Self {
105 node: FlowNodeId(node_id),
106 })
107 }
108}
109
110impl EncodableKeyRange for FlowNodeInternalStateKeyRange {
111 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
112
113 fn start(&self) -> Option<EncodedKey> {
114 let mut serializer = KeySerializer::with_capacity(10);
115 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0);
116 Some(serializer.to_encoded_key())
117 }
118
119 fn end(&self) -> Option<EncodedKey> {
120 let mut serializer = KeySerializer::with_capacity(10);
121 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
122 Some(serializer.to_encoded_key())
123 }
124
125 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
126 where
127 Self: Sized,
128 {
129 use std::ops::Bound;
130
131 let start_key = match &range.start {
132 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
133 Bound::Unbounded => None,
134 };
135
136 let end_key = match &range.end {
137 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
138 Bound::Unbounded => None,
139 };
140
141 (start_key, end_key)
142 }
143}
144
#[cfg(test)]
pub mod tests {
	use super::{EncodableKey, EncodableKeyRange, FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
	use crate::{
		encoded::key::{EncodedKey, EncodedKeyRange},
		interface::catalog::flow::FlowNodeId,
	};

	/// Serialized bytes of a well-formed key (node 999, empty suffix).
	///
	/// The negative tests start from these bytes and corrupt exactly one
	/// part. That way each test reaches the check it is named after instead
	/// of depending on a hand-written kind byte that can drift from the
	/// real encoding.
	fn valid_key_bytes() -> Vec<u8> {
		FlowNodeInternalStateKey::new_empty(FlowNodeId(999)).encode().as_slice().to_vec()
	}

	#[test]
	fn test_encode_decode() {
		let key = FlowNodeInternalStateKey {
			node: FlowNodeId(0xDEADBEEF),
			key: vec![1, 2, 3, 4],
		};
		let encoded = key.encode();

		// Pin the first two serialized bytes: version, then kind.
		assert_eq!(encoded[0], 0xFE);
		assert_eq!(encoded[1], 0xE0);

		let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
		assert_eq!(decoded.node.0, 0xDEADBEEF);
		assert_eq!(decoded.key, vec![1, 2, 3, 4]);
	}

	#[test]
	fn test_encode_decode_empty_key() {
		let key = FlowNodeInternalStateKey {
			node: FlowNodeId(0xDEADBEEF),
			key: vec![],
		};
		let encoded = key.encode();

		let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
		assert_eq!(decoded.node.0, 0xDEADBEEF);
		assert_eq!(decoded.key, Vec::<u8>::new());
	}

	#[test]
	fn test_new() {
		let key = FlowNodeInternalStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
		assert_eq!(key.node.0, 42);
		assert_eq!(key.key, vec![5, 6, 7]);
	}

	#[test]
	fn test_new_empty() {
		let key = FlowNodeInternalStateKey::new_empty(FlowNodeId(42));
		assert_eq!(key.node.0, 42);
		assert_eq!(key.key, Vec::<u8>::new());
	}

	#[test]
	fn test_roundtrip() {
		let original = FlowNodeInternalStateKey {
			node: FlowNodeId(999_999_999),
			key: vec![10, 20, 30, 40, 50],
		};
		let encoded = original.encode();
		let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
		assert_eq!(original, decoded);
	}

	#[test]
	fn test_decode_invalid_version() {
		// Valid key with only the version byte corrupted.
		let mut encoded = valid_key_bytes();
		encoded[0] = 0xFF;

		let key = EncodedKey::new(encoded);
		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
	}

	#[test]
	fn test_decode_invalid_kind() {
		// Valid key with only the kind byte corrupted.
		let mut encoded = valid_key_bytes();
		encoded[1] = 0xFF;

		let key = EncodedKey::new(encoded);
		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
	}

	#[test]
	fn test_decode_too_short() {
		// Keep the valid version and kind bytes but only four bytes of
		// the node id, so decoding has to fail on the u64 read itself.
		let mut encoded = valid_key_bytes();
		encoded.truncate(6);

		let key = EncodedKey::new(encoded);
		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
	}

	#[test]
	fn test_flow_node_internal_state_key_range() {
		let node = FlowNodeId(42);
		let range = FlowNodeInternalStateKeyRange::new(node);

		let start = range.start().unwrap();
		let decoded_start = FlowNodeInternalStateKey::decode(&start).unwrap();
		assert_eq!(decoded_start.node, node);
		assert_eq!(decoded_start.key, Vec::<u8>::new());

		let end = range.end().unwrap();
		let decoded_end = FlowNodeInternalStateKey::decode(&end).unwrap();
		// The end bound is written for node id - 1.
		assert_eq!(decoded_end.node.0, 41);
		assert_eq!(decoded_end.key, Vec::<u8>::new());
	}

	#[test]
	fn test_flow_node_internal_state_key_range_decode() {
		let node = FlowNodeId(100);
		let range = FlowNodeInternalStateKeyRange::new(node);

		let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());

		let (start_decoded, end_decoded) = FlowNodeInternalStateKeyRange::decode(&encoded_range);

		assert!(start_decoded.is_some());
		assert_eq!(start_decoded.unwrap().node, node);

		assert!(end_decoded.is_some());
		assert_eq!(end_decoded.unwrap().node.0, 99);
	}
}