manifold_timeseries/
encoding.rs1use std::fmt;
4use std::io;
5
/// Error type returned by the timestamp encoding and decoding routines.
#[derive(Debug)]
pub enum EncodingError {
    /// The encoded bytes were malformed; the payload describes the problem.
    InvalidData(String),
    /// Wraps an [`io::Error`]; produced by the `From<io::Error>` conversion.
    Io(io::Error),
}

impl fmt::Display for EncodingError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidData(msg) => write!(f, "Invalid encoded data: {msg}"),
            Self::Io(err) => write!(f, "IO error: {err}"),
        }
    }
}

impl std::error::Error for EncodingError {
    /// Returns the wrapped [`io::Error`] for the `Io` variant so that callers
    /// walking the error chain can reach the underlying cause; `InvalidData`
    /// has no underlying cause and yields `None`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidData(_) => None,
            Self::Io(err) => Some(err),
        }
    }
}

impl From<io::Error> for EncodingError {
    /// Wraps an I/O error so that `?` can be used on `io::Result` values in
    /// functions returning `Result<_, EncodingError>`.
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}
31
32pub trait TimestampEncoding: Send + Sync {
38 fn encode(timestamp: u64) -> Vec<u8>;
40
41 fn decode(bytes: &[u8]) -> Result<u64, EncodingError>;
43
44 fn supports_random_access() -> bool;
46}
47
48#[derive(Debug, Clone, Copy)]
61pub struct AbsoluteEncoding;
62
63impl TimestampEncoding for AbsoluteEncoding {
64 fn encode(timestamp: u64) -> Vec<u8> {
65 timestamp.to_be_bytes().to_vec()
66 }
67
68 fn decode(bytes: &[u8]) -> Result<u64, EncodingError> {
69 if bytes.len() != 8 {
70 return Err(EncodingError::InvalidData(format!(
71 "Expected 8 bytes for AbsoluteEncoding, got {}",
72 bytes.len()
73 )));
74 }
75 let array: [u8; 8] = bytes.try_into().map_err(|_| {
76 EncodingError::InvalidData("Failed to convert bytes to array".to_string())
77 })?;
78 Ok(u64::from_be_bytes(array))
79 }
80
81 fn supports_random_access() -> bool {
82 true
83 }
84}
85
86#[derive(Debug, Clone, Copy)]
107pub struct DeltaEncoding;
108
109pub const DELTA_CHECKPOINT_INTERVAL: usize = 1000;
111
112impl DeltaEncoding {
113 #[allow(clippy::cast_possible_truncation)]
115 pub fn encode_varint(value: u64, buf: &mut Vec<u8>) {
116 let mut n = value;
117 while n >= 0x80 {
118 buf.push((n as u8) | 0x80);
119 n >>= 7;
120 }
121 buf.push(n as u8);
122 }
123
124 pub fn decode_varint(bytes: &[u8]) -> Result<(u64, usize), EncodingError> {
126 let mut result: u64 = 0;
127 let mut shift: u32 = 0;
128
129 for (i, &byte) in bytes.iter().enumerate() {
130 if shift >= 64 {
131 return Err(EncodingError::InvalidData("Varint overflow".to_string()));
132 }
133
134 let value = u64::from(byte & 0x7F);
135 result |= value << shift;
136
137 if (byte & 0x80) == 0 {
138 return Ok((result, i + 1));
139 }
140
141 shift += 7;
142 }
143
144 Err(EncodingError::InvalidData("Incomplete varint".to_string()))
145 }
146}
147
148impl TimestampEncoding for DeltaEncoding {
149 fn encode(timestamp: u64) -> Vec<u8> {
150 timestamp.to_be_bytes().to_vec()
154 }
155
156 fn decode(bytes: &[u8]) -> Result<u64, EncodingError> {
157 if bytes.len() != 8 {
158 return Err(EncodingError::InvalidData(format!(
159 "Expected 8 bytes for DeltaEncoding checkpoint, got {}",
160 bytes.len()
161 )));
162 }
163 let array: [u8; 8] = bytes.try_into().map_err(|_| {
164 EncodingError::InvalidData("Failed to convert bytes to array".to_string())
165 })?;
166 Ok(u64::from_be_bytes(array))
167 }
168
169 fn supports_random_access() -> bool {
170 false
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// An absolute-encoded timestamp occupies 8 bytes and decodes to itself.
    #[test]
    fn test_absolute_encoding() {
        let timestamp = 1_609_459_200_000u64;

        let bytes = AbsoluteEncoding::encode(timestamp);
        assert_eq!(bytes.len(), 8);

        assert_eq!(AbsoluteEncoding::decode(&bytes).unwrap(), timestamp);
    }

    /// Encoded byte strings compare in the same order as the timestamps.
    #[test]
    fn test_absolute_encoding_sortable() {
        let [low, mid, high] =
            [1_000_000_000u64, 2_000_000_000, 3_000_000_000].map(AbsoluteEncoding::encode);

        assert!(low < mid);
        assert!(mid < high);
        assert!(low < high);
    }

    /// Input that is not 8 bytes long is rejected.
    #[test]
    fn test_absolute_encoding_invalid_length() {
        let short: [u8; 3] = [1, 2, 3];
        assert!(AbsoluteEncoding::decode(&short).is_err());
    }

    /// The trait-level delta encoding round-trips through 8 bytes.
    #[test]
    fn test_delta_encoding_basic() {
        let timestamp = 1_609_459_200_000u64;

        let bytes = DeltaEncoding::encode(timestamp);
        assert_eq!(bytes.len(), 8);

        assert_eq!(DeltaEncoding::decode(&bytes).unwrap(), timestamp);
    }

    /// Varints round-trip at each 7-bit group boundary, consuming the whole
    /// buffer.
    #[test]
    fn test_varint_encoding() {
        let boundaries = [
            0u64,
            127,
            128,
            16383,
            16384,
            2_097_151,
            2_097_152,
            268_435_455,
            268_435_456,
        ];

        for expected in boundaries {
            let mut scratch = Vec::new();
            DeltaEncoding::encode_varint(expected, &mut scratch);

            let (actual, used) = DeltaEncoding::decode_varint(&scratch).unwrap();
            assert_eq!(actual, expected);
            assert_eq!(used, scratch.len());
        }
    }

    /// Only the absolute encoding reports random-access support.
    #[test]
    fn test_encoding_capabilities() {
        assert!(AbsoluteEncoding::supports_random_access());
        assert!(!DeltaEncoding::supports_random_access());
    }
}