Skip to main content

reifydb_core/key/
flow_node_internal_state.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::ops::Bound;
5
6use super::{EncodableKey, EncodableKeyRange, KeyKind};
7use crate::{
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	interface::catalog::flow::FlowNodeId,
10	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct FlowNodeInternalStateKey {
15	pub node: FlowNodeId,
16	pub key: Vec<u8>,
17}
18
19impl EncodableKey for FlowNodeInternalStateKey {
20	const KIND: KeyKind = KeyKind::FlowNodeInternalState;
21
22	fn encode(&self) -> EncodedKey {
23		let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24		serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25		serializer.to_encoded_key()
26	}
27
28	fn decode(key: &EncodedKey) -> Option<Self> {
29		let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
32		if kind != Self::KIND {
33			return None;
34		}
35
36		let node_id = de.read_u64().ok()?;
37		let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
38
39		Some(Self {
40			node: FlowNodeId(node_id),
41			key: key_bytes,
42		})
43	}
44}
45
46impl FlowNodeInternalStateKey {
47	pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
48		Self {
49			node,
50			key,
51		}
52	}
53
54	pub fn new_empty(node: FlowNodeId) -> Self {
55		Self {
56			node,
57			key: Vec::new(),
58		}
59	}
60
61	pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
62		Self::new(node.into(), key.into()).encode()
63	}
64
65	pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
66		let range = FlowNodeInternalStateKeyRange::new(node);
67		EncodedKeyRange::start_end(range.start(), range.end())
68	}
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct FlowNodeInternalStateKeyRange {
73	pub node: FlowNodeId,
74}
75
76impl FlowNodeInternalStateKeyRange {
77	pub fn new(node: FlowNodeId) -> Self {
78		Self {
79			node,
80		}
81	}
82
83	fn decode_key(key: &EncodedKey) -> Option<Self> {
84		let mut de = KeyDeserializer::from_bytes(key.as_slice());
85
86		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
87		if kind != FlowNodeInternalStateKey::KIND {
88			return None;
89		}
90
91		let node_id = de.read_u64().ok()?;
92
93		Some(Self {
94			node: FlowNodeId(node_id),
95		})
96	}
97}
98
99impl EncodableKeyRange for FlowNodeInternalStateKeyRange {
100	const KIND: KeyKind = KeyKind::FlowNodeInternalState;
101
102	fn start(&self) -> Option<EncodedKey> {
103		let mut serializer = KeySerializer::with_capacity(9);
104		serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0);
105		Some(serializer.to_encoded_key())
106	}
107
108	fn end(&self) -> Option<EncodedKey> {
109		let mut serializer = KeySerializer::with_capacity(9);
110		serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
111		Some(serializer.to_encoded_key())
112	}
113
114	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
115	where
116		Self: Sized,
117	{
118		let start_key = match &range.start {
119			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
120			Bound::Unbounded => None,
121		};
122
123		let end_key = match &range.end {
124			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
125			Bound::Unbounded => None,
126		};
127
128		(start_key, end_key)
129	}
130}
131
132#[cfg(test)]
133pub mod tests {
134	use super::{EncodableKey, EncodableKeyRange, FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
135	use crate::{
136		encoded::key::{EncodedKey, EncodedKeyRange},
137		interface::catalog::flow::FlowNodeId,
138	};
139
140	#[test]
141	fn test_encode_decode() {
142		let key = FlowNodeInternalStateKey {
143			node: FlowNodeId(0xDEADBEEF),
144			key: vec![1, 2, 3, 4],
145		};
146		let encoded = key.encode();
147
148		assert_eq!(encoded[0], 0xE0);
149
150		let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
151		assert_eq!(decoded.node.0, 0xDEADBEEF);
152		assert_eq!(decoded.key, vec![1, 2, 3, 4]);
153	}
154
155	#[test]
156	fn test_encode_decode_empty_key() {
157		let key = FlowNodeInternalStateKey {
158			node: FlowNodeId(0xDEADBEEF),
159			key: vec![],
160		};
161		let encoded = key.encode();
162
163		let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
164		assert_eq!(decoded.node.0, 0xDEADBEEF);
165		assert_eq!(decoded.key, Vec::<u8>::new());
166	}
167
168	#[test]
169	fn test_new() {
170		let key = FlowNodeInternalStateKey::new(FlowNodeId(42), vec![5, 6, 7]);
171		assert_eq!(key.node.0, 42);
172		assert_eq!(key.key, vec![5, 6, 7]);
173	}
174
175	#[test]
176	fn test_new_empty() {
177		let key = FlowNodeInternalStateKey::new_empty(FlowNodeId(42));
178		assert_eq!(key.node.0, 42);
179		assert_eq!(key.key, Vec::<u8>::new());
180	}
181
182	#[test]
183	fn test_roundtrip() {
184		let original = FlowNodeInternalStateKey {
185			node: FlowNodeId(999_999_999),
186			key: vec![10, 20, 30, 40, 50],
187		};
188		let encoded = original.encode();
189		let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
190		assert_eq!(original, decoded);
191	}
192
193	#[test]
194	fn test_decode_invalid_version() {
195		let mut encoded = Vec::new();
196		encoded.push(0xFF);
197		encoded.push(0xE5);
198		encoded.extend(&999u64.to_be_bytes());
199		let key = EncodedKey::new(encoded);
200		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
201	}
202
203	#[test]
204	fn test_decode_invalid_kind() {
205		let mut encoded = Vec::new();
206		encoded.push(0xFE);
207		encoded.push(0xFF);
208		encoded.extend(&999u64.to_be_bytes());
209		let key = EncodedKey::new(encoded);
210		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
211	}
212
213	#[test]
214	fn test_decode_too_short() {
215		let mut encoded = Vec::new();
216		encoded.push(0xFE);
217		encoded.push(0xE5);
218		encoded.extend(&999u32.to_be_bytes());
219		let key = EncodedKey::new(encoded);
220		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
221	}
222
223	#[test]
224	fn test_flow_node_internal_state_key_range() {
225		let node = FlowNodeId(42);
226		let range = FlowNodeInternalStateKeyRange::new(node);
227
228		let start = range.start().unwrap();
229		let decoded_start = FlowNodeInternalStateKey::decode(&start).unwrap();
230		assert_eq!(decoded_start.node, node);
231		assert_eq!(decoded_start.key, Vec::<u8>::new());
232
233		let end = range.end().unwrap();
234		let decoded_end = FlowNodeInternalStateKey::decode(&end).unwrap();
235		assert_eq!(decoded_end.node.0, 41);
236		assert_eq!(decoded_end.key, Vec::<u8>::new());
237	}
238
239	#[test]
240	fn test_flow_node_internal_state_key_range_decode() {
241		let node = FlowNodeId(100);
242		let range = FlowNodeInternalStateKeyRange::new(node);
243
244		let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());
245
246		let (start_decoded, end_decoded) = FlowNodeInternalStateKeyRange::decode(&encoded_range);
247
248		assert!(start_decoded.is_some());
249		assert_eq!(start_decoded.unwrap().node, node);
250
251		assert!(end_decoded.is_some());
252		assert_eq!(end_decoded.unwrap().node.0, 99);
253	}
254}