Skip to main content

reifydb_core/key/
flow_node_internal_state.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::ops::Bound;
5
6use super::{EncodableKey, EncodableKeyRange, KeyKind};
7use crate::{
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	interface::catalog::flow::FlowNodeId,
10	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct FlowNodeInternalStateKey {
15	pub node: FlowNodeId,
16	pub key: Vec<u8>,
17}
18
19impl EncodableKey for FlowNodeInternalStateKey {
20	const KIND: KeyKind = KeyKind::FlowNodeInternalState;
21
22	fn encode(&self) -> EncodedKey {
23		let mut serializer = KeySerializer::with_capacity(10 + self.key.len());
24		serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0).extend_raw(&self.key);
25		serializer.to_encoded_key()
26	}
27
28	fn decode(key: &EncodedKey) -> Option<Self> {
29		let mut de = KeyDeserializer::from_bytes(key.as_slice());
30
31		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
32		if kind != Self::KIND {
33			return None;
34		}
35
36		let node_id = de.read_u64().ok()?;
37		let key_bytes = de.read_raw(de.remaining()).ok()?.to_vec();
38
39		Some(Self {
40			node: FlowNodeId(node_id),
41			key: key_bytes,
42		})
43	}
44}
45
46impl FlowNodeInternalStateKey {
47	pub const ROW_NUMBER_COUNTER_TAG: u8 = b'C';
48
49	pub const ROW_NUMBER_MAPPING_TAG: u8 = b'M';
50
51	pub const WINDOW_META_TAG: u8 = b'W';
52
53	pub const GATE_VISIBILITY_TAG: u8 = b'G';
54
55	pub fn is_row_number_counter(&self) -> bool {
56		self.key.as_slice() == [Self::ROW_NUMBER_COUNTER_TAG]
57	}
58
59	pub fn is_row_number_mapping(&self) -> bool {
60		self.key.first() == Some(&Self::ROW_NUMBER_MAPPING_TAG)
61	}
62
63	pub fn is_window_meta(&self) -> bool {
64		self.key.first() == Some(&Self::WINDOW_META_TAG)
65	}
66
67	pub fn is_gate_visibility(&self) -> bool {
68		self.key.first() == Some(&Self::GATE_VISIBILITY_TAG)
69	}
70
71	pub fn new(node: FlowNodeId, key: Vec<u8>) -> Self {
72		Self {
73			node,
74			key,
75		}
76	}
77
78	pub fn new_empty(node: FlowNodeId) -> Self {
79		Self {
80			node,
81			key: Vec::new(),
82		}
83	}
84
85	pub fn encoded(node: impl Into<FlowNodeId>, key: impl Into<Vec<u8>>) -> EncodedKey {
86		Self::new(node.into(), key.into()).encode()
87	}
88
89	pub fn node_range(node: FlowNodeId) -> EncodedKeyRange {
90		let range = FlowNodeInternalStateKeyRange::new(node);
91		EncodedKeyRange::start_end(range.start(), range.end())
92	}
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct FlowNodeInternalStateKeyRange {
97	pub node: FlowNodeId,
98}
99
100impl FlowNodeInternalStateKeyRange {
101	pub fn new(node: FlowNodeId) -> Self {
102		Self {
103			node,
104		}
105	}
106
107	fn decode_key(key: &EncodedKey) -> Option<Self> {
108		let mut de = KeyDeserializer::from_bytes(key.as_slice());
109
110		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
111		if kind != FlowNodeInternalStateKey::KIND {
112			return None;
113		}
114
115		let node_id = de.read_u64().ok()?;
116
117		Some(Self {
118			node: FlowNodeId(node_id),
119		})
120	}
121}
122
123impl EncodableKeyRange for FlowNodeInternalStateKeyRange {
124	const KIND: KeyKind = KeyKind::FlowNodeInternalState;
125
126	fn start(&self) -> Option<EncodedKey> {
127		let mut serializer = KeySerializer::with_capacity(9);
128		serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0);
129		Some(serializer.to_encoded_key())
130	}
131
132	fn end(&self) -> Option<EncodedKey> {
133		let mut serializer = KeySerializer::with_capacity(9);
134		serializer.extend_u8(Self::KIND as u8).extend_u64(self.node.0.wrapping_sub(1));
135		Some(serializer.to_encoded_key())
136	}
137
138	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
139	where
140		Self: Sized,
141	{
142		let start_key = match &range.start {
143			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
144			Bound::Unbounded => None,
145		};
146
147		let end_key = match &range.end {
148			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
149			Bound::Unbounded => None,
150		};
151
152		(start_key, end_key)
153	}
154}
155
#[cfg(test)]
pub mod tests {
	use super::{EncodableKey, EncodableKeyRange, FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
	use crate::{
		encoded::key::{EncodedKey, EncodedKeyRange},
		interface::catalog::flow::FlowNodeId,
	};

	/// Encoding starts with the expected kind byte and decodes back to the same parts.
	#[test]
	fn test_encode_decode() {
		let encoded = FlowNodeInternalStateKey {
			node: FlowNodeId(0xDEADBEEF),
			key: vec![1, 2, 3, 4],
		}
		.encode();

		assert_eq!(encoded[0], 0xE0);

		let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
		assert_eq!(decoded.node.0, 0xDEADBEEF);
		assert_eq!(decoded.key, vec![1, 2, 3, 4]);
	}

	/// A key without suffix bytes survives an encode/decode cycle.
	#[test]
	fn test_encode_decode_empty_key() {
		let encoded = FlowNodeInternalStateKey {
			node: FlowNodeId(0xDEADBEEF),
			key: vec![],
		}
		.encode();

		let decoded = FlowNodeInternalStateKey::decode(&encoded).unwrap();
		assert_eq!(decoded.node.0, 0xDEADBEEF);
		assert!(decoded.key.is_empty());
	}

	/// `new` stores both arguments unchanged.
	#[test]
	fn test_new() {
		let FlowNodeInternalStateKey {
			node,
			key,
		} = FlowNodeInternalStateKey::new(FlowNodeId(42), vec![5, 6, 7]);

		assert_eq!(node.0, 42);
		assert_eq!(key, vec![5, 6, 7]);
	}

	/// `new_empty` stores the node and an empty suffix.
	#[test]
	fn test_new_empty() {
		let FlowNodeInternalStateKey {
			node,
			key,
		} = FlowNodeInternalStateKey::new_empty(FlowNodeId(42));

		assert_eq!(node.0, 42);
		assert!(key.is_empty());
	}

	/// Decoding an encoded key yields a value equal to the original.
	#[test]
	fn test_roundtrip() {
		let original = FlowNodeInternalStateKey {
			node: FlowNodeId(999_999_999),
			key: vec![10, 20, 30, 40, 50],
		};

		let decoded = FlowNodeInternalStateKey::decode(&original.encode()).unwrap();
		assert_eq!(original, decoded);
	}

	/// Builds an encoded key from two leading bytes followed by `tail`.
	fn raw_key(lead: [u8; 2], tail: &[u8]) -> EncodedKey {
		let mut bytes = lead.to_vec();
		bytes.extend_from_slice(tail);
		EncodedKey::new(bytes)
	}

	// NOTE(review): the test name mentions a "version", but `decode` reads the first
	// byte as the key kind; the name may be left over from an older layout.
	#[test]
	fn test_decode_invalid_version() {
		let key = raw_key([0xFF, 0xE5], &999u64.to_be_bytes());
		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
	}

	/// Raw bytes starting with `0xFE, 0xFF` are rejected.
	#[test]
	fn test_decode_invalid_kind() {
		let key = raw_key([0xFE, 0xFF], &999u64.to_be_bytes());
		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
	}

	/// Raw bytes carrying only a four-byte payload are rejected.
	#[test]
	fn test_decode_too_short() {
		let key = raw_key([0xFE, 0xE5], &999u32.to_be_bytes());
		assert!(FlowNodeInternalStateKey::decode(&key).is_none());
	}

	/// Both range bounds decode as suffix-less keys: start for the node, end for node - 1.
	#[test]
	fn test_flow_node_internal_state_key_range() {
		let node = FlowNodeId(42);
		let range = FlowNodeInternalStateKeyRange::new(node);

		let lower = FlowNodeInternalStateKey::decode(&range.start().unwrap()).unwrap();
		assert_eq!(lower.node, node);
		assert!(lower.key.is_empty());

		let upper = FlowNodeInternalStateKey::decode(&range.end().unwrap()).unwrap();
		assert_eq!(upper.node.0, 41);
		assert!(upper.key.is_empty());
	}

	/// Decoding an encoded range recovers the node ids of both bounds.
	#[test]
	fn test_flow_node_internal_state_key_range_decode() {
		let node = FlowNodeId(100);
		let range = FlowNodeInternalStateKeyRange::new(node);
		let encoded_range = EncodedKeyRange::start_end(range.start(), range.end());

		let (start_decoded, end_decoded) = FlowNodeInternalStateKeyRange::decode(&encoded_range);

		assert_eq!(start_decoded.map(|bound| bound.node), Some(node));
		assert_eq!(end_decoded.map(|bound| bound.node.0), Some(99));
	}
}