reifydb_core/key/
flow_node_internal_state.rs1use std::ops::Bound;
5
6use super::{EncodableKey, EncodableKeyRange, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::flow::FlowNodeId,
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct FlowNodeInternalStateKey {
15 pub node: FlowNodeId,
16 pub key: Vec<u8>,
17}
18
19impl EncodableKey for FlowNodeInternalStateKey {
20 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
21
22 fn encode(&self) -> EncodedKey {
23 let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24 serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25 serializer.to_encoded_key()
26 }
27
28 fn decode(key: &EncodedKey) -> Option<Self> {
29 let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
32 if kind != Self::KIND {
33 return None;
34 }
35
36 let node_id = de.read_u64().ok()?;
37 let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
38
39 Some(Self {
40 node: FlowNodeId(node_id),
41 key: key_bytes,
42 })
43 }
44}
45
46impl FlowNodeInternalStateKey {
47 pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
48 Self {
49 node,
50 key,
51 }
52 }
53
54 pub fn new_empty(node: FlowNodeId) -> Self {
55 Self {
56 node,
57 key: Vec::new(),
58 }
59 }
60
61 pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
62 Self::new(node.into(), key.into()).encode()
63 }
64
65 pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
66 let range = FlowNodeInternalStateKeyRange::new(node);
67 EncodedKeyRange::start_end(range.start(), range.end())
68 }
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct FlowNodeInternalStateKeyRange {
73 pub node: FlowNodeId,
74}
75
76impl FlowNodeInternalStateKeyRange {
77 pub fn new(node: FlowNodeId) -> Self {
78 Self {
79 node,
80 }
81 }
82
83 fn decode_key(key: &EncodedKey) -> Option<Self> {
84 let mut de = KeyDeserializer::from_bytes(key.as_slice());
85
86 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
87 if kind != FlowNodeInternalStateKey::KIND {
88 return None;
89 }
90
91 let node_id = de.read_u64().ok()?;
92
93 Some(Self {
94 node: FlowNodeId(node_id),
95 })
96 }
97}
98
99impl EncodableKeyRange for FlowNodeInternalStateKeyRange {
100 const KIND: KeyKind = KeyKind::FlowNodeInternalState;
101
102 fn start(&self) -> Option<EncodedKey> {
103 let mut serializer = KeySerializer::with_capacity(9);
104 serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0);
105 Some(serializer.to_encoded_key())
106 }
107
108 fn end(&self) -> Option<EncodedKey> {
109 let mut serializer = KeySerializer::with_capacity(9);
110 serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
111 Some(serializer.to_encoded_key())
112 }
113
114 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
115 where
116 Self: Sized,
117 {
118 let start_key = match &range.start {
119 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
120 Bound::Unbounded => None,
121 };
122
123 let end_key = match &range.end {
124 Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
125 Bound::Unbounded => None,
126 };
127
128 (start_key, end_key)
129 }
130}
131
/// Unit tests for the flow-node internal-state key and its range type.
#[cfg(test)]
pub mod tests {
    use super::{EncodableKey, EncodableKeyRange, FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
    use crate::{
        encoded::key::{EncodedKey, EncodedKeyRange},
        interface::catalog::flow::FlowNodeId,
    };

    /// Encoding starts with the kind byte `0xE0` and decoding restores both fields.
    #[test]
    fn test_encode_decode() {
        let key = FlowNodeInternalStateKey {
            node: FlowNodeId(0xDEADBEEF),
            key: vec![1, 2, 3, 4],
        };
        let encoded = key.encode();

        assert_eq!(encoded[0], 0xE0);

        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(decoded.node.0, 0xDEADBEEF);
        assert_eq!(decoded.key, vec![1, 2, 3, 4]);
    }

    /// A key with no raw bytes still round-trips.
    #[test]
    fn test_encode_decode_empty_key() {
        let key = FlowNodeInternalStateKey {
            node: FlowNodeId(0xDEADBEEF),
            key: vec![],
        };
        let encoded = key.encode();

        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(decoded.node.0, 0xDEADBEEF);
        assert_eq!(decoded.key, Vec::<u8>::new());
    }

    #[test]
    fn test_new() {
        let key = FlowNodeInternalStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
        assert_eq!(key.node.0, 42);
        assert_eq!(key.key, vec![5, 6, 7]);
    }

    #[test]
    fn test_new_empty() {
        let key = FlowNodeInternalStateKey::new_empty(FlowNodeId(42));
        assert_eq!(key.node.0, 42);
        assert_eq!(key.key, Vec::<u8>::new());
    }

    #[test]
    fn test_roundtrip() {
        let original = FlowNodeInternalStateKey {
            node: FlowNodeId(999_999_999),
            key: vec![10, 20, 30, 40, 50],
        };
        let encoded = original.encode();
        let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    /// The leading byte `0xFF` differs from this key's encoded kind byte
    /// (`0xE0`, see `test_encode_decode`), so decoding must be rejected.
    #[test]
    fn test_decode_invalid_version() {
        let mut encoded = Vec::new();
        encoded.push(0xFF);
        encoded.push(0xE5);
        encoded.extend(&999u64.to_be_bytes());
        let key = EncodedKey::new(encoded);
        assert!(FlowNodeInternalStateKey::decode(&key).is_none());
    }

    /// The leading byte `0xFE` differs from this key's encoded kind byte
    /// (`0xE0`), so decoding must be rejected.
    #[test]
    fn test_decode_invalid_kind() {
        let mut encoded = Vec::new();
        encoded.push(0xFE);
        encoded.push(0xFF);
        encoded.extend(&999u64.to_be_bytes());
        let key = EncodedKey::new(encoded);
        assert!(FlowNodeInternalStateKey::decode(&key).is_none());
    }

    /// A correct kind byte followed by only four of the eight node-id bytes
    /// must fail while reading the node id, not on the kind check.
    #[test]
    fn test_decode_too_short() {
        // 0xE0 is the encoded kind byte asserted in `test_encode_decode`.
        let mut encoded = vec![0xE0];
        encoded.extend(&999u32.to_be_bytes());
        let key = EncodedKey::new(encoded);
        assert!(FlowNodeInternalStateKey::decode(&key).is_none());
    }

    /// The start bound decodes to the node itself and the end bound to the
    /// node id minus one, both with an empty raw key.
    #[test]
    fn test_flow_node_internal_state_key_range() {
        let node = FlowNodeId(42);
        let range = FlowNodeInternalStateKeyRange::new(node);

        let start = range.start().unwrap();
        let decoded_start = FlowNodeInternalStateKey::decode(&start).unwrap();
        assert_eq!(decoded_start.node, node);
        assert_eq!(decoded_start.key, Vec::<u8>::new());

        let end = range.end().unwrap();
        let decoded_end = FlowNodeInternalStateKey::decode(&end).unwrap();
        assert_eq!(decoded_end.node.0, 41);
        assert_eq!(decoded_end.key, Vec::<u8>::new());
    }

    /// Decoding an encoded range recovers the node of each bound.
    #[test]
    fn test_flow_node_internal_state_key_range_decode() {
        let node = FlowNodeId(100);
        let range = FlowNodeInternalStateKeyRange::new(node);

        let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());

        let (start_decoded, end_decoded) = FlowNodeInternalStateKeyRange::decode(&encoded_range);

        assert!(start_decoded.is_some());
        assert_eq!(start_decoded.unwrap().node, node);

        assert!(end_decoded.is_some());
        assert_eq!(end_decoded.unwrap().node.0, 99);
    }
}