reifydb_core/key/
flow_node_state.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use super::{EncodableKey, EncodableKeyRange, KeyKind};
5use crate::{
6	EncodedKey, EncodedKeyRange,
7	interface::FlowNodeId,
8	util::encoding::keycode::{KeyDeserializer, KeySerializer},
9};
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct FlowNodeStateKey {
13	pub node: FlowNodeId,
14	pub key: Vec<u8>,
15}
16
/// Key-format version: written as the first component of every encoded key
/// in this file and checked first when decoding.
const VERSION: u8 = 1;
18
19impl EncodableKey for FlowNodeStateKey {
20	const KIND: KeyKind = KeyKind::FlowNodeState;
21
22	fn encode(&self) -> EncodedKey {
23		let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25		serializer.to_encoded_key()
26	}
27
28	fn decode(key: &EncodedKey) -> Option<Self> {
29		let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31		let version = de.read_u8().ok()?;
32		if version != VERSION {
33			return None;
34		}
35
36		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
37		if kind != Self::KIND {
38			return None;
39		}
40
41		let node_id = de.read_u64().ok()?;
42		let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
43
44		Some(Self {
45			node: FlowNodeId(node_id),
46			key: key_bytes,
47		})
48	}
49}
50
51impl FlowNodeStateKey {
52	pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
53		Self {
54			node,
55			key,
56		}
57	}
58
59	pub fn new_empty(node: FlowNodeId) -> Self {
60		Self {
61			node,
62			key: Vec::new(),
63		}
64	}
65
66	/// Create a range for scanning all entries of a specific operator
67	pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
68		let range = FlowNodeStateKeyRange::new(node);
69		EncodedKeyRange::start_end(range.start(), range.end())
70	}
71}
72
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct FlowNodeStateKeyRange {
75	pub node: FlowNodeId,
76}
77
78impl FlowNodeStateKeyRange {
79	pub fn new(node: FlowNodeId) -> Self {
80		Self {
81			node,
82		}
83	}
84
85	fn decode_key(key: &EncodedKey) -> Option<Self> {
86		let mut de = KeyDeserializer::from_bytes(key.as_slice());
87
88		let version = de.read_u8().ok()?;
89		if version != VERSION {
90			return None;
91		}
92
93		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
94		if kind != FlowNodeStateKey::KIND {
95			return None;
96		}
97
98		let node_id = de.read_u64().ok()?;
99
100		Some(Self {
101			node: FlowNodeId(node_id),
102		})
103	}
104}
105
106impl EncodableKeyRange for FlowNodeStateKeyRange {
107	const KIND: KeyKind = KeyKind::FlowNodeState;
108
109	fn start(&self) -> Option<EncodedKey> {
110		let mut serializer = KeySerializer::with_capacity(10);
111		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0);
112		Some(serializer.to_encoded_key())
113	}
114
115	fn end(&self) -> Option<EncodedKey> {
116		let mut serializer = KeySerializer::with_capacity(10);
117		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
118		Some(serializer.to_encoded_key())
119	}
120
121	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
122	where
123		Self: Sized,
124	{
125		use std::ops::Bound;
126
127		let start_key = match &range.start {
128			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
129			Bound::Unbounded => None,
130		};
131
132		let end_key = match &range.end {
133			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
134			Bound::Unbounded => None,
135		};
136
137		(start_key, end_key)
138	}
139}
140
#[cfg(test)]
mod tests {
	use super::{EncodableKey, EncodableKeyRange, FlowNodeStateKey, FlowNodeStateKeyRange};
	use crate::{EncodedKey, EncodedKeyRange, interface::FlowNodeId};

	/// Version byte as it appears in an encoded key.
	const ENCODED_VERSION: u8 = 0xFE;
	/// Kind byte as it appears in an encoded key (0x13 encoded).
	const ENCODED_KIND: u8 = 0xEC;

	/// Assembles a raw encoded key from a version byte, a kind byte and the
	/// bytes that follow them.
	fn raw_key(version: u8, kind: u8, tail: &[u8]) -> EncodedKey {
		let mut bytes = vec![version, kind];
		bytes.extend_from_slice(tail);
		EncodedKey::new(bytes)
	}

