Skip to main content

reifydb_core/key/
flow_version.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use super::{EncodableKey, KeyKind};
5use crate::{
6	encoded::key::EncodedKey,
7	interface::catalog::flow::FlowId,
8	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
11/// Key for storing a flow's last processed CDC version.
12/// Used for exactly-once processing semantics across restarts.
13#[derive(Debug, Clone, PartialEq)]
14pub struct FlowVersionKey {
15	pub flow: FlowId,
16}
17
18const VERSION: u8 = 1;
19
20impl EncodableKey for FlowVersionKey {
21	const KIND: KeyKind = KeyKind::FlowVersion;
22
23	fn encode(&self) -> EncodedKey {
24		let mut serializer = KeySerializer::with_capacity(10);
25		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.flow);
26		serializer.to_encoded_key()
27	}
28
29	fn decode(key: &EncodedKey) -> Option<Self> {
30		let mut de = KeyDeserializer::from_bytes(key.as_slice());
31
32		let version = de.read_u8().ok()?;
33		if version != VERSION {
34			return None;
35		}
36
37		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
38		if kind != Self::KIND {
39			return None;
40		}
41
42		let flow = de.read_u64().ok()?;
43
44		Some(Self {
45			flow: FlowId(flow),
46		})
47	}
48}
49
50impl FlowVersionKey {
51	pub fn new(flow: impl Into<FlowId>) -> Self {
52		Self {
53			flow: flow.into(),
54		}
55	}
56
57	pub fn encoded(flow: impl Into<FlowId>) -> EncodedKey {
58		Self::new(flow).encode()
59	}
60}
61
62#[cfg(test)]
63pub mod tests {
64	use super::{EncodableKey, FlowVersionKey};
65	use crate::interface::catalog::flow::FlowId;
66
67	#[test]
68	fn test_encode_decode() {
69		let key = FlowVersionKey {
70			flow: FlowId(0x1234),
71		};
72		let encoded = key.encode();
73		let decoded = FlowVersionKey::decode(&encoded).unwrap();
74		assert_eq!(decoded.flow, FlowId(0x1234));
75		assert_eq!(key, decoded);
76	}
77
78	#[test]
79	fn test_new_and_encoded() {
80		let key = FlowVersionKey::new(42u64);
81		assert_eq!(key.flow, FlowId(42));
82
83		let encoded = FlowVersionKey::encoded(42u64);
84		let decoded = FlowVersionKey::decode(&encoded).unwrap();
85		assert_eq!(decoded.flow, FlowId(42));
86	}
87}