reifydb_core/key/
flow_node_state.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use super::{EncodableKey, EncodableKeyRange, KeyKind};
5use crate::{
6	EncodedKey, EncodedKeyRange,
7	interface::FlowNodeId,
8	util::encoding::keycode::{KeyDeserializer, KeySerializer},
9};
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct FlowNodeStateKey {
13	pub node: FlowNodeId,
14	pub key: Vec<u8>,
15}
16
/// Version tag written as the first byte of every key in this module and
/// checked again on decode; keys carrying another version are rejected.
const VERSION: u8 = 1u8;
18
19impl EncodableKey for FlowNodeStateKey {
20	const KIND: KeyKind = KeyKind::FlowNodeState;
21
22	fn encode(&self) -> EncodedKey {
23		let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25		serializer.to_encoded_key()
26	}
27
28	fn decode(key: &EncodedKey) -> Option<Self> {
29		let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31		let version = de.read_u8().ok()?;
32		if version != VERSION {
33			return None;
34		}
35
36		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
37		if kind != Self::KIND {
38			return None;
39		}
40
41		let node_id = de.read_u64().ok()?;
42		let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
43
44		Some(Self {
45			node: FlowNodeId(node_id),
46			key: key_bytes,
47		})
48	}
49}
50
51impl FlowNodeStateKey {
52	pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
53		Self {
54			node,
55			key,
56		}
57	}
58
59	pub fn new_empty(node: FlowNodeId) -> Self {
60		Self {
61			node,
62			key: Vec::new(),
63		}
64	}
65
66	pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
67		Self::new(node.into(), key.into()).encode()
68	}
69
70	/// Create a range for scanning all entries of a specific operator
71	pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
72		let range = FlowNodeStateKeyRange::new(node);
73		EncodedKeyRange::start_end(range.start(), range.end())
74	}
75}
76
77#[derive(Debug, Clone, PartialEq, Eq)]
78pub struct FlowNodeStateKeyRange {
79	pub node: FlowNodeId,
80}
81
82impl FlowNodeStateKeyRange {
83	pub fn new(node: FlowNodeId) -> Self {
84		Self {
85			node,
86		}
87	}
88
89	fn decode_key(key: &EncodedKey) -> Option<Self> {
90		let mut de = KeyDeserializer::from_bytes(key.as_slice());
91
92		let version = de.read_u8().ok()?;
93		if version != VERSION {
94			return None;
95		}
96
97		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
98		if kind != FlowNodeStateKey::KIND {
99			return None;
100		}
101
102		let node_id = de.read_u64().ok()?;
103
104		Some(Self {
105			node: FlowNodeId(node_id),
106		})
107	}
108}
109
110impl EncodableKeyRange for FlowNodeStateKeyRange {
111	const KIND: KeyKind = KeyKind::FlowNodeState;
112
113	fn start(&self) -> Option<EncodedKey> {
114		let mut serializer = KeySerializer::with_capacity(10);
115		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0);
116		Some(serializer.to_encoded_key())
117	}
118
119	fn end(&self) -> Option<EncodedKey> {
120		let mut serializer = KeySerializer::with_capacity(10);
121		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
122		Some(serializer.to_encoded_key())
123	}
124
125	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
126	where
127		Self: Sized,
128	{
129		use std::ops::Bound;
130
131		let start_key = match &range.start {
132			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
133			Bound::Unbounded => None,
134		};
135
136		let end_key = match &range.end {
137			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
138			Bound::Unbounded => None,
139		};
140
141		(start_key, end_key)
142	}
143}
144
#[cfg(test)]
mod tests {
	use super::{EncodableKey, EncodableKeyRange, FlowNodeStateKey, FlowNodeStateKeyRange};
	use crate::{EncodedKey, EncodedKeyRange, interface::FlowNodeId};

	/// Assembles a raw encoded key from a version byte, a kind byte and the
	/// bytes that follow them.
	fn raw_key(version: u8, kind: u8, tail: &[u8]) -> EncodedKey {
		let mut bytes = vec![version, kind];
		bytes.extend_from_slice(tail);
		EncodedKey::new(bytes)
	}

