reifydb_core/key/
flow_node_internal_state.rs1use super::{EncodableKey, EncodableKeyRange, KeyKind};
5use crate::{
6 EncodedKey, EncodedKeyRange,
7 interface::FlowNodeId,
8 util::encoding::keycode::{KeyDeserializer, KeySerializer},
9};
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct FlowNodeInternalStateKey {
13 pub node: FlowNodeId,
14 pub key: Vec<u8>,
15}
16
/// Version tag written as the first component of every key encoded in this
/// file; decoding rejects keys whose first component differs from it.
const VERSION: u8 = 0x01;
18
19impl EncodableKey for FlowNodeInternalStateKey {
20 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
21
22 fn encode(&self) -> EncodedKey {
23 let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25 serializer.to_encoded_key()
26 }
27
28 fn decode(key: &EncodedKey) -> Option<Self> {
29 let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31 let version = de.read_u8().ok()?;
32 if version != VERSION {
33 return None;
34 }
35
36 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
37 if kind != Self::KIND {
38 return None;
39 }
40
41 let node_id = de.read_u64().ok()?;
42 let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
43
44 Some(Self {
45 node: FlowNodeId(node_id),
46 key: key_bytes,
47 })
48 }
49}
50
51impl FlowNodeInternalStateKey {
52 pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
53 Self {
54 node,
55 key,
56 }
57 }
58
59 pub fn new_empty(node: FlowNodeId) -> Self {
60 Self {
61 node,
62 key: Vec::new(),
63 }
64 }
65
66 pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
67 Self::new(node.into(), key.into()).encode()
68 }
69
70 pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
72 let range = FlowNodeInternalStateKeyRange::new(node);
73 EncodedKeyRange::start_end(range.start(), range.end())
74 }
75}
76
77#[derive(Debug, Clone, PartialEq, Eq)]
78pub struct FlowNodeInternalStateKeyRange {
79 pub node: FlowNodeId,
80}
81
82impl FlowNodeInternalStateKeyRange {
83 pub fn new(node: FlowNodeId) -> Self {
84 Self {
85 node,
86 }
87 }
88
89 fn decode_key(key: &EncodedKey) -> Option<Self> {
90 let mut de = KeyDeserializer::from_bytes(key.as_slice());
91
92 let version = de.read_u8().ok()?;
93 if version != VERSION {
94 return None;
95 }
96
97 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
98 if kind != FlowNodeInternalStateKey::KIND {
99 return None;
100 }
101
102 let node_id = de.read_u64().ok()?;
103
104 Some(Self {
105 node: FlowNodeId(node_id),
106 })
107 }
108}
109
110impl EncodableKeyRange for FlowNodeInternalStateKeyRange {
111 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
112
113 fn start(&self) -> Option<EncodedKey> {
114 let mut serializer = KeySerializer::with_capacity(10);
115 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0);
116 Some(serializer.to_encoded_key())
117 }
118
119 fn end(&self) -> Option<EncodedKey> {
120 let mut serializer = KeySerializer::with_capacity(10);
121 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
122 Some(serializer.to_encoded_key())
123 }
124
125 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
126 where
127 Self: Sized,
128 {
129 use std::ops::Bound;
130
131 let start_key = match &range.start {
132 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
133 Bound::Unbounded => None,
134 };
135
136 let end_key = match &range.end {
137 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
138 Bound::Unbounded => None,
139 };
140
141 (start_key, end_key)
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::{EncodableKey, EncodableKeyRange, FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
    use crate::{interface::FlowNodeId, EncodedKey, EncodedKeyRange};

    /// Raw bytes of a genuinely encoded key (node 999, empty state key).
    ///
    /// The negative tests corrupt or truncate these bytes instead of
    /// hand-writing a prefix, so each one fails only for the reason it is
    /// meant to exercise, whatever the encoded kind byte happens to be.
    fn valid_key_bytes() -> Vec<u8> {
        FlowNodeInternalStateKey::new_empty(FlowNodeId(999)).encode().as_slice().to_vec()
    }

    /// Reports whether `bytes` decode as a `FlowNodeInternalStateKey`.
    fn decodes(bytes: Vec<u8>) -> bool {
        FlowNodeInternalStateKey::decode(&EncodedKey::new(bytes)).is_some()
    }

    #[test]
    fn test_encode_decode() {
        let key = FlowNodeInternalStateKey {
            node: FlowNodeId(0xDEADBEEF),
            key: vec![1, 2, 3, 4],
        };
        let encoded = key.encode();

        // Leading bytes: encoded version, then encoded kind.
        assert_eq!(encoded[0], 0xFE);
        assert_eq!(encoded[1], 0xE0);

        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(decoded.node.0, 0xDEADBEEF);
        assert_eq!(decoded.key, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_encode_decode_empty_key() {
        let key = FlowNodeInternalStateKey {
            node: FlowNodeId(0xDEADBEEF),
            key: vec![],
        };
        let encoded = key.encode();

        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(decoded.node.0, 0xDEADBEEF);
        assert_eq!(decoded.key, Vec::<u8>::new());
    }

    #[test]
    fn test_new() {
        let key = FlowNodeInternalStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
        assert_eq!(key.node.0, 42);
        assert_eq!(key.key, vec![5, 6, 7]);
    }

    #[test]
    fn test_new_empty() {
        let key = FlowNodeInternalStateKey::new_empty(FlowNodeId(42));
        assert_eq!(key.node.0, 42);
        assert_eq!(key.key, Vec::<u8>::new());
    }

    #[test]
    fn test_roundtrip() {
        let original = FlowNodeInternalStateKey {
            node: FlowNodeId(999_999_999),
            key: vec![10, 20, 30, 40, 50],
        };
        let encoded = original.encode();
        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_decode_invalid_version() {
        // Sanity check: the untouched bytes must decode.
        assert!(decodes(valid_key_bytes()));

        // Only the version byte is corrupted; kind and node id stay valid.
        let mut bytes = valid_key_bytes();
        bytes[0] = 0xFF;
        assert!(!decodes(bytes));
    }

    #[test]
    fn test_decode_invalid_kind() {
        assert!(decodes(valid_key_bytes()));

        // Only the kind byte is corrupted; version and node id stay valid.
        let mut bytes = valid_key_bytes();
        bytes[1] = 0xFF;
        assert!(!decodes(bytes));
    }

    #[test]
    fn test_decode_too_short() {
        assert!(decodes(valid_key_bytes()));

        // Drop the last four bytes so the node id is incomplete while the
        // version and kind bytes remain valid.
        let mut bytes = valid_key_bytes();
        bytes.truncate(bytes.len() - 4);
        assert!(!decodes(bytes));
    }

    #[test]
    fn test_flow_node_internal_state_key_range() {
        let node = FlowNodeId(42);
        let range = FlowNodeInternalStateKeyRange::new(node);

        let start = range.start().unwrap();
        let decoded_start = FlowNodeInternalStateKey::decode(&start).unwrap();
        assert_eq!(decoded_start.node, node);
        assert_eq!(decoded_start.key, Vec::<u8>::new());

        // The end bound is encoded with the node id minus one.
        let end = range.end().unwrap();
        let decoded_end = FlowNodeInternalStateKey::decode(&end).unwrap();
        assert_eq!(decoded_end.node.0, 41);
        assert_eq!(decoded_end.key, Vec::<u8>::new());
    }

    #[test]
    fn test_flow_node_internal_state_key_range_decode() {
        let node = FlowNodeId(100);
        let range = FlowNodeInternalStateKeyRange::new(node);

        let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());

        let (start_decoded, end_decoded) = FlowNodeInternalStateKeyRange::decode(&encoded_range);

        assert!(start_decoded.is_some());
        assert_eq!(start_decoded.unwrap().node, node);

        assert!(end_decoded.is_some());
        assert_eq!(end_decoded.unwrap().node.0, 99);
    }
}