Skip to main content

reifydb_core/key/
flow_node_state.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::ops::Bound;
5
6use super::{EncodableKey, EncodableKeyRange, KeyKind};
7use crate::{
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	interface::catalog::flow::FlowNodeId,
10	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct FlowNodeStateKey {
15	pub node: FlowNodeId,
16	pub key: Vec<u8>,
17}
18
19const VERSION: u8 = 1;
20
21impl EncodableKey for FlowNodeStateKey {
22	const KIND: KeyKind = KeyKind::FlowNodeState;
23
24	fn encode(&self) -> EncodedKey {
25		let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
26		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
27		serializer.to_encoded_key()
28	}
29
30	fn decode(key: &EncodedKey) -> Option<Self> {
31		let mut de = KeyDeserializer::from_bytes(key.as_slice());
32
33		let version = de.read_u8().ok()?;
34		if version != VERSION {
35			return None;
36		}
37
38		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
39		if kind != Self::KIND {
40			return None;
41		}
42
43		let node_id = de.read_u64().ok()?;
44		let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
45
46		Some(Self {
47			node: FlowNodeId(node_id),
48			key: key_bytes,
49		})
50	}
51}
52
53impl FlowNodeStateKey {
54	pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
55		Self {
56			node,
57			key,
58		}
59	}
60
61	pub fn new_empty(node: FlowNodeId) -> Self {
62		Self {
63			node,
64			key: Vec::new(),
65		}
66	}
67
68	pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
69		Self::new(node.into(), key.into()).encode()
70	}
71
72	/// Create a range for scanning all entries of a specific operator
73	pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
74		let range = FlowNodeStateKeyRange::new(node);
75		EncodedKeyRange::start_end(range.start(), range.end())
76	}
77}
78
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct FlowNodeStateKeyRange {
81	pub node: FlowNodeId,
82}
83
84impl FlowNodeStateKeyRange {
85	pub fn new(node: FlowNodeId) -> Self {
86		Self {
87			node,
88		}
89	}
90
91	fn decode_key(key: &EncodedKey) -> Option<Self> {
92		let mut de = KeyDeserializer::from_bytes(key.as_slice());
93
94		let version = de.read_u8().ok()?;
95		if version != VERSION {
96			return None;
97		}
98
99		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
100		if kind != FlowNodeStateKey::KIND {
101			return None;
102		}
103
104		let node_id = de.read_u64().ok()?;
105
106		Some(Self {
107			node: FlowNodeId(node_id),
108		})
109	}
110}
111
112impl EncodableKeyRange for FlowNodeStateKeyRange {
113	const KIND: KeyKind = KeyKind::FlowNodeState;
114
115	fn start(&self) -> Option<EncodedKey> {
116		let mut serializer = KeySerializer::with_capacity(10);
117		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0);
118		Some(serializer.to_encoded_key())
119	}
120
121	fn end(&self) -> Option<EncodedKey> {
122		let mut serializer = KeySerializer::with_capacity(10);
123		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
124		Some(serializer.to_encoded_key())
125	}
126
127	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
128	where
129		Self: Sized,
130	{
131		let start_key = match &range.start {
132			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
133			Bound::Unbounded => None,
134		};
135
136		let end_key = match &range.end {
137			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
138			Bound::Unbounded => None,
139		};
140
141		(start_key, end_key)
142	}
143}
144
145#[cfg(test)]
146pub mod tests {
147	use super::{EncodableKey, EncodableKeyRange, FlowNodeStateKey, FlowNodeStateKeyRange};
148	use crate::{
149		encoded::key::{EncodedKey, EncodedKeyRange},
150		interface::catalog::flow::FlowNodeId,
151	};
152
153	#[test]
154	fn test_encode_decode() {
155		let key = FlowNodeStateKey {
156			node: FlowNodeId(0xDEADBEEF),
157			key: vec![1, 2, 3, 4],
158		};
159		let encoded = key.encode();
160
161		// Verify the encoded format
162		assert_eq!(encoded[0], 0xFE); // version
163		assert_eq!(encoded[1], 0xEC); // kind (0x13 encoded)
164
165		let decoded = FlowNodeStateKey::decode(&encoded).unwrap();
166		assert_eq!(decoded.node.0, 0xDEADBEEF);
167		assert_eq!(decoded.key, vec![1, 2, 3, 4]);
168	}
169
170	#[test]
171	fn test_encode_decode_empty_key() {
172		let key = FlowNodeStateKey {
173			node: FlowNodeId(0xDEADBEEF),
174			key: vec![],
175		};
176		let encoded = key.encode();
177
178		let decoded = FlowNodeStateKey::decode(&encoded).unwrap();
179		assert_eq!(decoded.node.0, 0xDEADBEEF);
180		assert_eq!(decoded.key, Vec::<u8>::new());
181	}
182
183	#[test]
184	fn test_new() {
185		let key = FlowNodeStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
186		assert_eq!(key.node.0, 42);
187		assert_eq!(key.key, vec![5, 6, 7]);
188	}
189
190	#[test]
191	fn test_new_empty() {
192		let key = FlowNodeStateKey::new_empty(FlowNodeId(42));
193		assert_eq!(key.node.0, 42);
194		assert_eq!(key.key, Vec::<u8>::new());
195	}
196
197	#[test]
198	fn test_roundtrip() {
199		let original = FlowNodeStateKey {
200			node: FlowNodeId(999_999_999),
201			key: vec![10, 20, 30, 40, 50],
202		};
203		let encoded = original.encode();
204		let decoded = FlowNodeStateKey::decode(&encoded).unwrap();
205		assert_eq!(original, decoded);
206	}
207
208	#[test]
209	fn test_decode_invalid_version() {
210		let mut encoded = Vec::new();
211		encoded.push(0xFF); // wrong version
212		encoded.push(0xEC); // correct kind
213		encoded.extend(&999u64.to_be_bytes());
214		let key = EncodedKey::new(encoded);
215		assert!(FlowNodeStateKey::decode(&key).is_none());
216	}
217
218	#[test]
219	fn test_decode_invalid_kind() {
220		let mut encoded = Vec::new();
221		encoded.push(0xFE); // correct version
222		encoded.push(0xFF); // wrong kind
223		encoded.extend(&999u64.to_be_bytes());
224		let key = EncodedKey::new(encoded);
225		assert!(FlowNodeStateKey::decode(&key).is_none());
226	}
227
228	#[test]
229	fn test_decode_too_short() {
230		let mut encoded = Vec::new();
231		encoded.push(0xFE); // correct version
232		encoded.push(0xEC); // correct kind
233		encoded.extend(&999u32.to_be_bytes()); // only 4 bytes instead of 8 for operator id
234		let key = EncodedKey::new(encoded);
235		assert!(FlowNodeStateKey::decode(&key).is_none());
236	}
237
238	#[test]
239	fn test_flow_node_state_key_range() {
240		let node = FlowNodeId(42);
241		let range = FlowNodeStateKeyRange::new(node);
242
243		// Test start key
244		let start = range.start().unwrap();
245		let decoded_start = FlowNodeStateKey::decode(&start).unwrap();
246		assert_eq!(decoded_start.node, node);
247		assert_eq!(decoded_start.key, Vec::<u8>::new());
248
249		// Test end key
250		let end = range.end().unwrap();
251		let decoded_end = FlowNodeStateKey::decode(&end).unwrap();
252		assert_eq!(decoded_end.node.0, 41); // Should be operator - 1
253		assert_eq!(decoded_end.key, Vec::<u8>::new());
254	}
255
256	#[test]
257	fn test_flow_node_state_key_range_decode() {
258		let node = FlowNodeId(100);
259		let range = FlowNodeStateKeyRange::new(node);
260
261		// Create an EncodedKeyRange
262		let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());
263
264		// Decode it back
265		let (start_decoded, end_decoded) = FlowNodeStateKeyRange::decode(&encoded_range);
266
267		assert!(start_decoded.is_some());
268		assert_eq!(start_decoded.unwrap().node, node);
269
270		assert!(end_decoded.is_some());
271		assert_eq!(end_decoded.unwrap().node.0, 99);
272	}
273
274	#[test]
275	fn test_node_range_method() {
276		let node = FlowNodeId(555);
277		let range = FlowNodeStateKey::node_range(node);
278
279		// The range should include all keys for this operator
280		// Start should be the operator with empty key
281		// End should be the next operator with empty key
282		let (start_range, end_range) = FlowNodeStateKeyRange::decode(&range);
283
284		assert!(start_range.is_some());
285		assert_eq!(start_range.unwrap().node, node);
286
287		assert!(end_range.is_some());
288		assert_eq!(end_range.unwrap().node.0, 554);
289	}
290}