reifydb_core/key/
flow_node_state.rs1use super::{EncodableKey, EncodableKeyRange, KeyKind};
5use crate::{
6 EncodedKey, EncodedKeyRange,
7 interface::FlowNodeId,
8 util::encoding::keycode::{KeyDeserializer, KeySerializer},
9};
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct FlowNodeStateKey {
13 pub node: FlowNodeId,
14 pub key: Vec<u8>,
15}
16
/// Version tag written as the first component of every encoded key in this
/// module; decoding rejects keys carrying any other version.
const VERSION: u8 = 1;
18
19impl EncodableKey for FlowNodeStateKey {
20 const KIND: KeyKind = KeyKind::FlowNodeState;
21
22 fn encode(&self) -> EncodedKey {
23 let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25 serializer.to_encoded_key()
26 }
27
28 fn decode(key: &EncodedKey) -> Option<Self> {
29 let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31 let version = de.read_u8().ok()?;
32 if version != VERSION {
33 return None;
34 }
35
36 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
37 if kind != Self::KIND {
38 return None;
39 }
40
41 let node_id = de.read_u64().ok()?;
42 let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
43
44 Some(Self {
45 node: FlowNodeId(node_id),
46 key: key_bytes,
47 })
48 }
49}
50
51impl FlowNodeStateKey {
52 pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
53 Self {
54 node,
55 key,
56 }
57 }
58
59 pub fn new_empty(node: FlowNodeId) -> Self {
60 Self {
61 node,
62 key: Vec::new(),
63 }
64 }
65
66 pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
68 let range = FlowNodeStateKeyRange::new(node);
69 EncodedKeyRange::start_end(range.start(), range.end())
70 }
71}
72
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct FlowNodeStateKeyRange {
75 pub node: FlowNodeId,
76}
77
78impl FlowNodeStateKeyRange {
79 pub fn new(node: FlowNodeId) -> Self {
80 Self {
81 node,
82 }
83 }
84
85 fn decode_key(key: &EncodedKey) -> Option<Self> {
86 let mut de = KeyDeserializer::from_bytes(key.as_slice());
87
88 let version = de.read_u8().ok()?;
89 if version != VERSION {
90 return None;
91 }
92
93 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
94 if kind != FlowNodeStateKey::KIND {
95 return None;
96 }
97
98 let node_id = de.read_u64().ok()?;
99
100 Some(Self {
101 node: FlowNodeId(node_id),
102 })
103 }
104}
105
106impl EncodableKeyRange for FlowNodeStateKeyRange {
107 const KIND: KeyKind = KeyKind::FlowNodeState;
108
109 fn start(&self) -> Option<EncodedKey> {
110 let mut serializer = KeySerializer::with_capacity(10);
111 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0);
112 Some(serializer.to_encoded_key())
113 }
114
115 fn end(&self) -> Option<EncodedKey> {
116 let mut serializer = KeySerializer::with_capacity(10);
117 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
118 Some(serializer.to_encoded_key())
119 }
120
121 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
122 where
123 Self: Sized,
124 {
125 use std::ops::Bound;
126
127 let start_key = match &range.start {
128 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
129 Bound::Unbounded => None,
130 };
131
132 let end_key = match &range.end {
133 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
134 Bound::Unbounded => None,
135 };
136
137 (start_key, end_key)
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::{EncodableKey, EncodableKeyRange, FlowNodeStateKey, FlowNodeStateKeyRange};
    use crate::{EncodedKey, EncodedKeyRange, interface::FlowNodeId};

    /// Builds a raw encoded key from two header bytes followed by `tail`.
    fn raw_key(version: u8, kind: u8, tail: &[u8]) -> EncodedKey {
        let mut bytes = vec![version, kind];
        bytes.extend_from_slice(tail);
        EncodedKey::new(bytes)
    }

    #[test]
    fn test_encode_decode() {
        let original = FlowNodeStateKey {
            node: FlowNodeId(0xDEADBEEF),
            key: vec![1, 2, 3, 4],
        };
        let encoded = original.encode();

        // Header bytes as emitted by the serializer for the version and kind.
        assert_eq!(encoded[0], 0xFE);
        assert_eq!(encoded[1], 0xEC);

        let FlowNodeStateKey { node, key } = FlowNodeStateKey::decode(&encoded).unwrap();
        assert_eq!(node.0, 0xDEADBEEF);
        assert_eq!(key, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_encode_decode_empty_key() {
        let original = FlowNodeStateKey {
            node: FlowNodeId(0xDEADBEEF),
            key: vec![],
        };
        let encoded = original.encode();

        let FlowNodeStateKey { node, key } = FlowNodeStateKey::decode(&encoded).unwrap();
        assert_eq!(node.0, 0xDEADBEEF);
        assert_eq!(key, Vec::<u8>::new());
    }

    #[test]
    fn test_new() {
        let FlowNodeStateKey { node, key } = FlowNodeStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
        assert_eq!(node.0, 42);
        assert_eq!(key, vec![5, 6, 7]);
    }

    #[test]
    fn test_new_empty() {
        let FlowNodeStateKey { node, key } = FlowNodeStateKey::new_empty(FlowNodeId(42));
        assert_eq!(node.0, 42);
        assert_eq!(key, Vec::<u8>::new());
    }

    #[test]
    fn test_roundtrip() {
        let original = FlowNodeStateKey {
            node: FlowNodeId(999_999_999),
            key: vec![10, 20, 30, 40, 50],
        };
        let decoded = FlowNodeStateKey::decode(&original.encode()).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_decode_invalid_version() {
        // 0xFF is not the encoded version byte; the kind byte is valid.
        let key = raw_key(0xFF, 0xEC, &999u64.to_be_bytes());
        assert!(FlowNodeStateKey::decode(&key).is_none());
    }

    #[test]
    fn test_decode_invalid_kind() {
        // The version byte is valid; 0xFF is not the encoded kind byte.
        let key = raw_key(0xFE, 0xFF, &999u64.to_be_bytes());
        assert!(FlowNodeStateKey::decode(&key).is_none());
    }

    #[test]
    fn test_decode_too_short() {
        // Only four bytes follow the header, too few for a u64 node id.
        let key = raw_key(0xFE, 0xEC, &999u32.to_be_bytes());
        assert!(FlowNodeStateKey::decode(&key).is_none());
    }

    #[test]
    fn test_flow_node_state_key_range() {
        let node = FlowNodeId(42);
        let range = FlowNodeStateKeyRange::new(node);

        let lower = FlowNodeStateKey::decode(&range.start().unwrap()).unwrap();
        assert_eq!(lower.node, node);
        assert_eq!(lower.key, Vec::<u8>::new());

        // The end bound carries the node id minus one.
        let upper = FlowNodeStateKey::decode(&range.end().unwrap()).unwrap();
        assert_eq!(upper.node.0, 41);
        assert_eq!(upper.key, Vec::<u8>::new());
    }

    #[test]
    fn test_flow_node_state_key_range_decode() {
        let node = FlowNodeId(100);
        let range = FlowNodeStateKeyRange::new(node);
        let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());

        let (lower, upper) = FlowNodeStateKeyRange::decode(&encoded_range);

        assert!(lower.is_some());
        assert_eq!(lower.unwrap().node, node);

        assert!(upper.is_some());
        assert_eq!(upper.unwrap().node.0, 99);
    }

    #[test]
    fn test_node_range_method() {
        let node = FlowNodeId(555);
        let encoded_range = FlowNodeStateKey::node_range(node);

        let (lower, upper) = FlowNodeStateKeyRange::decode(&encoded_range);

        assert!(lower.is_some());
        assert_eq!(lower.unwrap().node, node);

        assert!(upper.is_some());
        assert_eq!(upper.unwrap().node.0, 554);
    }
}