	#[test]
	fn test_encode_decode() {
		let original = FlowNodeStateKey {
			node: FlowNodeId(0xDEADBEEF),
			key: vec![1, 2, 3, 4],
		};
		let bytes = original.encode();

		// Header bytes as they appear in the encoded form
		assert_eq!(bytes[0], 0xFE); // version
		assert_eq!(bytes[1], 0xEC); // kind (0x13 encoded)

		let restored = FlowNodeStateKey::decode(&bytes).unwrap();
		assert_eq!(restored.node.0, 0xDEADBEEF);
		assert_eq!(restored.key, vec![1, 2, 3, 4]);
	}

	#[test]
	fn test_encode_decode_empty_key() {
		let original = FlowNodeStateKey {
			node: FlowNodeId(0xDEADBEEF),
			key: vec![],
		};

		let restored = FlowNodeStateKey::decode(&original.encode()).unwrap();
		assert_eq!(restored.node.0, 0xDEADBEEF);
		assert_eq!(restored.key, Vec::<u8>::new());
	}

	#[test]
	fn test_new() {
		let built = FlowNodeStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
		assert_eq!(built.node.0, 42);
		assert_eq!(built.key, vec![5, 6, 7]);
	}

	#[test]
	fn test_new_empty() {
		let built = FlowNodeStateKey::new_empty(FlowNodeId(42));
		assert_eq!(built.node.0, 42);
		assert_eq!(built.key, Vec::<u8>::new());
	}

	#[test]
	fn test_roundtrip() {
		let before = FlowNodeStateKey {
			node: FlowNodeId(999_999_999),
			key: vec![10, 20, 30, 40, 50],
		};
		let after = FlowNodeStateKey::decode(&before.encode()).unwrap();
		assert_eq!(before, after);
	}

	#[test]
	fn test_decode_invalid_version() {
		// wrong version, correct kind
		let bad = raw_key(0xFF, 0xEC, &999u64.to_be_bytes());
		assert!(FlowNodeStateKey::decode(&bad).is_none());
	}

	#[test]
	fn test_decode_invalid_kind() {
		// correct version, wrong kind
		let bad = raw_key(0xFE, 0xFF, &999u64.to_be_bytes());
		assert!(FlowNodeStateKey::decode(&bad).is_none());
	}

	#[test]
	fn test_decode_too_short() {
		// correct version and kind, but only 4 of the 8 node id bytes
		let bad = raw_key(0xFE, 0xEC, &999u32.to_be_bytes());
		assert!(FlowNodeStateKey::decode(&bad).is_none());
	}

	#[test]
	fn test_flow_node_state_key_range() {
		let node = FlowNodeId(42);
		let bounds = FlowNodeStateKeyRange::new(node);

		// The start bound decodes to the node itself with an empty key
		let lower = FlowNodeStateKey::decode(&bounds.start().unwrap()).unwrap();
		assert_eq!(lower.node, node);
		assert_eq!(lower.key, Vec::<u8>::new());

		// The end bound decodes to node id - 1 with an empty key
		let upper = FlowNodeStateKey::decode(&bounds.end().unwrap()).unwrap();
		assert_eq!(upper.node.0, 41);
		assert_eq!(upper.key, Vec::<u8>::new());
	}

	#[test]
	fn test_flow_node_state_key_range_decode() {
		let node = FlowNodeId(100);
		let bounds = FlowNodeStateKeyRange::new(node);

		// Wrap both bounds in an EncodedKeyRange and decode them again
		let encoded = EncodedKeyRange::start_end(bounds.start(), bounds.end());
		let (lower, upper) = FlowNodeStateKeyRange::decode(&encoded);

		assert!(lower.is_some());
		assert_eq!(lower.unwrap().node, node);

		assert!(upper.is_some());
		assert_eq!(upper.unwrap().node.0, 99);
	}

	#[test]
	fn test_node_range_method() {
		let node = FlowNodeId(555);
		let encoded = FlowNodeStateKey::node_range(node);

		// The start bound carries the node id itself; the end bound
		// carries node id - 1.
		let (lower, upper) = FlowNodeStateKeyRange::decode(&encoded);

		assert!(lower.is_some());
		assert_eq!(lower.unwrap().node, node);

		assert!(upper.is_some());
		assert_eq!(upper.unwrap().node.0, 554);
	}
}