	#[test]
	fn test_encode_decode() {
		let key = FlowNodeStateKey {
			node: FlowNodeId(0xDEADBEEF),
			key: vec![1, 2, 3, 4],
		};
		let encoded = key.encode();

		// The first two bytes are the encoded version and kind.
		assert_eq!(encoded[0], ENCODED_VERSION);
		assert_eq!(encoded[1], ENCODED_KIND);

		let decoded = FlowNodeStateKey::decode(&encoded).unwrap();
		assert_eq!(decoded.node.0, 0xDEADBEEF);
		assert_eq!(decoded.key, vec![1, 2, 3, 4]);
	}

	#[test]
	fn test_encode_decode_empty_key() {
		let key = FlowNodeStateKey {
			node: FlowNodeId(0xDEADBEEF),
			key: vec![],
		};

		let decoded = FlowNodeStateKey::decode(&key.encode()).unwrap();
		assert_eq!(decoded.node.0, 0xDEADBEEF);
		assert!(decoded.key.is_empty());
	}

	#[test]
	fn test_new() {
		let key = FlowNodeStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
		assert_eq!(key.node.0, 42);
		assert_eq!(key.key, vec![5, 6, 7]);
	}

	#[test]
	fn test_new_empty() {
		let key = FlowNodeStateKey::new_empty(FlowNodeId(42));
		assert_eq!(key.node.0, 42);
		assert!(key.key.is_empty());
	}

	#[test]
	fn test_roundtrip() {
		let original = FlowNodeStateKey {
			node: FlowNodeId(999_999_999),
			key: vec![10, 20, 30, 40, 50],
		};
		let decoded = FlowNodeStateKey::decode(&original.encode()).unwrap();
		assert_eq!(original, decoded);
	}

	#[test]
	fn test_decode_invalid_version() {
		// Wrong version byte, correct kind byte, full 8-byte node id.
		let key = raw_key(0xFF, ENCODED_KIND, &999u64.to_be_bytes());
		assert!(FlowNodeStateKey::decode(&key).is_none());
	}

	#[test]
	fn test_decode_invalid_kind() {
		// Correct version byte, wrong kind byte, full 8-byte node id.
		let key = raw_key(ENCODED_VERSION, 0xFF, &999u64.to_be_bytes());
		assert!(FlowNodeStateKey::decode(&key).is_none());
	}

	#[test]
	fn test_decode_too_short() {
		// Only 4 bytes follow the prefix instead of the 8 needed for the node id.
		let key = raw_key(ENCODED_VERSION, ENCODED_KIND, &999u32.to_be_bytes());
		assert!(FlowNodeStateKey::decode(&key).is_none());
	}

	#[test]
	fn test_flow_node_state_key_range() {
		let node = FlowNodeId(42);
		let range = FlowNodeStateKeyRange::new(node);

		// The start bound decodes to the node itself with an empty suffix.
		let start = range.start().unwrap();
		let decoded_start = FlowNodeStateKey::decode(&start).unwrap();
		assert_eq!(decoded_start.node, node);
		assert!(decoded_start.key.is_empty());

		// The end bound decodes to node id - 1 with an empty suffix.
		let end = range.end().unwrap();
		let decoded_end = FlowNodeStateKey::decode(&end).unwrap();
		assert_eq!(decoded_end.node.0, 41);
		assert!(decoded_end.key.is_empty());
	}

	#[test]
	fn test_flow_node_state_key_range_decode() {
		let node = FlowNodeId(100);
		let range = FlowNodeStateKeyRange::new(node);

		// Round-trip the bounds through an EncodedKeyRange.
		let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());
		let (start_decoded, end_decoded) = FlowNodeStateKeyRange::decode(&encoded_range);

		let start = start_decoded.expect("start bound should decode");
		assert_eq!(start.node, node);

		let end = end_decoded.expect("end bound should decode");
		assert_eq!(end.node.0, 99);
	}

	#[test]
	fn test_node_range_method() {
		let node = FlowNodeId(555);
		let range = FlowNodeStateKey::node_range(node);

		// The start bound carries the node id itself; the end bound carries
		// node id - 1, matching FlowNodeStateKeyRange::end.
		let (start_range, end_range) = FlowNodeStateKeyRange::decode(&range);

		let start = start_range.expect("start bound should decode");
		assert_eq!(start.node, node);

		let end = end_range.expect("end bound should decode");
		assert_eq!(end.node.0, 554);
	}
}