reifydb_core/key/
ringbuffer.rs1use reifydb_type::value::Value;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::id::RingBufferId,
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq)]
14pub struct RingBufferKey {
15 pub ringbuffer: RingBufferId,
16}
17
18impl RingBufferKey {
19 pub fn new(ringbuffer: RingBufferId) -> Self {
20 Self {
21 ringbuffer,
22 }
23 }
24
25 pub fn encoded(ringbuffer: impl Into<RingBufferId>) -> EncodedKey {
26 Self::new(ringbuffer.into()).encode()
27 }
28
29 pub fn full_scan() -> EncodedKeyRange {
30 EncodedKeyRange::start_end(Some(Self::ringbuffer_start()), Some(Self::ringbuffer_end()))
31 }
32
33 fn ringbuffer_start() -> EncodedKey {
34 let mut serializer = KeySerializer::with_capacity(1);
35 serializer.extend_u8(Self::KIND as u8);
36 serializer.to_encoded_key()
37 }
38
39 fn ringbuffer_end() -> EncodedKey {
40 let mut serializer = KeySerializer::with_capacity(1);
41 serializer.extend_u8(Self::KIND as u8 - 1);
42 serializer.to_encoded_key()
43 }
44}
45
46impl EncodableKey for RingBufferKey {
47 const KIND: KeyKind = KeyKind::RingBuffer;
48
49 fn encode(&self) -> EncodedKey {
50 let mut serializer = KeySerializer::with_capacity(9);
51 serializer.extend_u8(Self::KIND as u8).extend_u64(self.ringbuffer);
52 serializer.to_encoded_key()
53 }
54
55 fn decode(key: &EncodedKey) -> Option<Self> {
56 let mut de = KeyDeserializer::from_bytes(key.as_slice());
57
58 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
59 if kind != Self::KIND {
60 return None;
61 }
62
63 let ringbuffer = de.read_u64().ok()?;
64
65 Some(Self {
66 ringbuffer: RingBufferId(ringbuffer),
67 })
68 }
69}
70
71#[derive(Debug, Clone, PartialEq)]
72pub struct RingBufferMetadataKey {
73 pub ringbuffer: RingBufferId,
74 pub partition_values: Vec<Value>,
75}
76
77impl RingBufferMetadataKey {
78 pub fn new(ringbuffer: RingBufferId) -> Self {
79 Self {
80 ringbuffer,
81 partition_values: vec![],
82 }
83 }
84
85 pub fn encoded(ringbuffer: impl Into<RingBufferId>) -> EncodedKey {
86 Self::new(ringbuffer.into()).encode()
87 }
88
89 pub fn encoded_partition(ringbuffer: impl Into<RingBufferId>, partition_values: Vec<Value>) -> EncodedKey {
90 Self {
91 ringbuffer: ringbuffer.into(),
92 partition_values,
93 }
94 .encode()
95 }
96
97 pub fn full_scan_for_ringbuffer(ringbuffer: RingBufferId) -> EncodedKeyRange {
98 let mut start = KeySerializer::with_capacity(9);
99 start.extend_u8(Self::KIND as u8);
100 start.extend_u64(ringbuffer);
101 let start_key = start.to_encoded_key();
102
103 let mut end = KeySerializer::with_capacity(9);
104 end.extend_u8(Self::KIND as u8);
105 end.extend_u64(RingBufferId(ringbuffer.0 - 1));
106 let end_key = end.to_encoded_key();
107
108 EncodedKeyRange::start_end(Some(start_key), Some(end_key))
109 }
110}
111
112impl EncodableKey for RingBufferMetadataKey {
113 const KIND: KeyKind = KeyKind::RingBufferMetadata;
114
115 fn encode(&self) -> EncodedKey {
116 let mut serializer = KeySerializer::with_capacity(31);
117 serializer.extend_u8(Self::KIND as u8).extend_u64(self.ringbuffer);
118 for value in &self.partition_values {
119 serializer.extend_value(value);
120 }
121 serializer.to_encoded_key()
122 }
123
124 fn decode(key: &EncodedKey) -> Option<Self> {
125 let mut de = KeyDeserializer::from_bytes(key.as_slice());
126
127 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
128 if kind != Self::KIND {
129 return None;
130 }
131
132 let ringbuffer = de.read_u64().ok()?;
133
134 let mut partition_values = Vec::new();
135 while !de.is_empty() {
136 partition_values.push(de.read_value().ok()?);
137 }
138
139 Some(Self {
140 ringbuffer: RingBufferId(ringbuffer),
141 partition_values,
142 })
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a metadata key with one partition value and checks that
    /// `decode` reproduces the ring buffer id and the value.
    #[test]
    fn test_metadata_key_encode_decode_roundtrip() {
        let encoded = RingBufferMetadataKey::encoded_partition(
            RingBufferId(42),
            vec![Value::Utf8("east".to_string())],
        );

        // Go through `decode` so the kind byte and id are verified too,
        // instead of being read and discarded.
        let decoded = RingBufferMetadataKey::decode(&encoded).expect("encoded metadata key must decode");

        assert_eq!(
            decoded,
            RingBufferMetadataKey {
                ringbuffer: RingBufferId(42),
                partition_values: vec![Value::Utf8("east".to_string())],
            }
        );
    }

    /// Same round trip with several partition values of different types, to
    /// check that their order and count survive encoding.
    #[test]
    fn test_metadata_key_encode_decode_multiple() {
        let encoded = RingBufferMetadataKey::encoded_partition(
            RingBufferId(7),
            vec![Value::Utf8("us".to_string()), Value::Uint8(42)],
        );

        let decoded = RingBufferMetadataKey::decode(&encoded).expect("encoded metadata key must decode");

        assert_eq!(
            decoded,
            RingBufferMetadataKey {
                ringbuffer: RingBufferId(7),
                partition_values: vec![Value::Utf8("us".to_string()), Value::Uint8(42)],
            }
        );
    }
}