reifydb_core/key/
flow_node_internal_state.rs1use std::ops::Bound;
5
6use super::{EncodableKey, EncodableKeyRange, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::flow::FlowNodeId,
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct FlowNodeInternalStateKey {
15 pub node: FlowNodeId,
16 pub key: Vec<u8>,
17}
18
/// Version tag serialized as the first component of every key in this file;
/// the decoders return `None` when they read any other value.
const VERSION: u8 = 1;
20
21impl EncodableKey for FlowNodeInternalStateKey {
22 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
23
24 fn encode(&self) -> EncodedKey {
25 let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
26 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
27 serializer.to_encoded_key()
28 }
29
30 fn decode(key: &EncodedKey) -> Option<Self> {
31 let mut de = KeyDeserializer::from_bytes(key.as_slice());
32
33 let version = de.read_u8().ok()?;
34 if version != VERSION {
35 return None;
36 }
37
38 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
39 if kind != Self::KIND {
40 return None;
41 }
42
43 let node_id = de.read_u64().ok()?;
44 let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
45
46 Some(Self {
47 node: FlowNodeId(node_id),
48 key: key_bytes,
49 })
50 }
51}
52
53impl FlowNodeInternalStateKey {
54 pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
55 Self {
56 node,
57 key,
58 }
59 }
60
61 pub fn new_empty(node: FlowNodeId) -> Self {
62 Self {
63 node,
64 key: Vec::new(),
65 }
66 }
67
68 pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
69 Self::new(node.into(), key.into()).encode()
70 }
71
72 pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
74 let range = FlowNodeInternalStateKeyRange::new(node);
75 EncodedKeyRange::start_end(range.start(), range.end())
76 }
77}
78
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct FlowNodeInternalStateKeyRange {
81 pub node: FlowNodeId,
82}
83
84impl FlowNodeInternalStateKeyRange {
85 pub fn new(node: FlowNodeId) -> Self {
86 Self {
87 node,
88 }
89 }
90
91 fn decode_key(key: &EncodedKey) -> Option<Self> {
92 let mut de = KeyDeserializer::from_bytes(key.as_slice());
93
94 let version = de.read_u8().ok()?;
95 if version != VERSION {
96 return None;
97 }
98
99 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
100 if kind != FlowNodeInternalStateKey::KIND {
101 return None;
102 }
103
104 let node_id = de.read_u64().ok()?;
105
106 Some(Self {
107 node: FlowNodeId(node_id),
108 })
109 }
110}
111
112impl EncodableKeyRange for FlowNodeInternalStateKeyRange {
113 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
114
115 fn start(&self) -> Option<EncodedKey> {
116 let mut serializer = KeySerializer::with_capacity(10);
117 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0);
118 Some(serializer.to_encoded_key())
119 }
120
121 fn end(&self) -> Option<EncodedKey> {
122 let mut serializer = KeySerializer::with_capacity(10);
123 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
124 Some(serializer.to_encoded_key())
125 }
126
127 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
128 where
129 Self: Sized,
130 {
131 let start_key = match &range.start {
132 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
133 Bound::Unbounded => None,
134 };
135
136 let end_key = match &range.end {
137 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
138 Bound::Unbounded => None,
139 };
140
141 (start_key, end_key)
142 }
143}
144
#[cfg(test)]
pub mod tests {
    use super::{EncodableKey, EncodableKeyRange, FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
    use crate::{
        encoded::key::{EncodedKey, EncodedKeyRange},
        interface::catalog::flow::FlowNodeId,
    };

    /// Bytes of a well-formed key (node 999, empty key bytes).
    ///
    /// The negative tests corrupt or truncate these bytes instead of
    /// hand-writing the header, so each of them fails decoding at the step it
    /// is meant to exercise regardless of the numeric value of the key kind.
    fn valid_key_bytes() -> Vec<u8> {
        FlowNodeInternalStateKey::new_empty(FlowNodeId(999)).encode().as_slice().to_vec()
    }

    #[test]
    fn test_encode_decode() {
        let key = FlowNodeInternalStateKey {
            node: FlowNodeId(0xDEADBEEF),
            key: vec![1, 2, 3, 4],
        };
        let encoded = key.encode();

        // Serialized version byte and kind byte.
        assert_eq!(encoded[0], 0xFE);
        assert_eq!(encoded[1], 0xE0);

        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(decoded.node.0, 0xDEADBEEF);
        assert_eq!(decoded.key, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_encode_decode_empty_key() {
        let key = FlowNodeInternalStateKey {
            node: FlowNodeId(0xDEADBEEF),
            key: vec![],
        };
        let encoded = key.encode();

        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(decoded.node.0, 0xDEADBEEF);
        assert_eq!(decoded.key, Vec::<u8>::new());
    }

    #[test]
    fn test_new() {
        let key = FlowNodeInternalStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
        assert_eq!(key.node.0, 42);
        assert_eq!(key.key, vec![5, 6, 7]);
    }

    #[test]
    fn test_new_empty() {
        let key = FlowNodeInternalStateKey::new_empty(FlowNodeId(42));
        assert_eq!(key.node.0, 42);
        assert_eq!(key.key, Vec::<u8>::new());
    }

    #[test]
    fn test_roundtrip() {
        let original = FlowNodeInternalStateKey {
            node: FlowNodeId(999_999_999),
            key: vec![10, 20, 30, 40, 50],
        };
        let encoded = original.encode();
        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_decode_invalid_version() {
        let mut bytes = valid_key_bytes();
        // Only the version byte is corrupted; kind and node id stay valid.
        assert_ne!(bytes[0], 0xFF);
        bytes[0] = 0xFF;

        let key = EncodedKey::new(bytes);
        assert!(FlowNodeInternalStateKey::decode(&key).is_none());
    }

    #[test]
    fn test_decode_invalid_kind() {
        let mut bytes = valid_key_bytes();
        // Only the kind byte is corrupted; version and node id stay valid.
        assert_ne!(bytes[1], 0xFF);
        bytes[1] = 0xFF;

        let key = EncodedKey::new(bytes);
        assert!(FlowNodeInternalStateKey::decode(&key).is_none());
    }

    #[test]
    fn test_decode_too_short() {
        let mut bytes = valid_key_bytes();
        // Keep the valid version and kind bytes plus four further bytes, so
        // the node id is cut short and decoding must fail on that read.
        assert!(bytes.len() > 6);
        bytes.truncate(6);

        let key = EncodedKey::new(bytes);
        assert!(FlowNodeInternalStateKey::decode(&key).is_none());
    }

    #[test]
    fn test_flow_node_internal_state_key_range() {
        let node = FlowNodeId(42);
        let range = FlowNodeInternalStateKeyRange::new(node);

        let start = range.start().unwrap();
        let decoded_start = FlowNodeInternalStateKey::decode(&start).unwrap();
        assert_eq!(decoded_start.node, node);
        assert_eq!(decoded_start.key, Vec::<u8>::new());

        let end = range.end().unwrap();
        let decoded_end = FlowNodeInternalStateKey::decode(&end).unwrap();
        // The end bound is built from the node id minus one.
        assert_eq!(decoded_end.node.0, 41);
        assert_eq!(decoded_end.key, Vec::<u8>::new());
    }

    #[test]
    fn test_flow_node_internal_state_key_range_decode() {
        let node = FlowNodeId(100);
        let range = FlowNodeInternalStateKeyRange::new(node);

        let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());

        let (start_decoded, end_decoded) = FlowNodeInternalStateKeyRange::decode(&encoded_range);

        assert!(start_decoded.is_some());
        assert_eq!(start_decoded.unwrap().node, node);

        assert!(end_decoded.is_some());
        assert_eq!(end_decoded.unwrap().node.0, 99);
    }
}