Skip to main content

reifydb_core/key/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_type::value::Value;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	interface::catalog::id::RingBufferId,
10	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
// Version byte written as the first byte of every key encoded in this file;
// `decode` rejects keys whose first byte differs.
const VERSION: u8 = 1;
14
/// Key addressing a ring buffer by its id.
///
/// Encoded as the version byte, the `KeyKind::RingBuffer` kind byte, and the
/// id written as a `u64`.
#[derive(Debug, Clone, PartialEq)]
pub struct RingBufferKey {
	/// Id of the ring buffer this key refers to.
	pub ringbuffer: RingBufferId,
}
19
20impl RingBufferKey {
21	pub fn new(ringbuffer: RingBufferId) -> Self {
22		Self {
23			ringbuffer,
24		}
25	}
26
27	pub fn encoded(ringbuffer: impl Into<RingBufferId>) -> EncodedKey {
28		Self::new(ringbuffer.into()).encode()
29	}
30
31	pub fn full_scan() -> EncodedKeyRange {
32		EncodedKeyRange::start_end(Some(Self::ringbuffer_start()), Some(Self::ringbuffer_end()))
33	}
34
35	fn ringbuffer_start() -> EncodedKey {
36		let mut serializer = KeySerializer::with_capacity(2);
37		serializer.extend_u8(VERSION);
38		serializer.extend_u8(Self::KIND as u8);
39		serializer.to_encoded_key()
40	}
41
42	fn ringbuffer_end() -> EncodedKey {
43		let mut serializer = KeySerializer::with_capacity(2);
44		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8 - 1);
45		serializer.to_encoded_key()
46	}
47}
48
49impl EncodableKey for RingBufferKey {
50	const KIND: KeyKind = KeyKind::RingBuffer;
51
52	fn encode(&self) -> EncodedKey {
53		let mut serializer = KeySerializer::with_capacity(10);
54		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.ringbuffer);
55		serializer.to_encoded_key()
56	}
57
58	fn decode(key: &EncodedKey) -> Option<Self> {
59		let mut de = KeyDeserializer::from_bytes(key.as_slice());
60
61		let version = de.read_u8().ok()?;
62		if version != VERSION {
63			return None;
64		}
65
66		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
67		if kind != Self::KIND {
68			return None;
69		}
70
71		let ringbuffer = de.read_u64().ok()?;
72
73		Some(Self {
74			ringbuffer: RingBufferId(ringbuffer),
75		})
76	}
77}
78
/// Key addressing the metadata of a ring buffer, optionally narrowed to a partition.
///
/// Encoded as the version byte, the `KeyKind::RingBufferMetadata` kind byte,
/// the id written as a `u64`, and then each partition value in order.
#[derive(Debug, Clone, PartialEq)]
pub struct RingBufferMetadataKey {
	/// Id of the ring buffer whose metadata this key refers to.
	pub ringbuffer: RingBufferId,
	/// Values appended to the encoded key after the id; empty when built via `new`.
	pub partition_values: Vec<Value>,
}
84
85impl RingBufferMetadataKey {
86	pub fn new(ringbuffer: RingBufferId) -> Self {
87		Self {
88			ringbuffer,
89			partition_values: vec![],
90		}
91	}
92
93	pub fn encoded(ringbuffer: impl Into<RingBufferId>) -> EncodedKey {
94		Self::new(ringbuffer.into()).encode()
95	}
96
97	pub fn encoded_partition(ringbuffer: impl Into<RingBufferId>, partition_values: Vec<Value>) -> EncodedKey {
98		Self {
99			ringbuffer: ringbuffer.into(),
100			partition_values,
101		}
102		.encode()
103	}
104
105	/// Returns a range scanning all partition metadata keys for a given ringbuffer (prefix scan).
106	pub fn full_scan_for_ringbuffer(ringbuffer: RingBufferId) -> EncodedKeyRange {
107		let mut start = KeySerializer::with_capacity(10);
108		start.extend_u8(VERSION);
109		start.extend_u8(Self::KIND as u8);
110		start.extend_u64(ringbuffer);
111		let start_key = start.to_encoded_key();
112
113		let mut end = KeySerializer::with_capacity(10);
114		end.extend_u8(VERSION);
115		end.extend_u8(Self::KIND as u8);
116		end.extend_u64(RingBufferId(ringbuffer.0 - 1));
117		let end_key = end.to_encoded_key();
118
119		EncodedKeyRange::start_end(Some(start_key), Some(end_key))
120	}
121}
122
123impl EncodableKey for RingBufferMetadataKey {
124	const KIND: KeyKind = KeyKind::RingBufferMetadata;
125
126	fn encode(&self) -> EncodedKey {
127		let mut serializer = KeySerializer::with_capacity(32);
128		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.ringbuffer);
129		for value in &self.partition_values {
130			serializer.extend_value(value);
131		}
132		serializer.to_encoded_key()
133	}
134
135	fn decode(key: &EncodedKey) -> Option<Self> {
136		let mut de = KeyDeserializer::from_bytes(key.as_slice());
137
138		let version = de.read_u8().ok()?;
139		if version != VERSION {
140			return None;
141		}
142
143		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
144		if kind != Self::KIND {
145			return None;
146		}
147
148		let ringbuffer = de.read_u64().ok()?;
149
150		Some(Self {
151			ringbuffer: RingBufferId(ringbuffer),
152			partition_values: vec![],
153		})
154	}
155}
156
#[cfg(test)]
mod tests {
	use super::*;

	/// A single partition value written by `encoded_partition` can be read back
	/// from the bytes that follow the version, kind, and id.
	#[test]
	fn test_metadata_key_encode_decode_roundtrip() {
		let partition = vec![Value::Utf8("east".to_string())];
		let key = RingBufferMetadataKey::encoded_partition(RingBufferId(42), partition);

		let mut de = KeyDeserializer::from_bytes(key.as_slice());
		// Advance past the version byte, kind byte, and id; results are not inspected.
		let _ = de.read_u8();
		let _ = de.read_u8();
		let _ = de.read_u64();

		let value = de.read_value().unwrap();
		assert_eq!(value, Value::Utf8("east".to_string()));
	}

	/// Several partition values are read back in the order they were supplied.
	#[test]
	fn test_metadata_key_encode_decode_multiple() {
		let expected = [Value::Utf8("us".to_string()), Value::Uint8(42)];
		let key = RingBufferMetadataKey::encoded_partition(RingBufferId(7), expected.to_vec());

		let mut de = KeyDeserializer::from_bytes(key.as_slice());
		// Advance past the version byte, kind byte, and id; results are not inspected.
		let _ = de.read_u8();
		let _ = de.read_u8();
		let _ = de.read_u64();

		for want in &expected {
			assert_eq!(&de.read_value().unwrap(), want);
		}
	}
}