Skip to main content

reifydb_core/key/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_type::value::Value;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	interface::catalog::id::RingBufferId,
10	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq)]
14pub struct RingBufferKey {
15	pub ringbuffer: RingBufferId,
16}
17
18impl RingBufferKey {
19	pub fn new(ringbuffer: RingBufferId) -> Self {
20		Self {
21			ringbuffer,
22		}
23	}
24
25	pub fn encoded(ringbuffer: impl Into<RingBufferId>) -> EncodedKey {
26		Self::new(ringbuffer.into()).encode()
27	}
28
29	pub fn full_scan() -> EncodedKeyRange {
30		EncodedKeyRange::start_end(Some(Self::ringbuffer_start()), Some(Self::ringbuffer_end()))
31	}
32
33	fn ringbuffer_start() -> EncodedKey {
34		let mut serializer = KeySerializer::with_capacity(1);
35		serializer.extend_u8(Self::KIND as u8);
36		serializer.to_encoded_key()
37	}
38
39	fn ringbuffer_end() -> EncodedKey {
40		let mut serializer = KeySerializer::with_capacity(1);
41		serializer.extend_u8(Self::KIND as u8 - 1);
42		serializer.to_encoded_key()
43	}
44}
45
46impl EncodableKey for RingBufferKey {
47	const KIND: KeyKind = KeyKind::RingBuffer;
48
49	fn encode(&self) -> EncodedKey {
50		let mut serializer = KeySerializer::with_capacity(9);
51		serializer.extend_u8(Self::KIND as u8).extend_u64(self.ringbuffer);
52		serializer.to_encoded_key()
53	}
54
55	fn decode(key: &EncodedKey) -> Option<Self> {
56		let mut de = KeyDeserializer::from_bytes(key.as_slice());
57
58		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
59		if kind != Self::KIND {
60			return None;
61		}
62
63		let ringbuffer = de.read_u64().ok()?;
64
65		Some(Self {
66			ringbuffer: RingBufferId(ringbuffer),
67		})
68	}
69}
70
71#[derive(Debug, Clone, PartialEq)]
72pub struct RingBufferMetadataKey {
73	pub ringbuffer: RingBufferId,
74	pub partition_values: Vec<Value>,
75}
76
77impl RingBufferMetadataKey {
78	pub fn new(ringbuffer: RingBufferId) -> Self {
79		Self {
80			ringbuffer,
81			partition_values: vec![],
82		}
83	}
84
85	pub fn encoded(ringbuffer: impl Into<RingBufferId>) -> EncodedKey {
86		Self::new(ringbuffer.into()).encode()
87	}
88
89	pub fn encoded_partition(ringbuffer: impl Into<RingBufferId>, partition_values: Vec<Value>) -> EncodedKey {
90		Self {
91			ringbuffer: ringbuffer.into(),
92			partition_values,
93		}
94		.encode()
95	}
96
97	pub fn full_scan_for_ringbuffer(ringbuffer: RingBufferId) -> EncodedKeyRange {
98		let mut start = KeySerializer::with_capacity(9);
99		start.extend_u8(Self::KIND as u8);
100		start.extend_u64(ringbuffer);
101		let start_key = start.to_encoded_key();
102
103		let mut end = KeySerializer::with_capacity(9);
104		end.extend_u8(Self::KIND as u8);
105		end.extend_u64(RingBufferId(ringbuffer.0 - 1));
106		let end_key = end.to_encoded_key();
107
108		EncodedKeyRange::start_end(Some(start_key), Some(end_key))
109	}
110}
111
112impl EncodableKey for RingBufferMetadataKey {
113	const KIND: KeyKind = KeyKind::RingBufferMetadata;
114
115	fn encode(&self) -> EncodedKey {
116		let mut serializer = KeySerializer::with_capacity(31);
117		serializer.extend_u8(Self::KIND as u8).extend_u64(self.ringbuffer);
118		for value in &self.partition_values {
119			serializer.extend_value(value);
120		}
121		serializer.to_encoded_key()
122	}
123
124	fn decode(key: &EncodedKey) -> Option<Self> {
125		let mut de = KeyDeserializer::from_bytes(key.as_slice());
126
127		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
128		if kind != Self::KIND {
129			return None;
130		}
131
132		let ringbuffer = de.read_u64().ok()?;
133
134		let mut partition_values = Vec::new();
135		while !de.is_empty() {
136			partition_values.push(de.read_value().ok()?);
137		}
138
139		Some(Self {
140			ringbuffer: RingBufferId(ringbuffer),
141			partition_values,
142		})
143	}
144}
145
#[cfg(test)]
mod tests {
	use super::*;

	// These tests go through `RingBufferMetadataKey::decode` so that the
	// encode/decode pair is verified end to end, rather than only checking
	// the raw value bytes with a hand-driven deserializer.

	#[test]
	fn test_metadata_key_encode_decode_roundtrip() {
		let partition_values = vec![Value::Utf8("east".to_string())];
		let encoded = RingBufferMetadataKey::encoded_partition(RingBufferId(42), partition_values.clone());

		let decoded = RingBufferMetadataKey::decode(&encoded).expect("metadata key should decode");
		assert_eq!(decoded.ringbuffer, RingBufferId(42));
		assert_eq!(decoded.partition_values, partition_values);
	}

	#[test]
	fn test_metadata_key_encode_decode_multiple() {
		let partition_values = vec![Value::Utf8("us".to_string()), Value::Uint8(42)];
		let encoded = RingBufferMetadataKey::encoded_partition(RingBufferId(7), partition_values.clone());

		let decoded = RingBufferMetadataKey::decode(&encoded).expect("metadata key should decode");
		assert_eq!(decoded.ringbuffer, RingBufferId(7));
		assert_eq!(decoded.partition_values, partition_values);
	}

	#[test]
	fn test_metadata_key_encode_decode_without_partition() {
		let encoded = RingBufferMetadataKey::encoded(RingBufferId(7));

		let decoded = RingBufferMetadataKey::decode(&encoded);
		assert_eq!(decoded, Some(RingBufferMetadataKey::new(RingBufferId(7))));
	}

	#[test]
	fn test_metadata_key_decode_rejects_other_kind() {
		// A key encoded with the `RingBuffer` kind must not decode as metadata.
		let encoded = RingBufferKey::encoded(RingBufferId(1));

		assert_eq!(RingBufferMetadataKey::decode(&encoded), None);
	}
}