durable_streams_server/protocol/
offset.rs1use crate::protocol::error::{Error, Result};
2use std::cmp::Ordering;
3use std::fmt;
4use std::hash::{Hash, Hasher};
5use std::str::FromStr;
6
/// A stream offset: one of two symbolic sentinels or a concrete
/// `(read_seq, byte_offset)` pair.
///
/// Textual forms, as returned by `Offset::as_str`:
/// - `Start` is `"-1"`,
/// - `Now` is `"now"`,
/// - `Concrete` is 16 lowercase hex digits, an underscore, then 16 more
///   lowercase hex digits (33 bytes in total).
#[derive(Clone, Debug)]
pub enum Offset {
    /// Sentinel whose textual form is `"-1"`.
    Start,
    /// Sentinel whose textual form is `"now"`.
    Now,
    /// A fully specified offset.
    Concrete {
        /// First component; compared first when two concrete offsets are ordered.
        read_seq: u64,
        /// Second component; breaks ties between equal `read_seq` values.
        byte_offset: u64,
        /// Cached textual form of the two components.
        ///
        /// `Offset::new` and the `FromStr` impl always fill this through
        /// `encode_offset`. Code that builds this variant by hand must keep it
        /// in sync, because `Offset::as_str` returns these bytes as a string.
        raw: [u8; 33],
    },
}
21
22impl Offset {
23 pub const START: &'static str = "-1";
25
26 pub const NOW: &'static str = "now";
28
29 #[must_use]
31 pub fn new(read_seq: u64, byte_offset: u64) -> Self {
32 Self::Concrete {
33 read_seq,
34 byte_offset,
35 raw: encode_offset(read_seq, byte_offset),
36 }
37 }
38
39 #[must_use]
41 pub fn start() -> Self {
42 Self::Start
43 }
44
45 #[must_use]
47 pub fn now() -> Self {
48 Self::Now
49 }
50
51 #[must_use]
53 pub fn is_start(&self) -> bool {
54 matches!(self, Self::Start)
55 }
56
57 #[must_use]
59 pub fn is_now(&self) -> bool {
60 matches!(self, Self::Now)
61 }
62
63 #[must_use]
65 pub fn is_sentinel(&self) -> bool {
66 matches!(self, Self::Start | Self::Now)
67 }
68
69 #[must_use]
71 pub fn as_str(&self) -> &str {
72 match self {
73 Self::Start => Self::START,
74 Self::Now => Self::NOW,
75 Self::Concrete { raw, .. } => {
76 unsafe { std::str::from_utf8_unchecked(raw) }
78 }
79 }
80 }
81
82 #[must_use]
86 pub fn parse_components(&self) -> Option<(u64, u64)> {
87 match self {
88 Self::Concrete {
89 read_seq,
90 byte_offset,
91 ..
92 } => Some((*read_seq, *byte_offset)),
93 Self::Start | Self::Now => None,
94 }
95 }
96}
97
98impl FromStr for Offset {
99 type Err = Error;
100
101 fn from_str(s: &str) -> Result<Self> {
102 if s == Self::START {
103 return Ok(Self::Start);
104 }
105 if s == Self::NOW {
106 return Ok(Self::Now);
107 }
108
109 let bytes = s.as_bytes();
110 if bytes.len() != 33 || bytes[16] != b'_' {
111 return Err(Error::InvalidOffset(format!(
112 "Expected format 'read_seq_byte_offset', got '{s}'"
113 )));
114 }
115
116 for (idx, b) in bytes.iter().copied().enumerate() {
117 if idx == 16 {
118 continue;
119 }
120 if b.is_ascii_uppercase() {
121 return Err(Error::InvalidOffset(format!(
122 "Offset must use lowercase hex digits: '{s}'"
123 )));
124 }
125 if !b.is_ascii_digit() && !(b'a'..=b'f').contains(&b) {
126 let part_num = if idx < 16 { 1 } else { 2 };
127 return Err(Error::InvalidOffset(format!(
128 "Invalid hex character in part {part_num} of '{s}'"
129 )));
130 }
131 }
132
133 let read_seq = decode_hex_16(&bytes[..16])
134 .ok_or_else(|| Error::InvalidOffset(format!("Failed to parse hex values in '{s}'")))?;
135 let byte_offset = decode_hex_16(&bytes[17..])
136 .ok_or_else(|| Error::InvalidOffset(format!("Failed to parse hex values in '{s}'")))?;
137
138 Ok(Self::Concrete {
139 read_seq,
140 byte_offset,
141 raw: encode_offset(read_seq, byte_offset),
142 })
143 }
144}
145
146impl fmt::Display for Offset {
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 f.write_str(self.as_str())
149 }
150}
151
152impl From<Offset> for String {
153 fn from(offset: Offset) -> Self {
154 offset.as_str().to_string()
155 }
156}
157
158impl PartialEq for Offset {
159 fn eq(&self, other: &Self) -> bool {
160 match (self.parse_components(), other.parse_components()) {
161 (Some(a), Some(b)) => a == b,
162 _ => self.as_str() == other.as_str(),
163 }
164 }
165}
166
167impl Eq for Offset {}
168
169impl PartialOrd for Offset {
170 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
171 Some(self.cmp(other))
172 }
173}
174
175impl Ord for Offset {
176 fn cmp(&self, other: &Self) -> Ordering {
177 match (self.parse_components(), other.parse_components()) {
178 (Some((a_rs, a_bo)), Some((b_rs, b_bo))) => (a_rs, a_bo).cmp(&(b_rs, b_bo)),
179 _ => self.as_str().cmp(other.as_str()),
180 }
181 }
182}
183
184impl Hash for Offset {
185 fn hash<H: Hasher>(&self, state: &mut H) {
186 if let Some((read_seq, byte_offset)) = self.parse_components() {
187 0u8.hash(state);
188 read_seq.hash(state);
189 byte_offset.hash(state);
190 } else {
191 1u8.hash(state);
192 self.as_str().hash(state);
193 }
194 }
195}
196
/// Encodes two `u64` values as `<16 hex digits>_<16 hex digits>`.
///
/// Each value is zero-padded, most significant digit first, and uses lowercase
/// hex digits. The result is always exactly 33 ASCII bytes.
fn encode_offset(read_seq: u64, byte_offset: u64) -> [u8; 33] {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    /// Writes `value` as 16 hex digits into `dst`, which must be 16 bytes long.
    fn write_hex(dst: &mut [u8], value: u64) {
        // Big-endian bytes give the most significant digits first; each byte
        // contributes its high nibble followed by its low nibble.
        for (pair, byte) in dst.chunks_exact_mut(2).zip(value.to_be_bytes()) {
            pair[0] = DIGITS[usize::from(byte >> 4)];
            pair[1] = DIGITS[usize::from(byte & 0x0F)];
        }
    }

    // Pre-fill with the separator so only the two hex halves need writing.
    let mut encoded = [b'_'; 33];
    let (head, tail) = encoded.split_at_mut(16);
    write_hex(head, read_seq);
    write_hex(&mut tail[1..], byte_offset);
    encoded
}
213
/// Decodes exactly 16 lowercase hex digits into a `u64`.
///
/// Returns `None` when `bytes` is not 16 bytes long or contains anything
/// other than `0-9` / `a-f`; uppercase digits are rejected.
fn decode_hex_16(bytes: &[u8]) -> Option<u64> {
    if bytes.len() != 16 {
        return None;
    }

    // Sixteen nibbles fill a u64 exactly, so the shift never loses set bits.
    bytes.iter().try_fold(0u64, |acc, &byte| {
        let nibble = match byte {
            b'0'..=b'9' => byte - b'0',
            b'a'..=b'f' => byte - b'a' + 10,
            _ => return None,
        };
        Some((acc << 4) | u64::from(nibble))
    })
}
230
231pub fn serialize_offset<S: serde::Serializer>(
238 offset: &Offset,
239 s: S,
240) -> std::result::Result<S::Ok, S::Error> {
241 s.serialize_str(offset.as_str())
242}
243
244pub fn deserialize_offset<'de, D: serde::Deserializer<'de>>(
251 d: D,
252) -> std::result::Result<Offset, D::Error> {
253 let raw = <String as serde::Deserialize>::deserialize(d)?;
254 raw.parse::<Offset>().map_err(serde::de::Error::custom)
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// `Offset::new` encodes both components as zero-padded lowercase hex.
    #[test]
    fn test_offset_new() {
        let offset = Offset::new(0, 0);
        assert_eq!(offset.as_str(), "0000000000000000_0000000000000000");

        let offset = Offset::new(1, 42);
        assert_eq!(offset.as_str(), "0000000000000001_000000000000002a");

        let offset = Offset::new(u64::MAX, u64::MAX);
        assert_eq!(offset.as_str(), "ffffffffffffffff_ffffffffffffffff");
    }

    /// The sentinel constructors, predicates and textual forms agree.
    #[test]
    fn test_offset_sentinels() {
        let start = Offset::start();
        assert!(start.is_start());
        assert!(start.is_sentinel());
        assert!(!start.is_now());
        assert_eq!(start.as_str(), "-1");

        let now = Offset::now();
        assert!(now.is_now());
        assert!(now.is_sentinel());
        assert!(!now.is_start());
        assert_eq!(now.as_str(), "now");
    }

    /// Well-formed concrete offsets and both sentinels parse.
    #[test]
    fn test_offset_parse_valid() {
        let offset: Offset = "0000000000000000_0000000000000000".parse().unwrap();
        assert_eq!(offset.as_str(), "0000000000000000_0000000000000000");

        let offset: Offset = "0000000000000001_000000000000002a".parse().unwrap();
        assert_eq!(offset.as_str(), "0000000000000001_000000000000002a");

        let offset: Offset = "-1".parse().unwrap();
        assert!(offset.is_start());

        let offset: Offset = "now".parse().unwrap();
        assert!(offset.is_now());
    }

    /// Malformed inputs are rejected.
    #[test]
    fn test_offset_parse_invalid() {
        // Wrong separator.
        assert!("0000000000000000-0000000000000000"
            .parse::<Offset>()
            .is_err());

        // Too short.
        assert!("000_000".parse::<Offset>().is_err());

        // First part one digit too long.
        assert!("00000000000000000_0000000000000000"
            .parse::<Offset>()
            .is_err());

        // Uppercase hex digit.
        assert!("000000000000000A_0000000000000000"
            .parse::<Offset>()
            .is_err());

        // Non-hex character.
        assert!("000000000000000g_0000000000000000"
            .parse::<Offset>()
            .is_err());

        // Separator missing.
        assert!("00000000000000000000000000000000"
            .parse::<Offset>()
            .is_err());

        // Extra trailing part.
        assert!("0000000000000000_0000000000000000_0000000000000000"
            .parse::<Offset>()
            .is_err());
    }

    /// Concrete offsets order by `read_seq` first, then `byte_offset`.
    #[test]
    fn test_offset_ordering() {
        let offset1 = Offset::new(0, 0);
        let offset2 = Offset::new(0, 1);
        let offset3 = Offset::new(1, 0);
        let offset4 = Offset::new(1, 1);

        assert!(offset1 < offset2);
        assert!(offset2 < offset3);
        assert!(offset3 < offset4);
        assert!(offset1 < offset4);

        // Comparing the textual forms gives the same answer as comparing the offsets.
        assert_eq!(offset1.as_str() < offset2.as_str(), offset1 < offset2);
    }

    /// `parse_components` exposes the pair for concrete offsets only.
    #[test]
    fn test_offset_parse_components() {
        let offset = Offset::new(42, 100);
        let (read_seq, byte_offset) = offset.parse_components().unwrap();
        assert_eq!(read_seq, 42);
        assert_eq!(byte_offset, 100);

        assert!(Offset::start().parse_components().is_none());
        assert!(Offset::now().parse_components().is_none());
    }

    /// `Start` sorts before and `Now` sorts after every concrete offset.
    #[test]
    fn test_offset_sentinel_ordering() {
        let start = Offset::start();
        let now = Offset::now();
        let zero = Offset::new(0, 0);
        let mid = Offset::new(5, 10);

        assert!(start < zero);
        assert!(start < mid);

        assert!(now > zero);
        assert!(now > mid);

        assert_ne!(start, now);

        assert_eq!(Offset::start(), Offset::start());
        assert_eq!(Offset::now(), Offset::now());
    }

    /// Equal offsets compare equal and collide in a hash set.
    #[test]
    fn test_offset_equality_and_hash() {
        use std::collections::HashSet;

        let a = Offset::new(1, 2);
        let b = Offset::new(1, 2);
        let c = Offset::new(1, 3);

        assert_eq!(a, b);
        assert_ne!(a, c);

        // Store `Offset` values themselves so the hand-written `Hash` impl is
        // what gets exercised, not `String`'s.
        let mut set = HashSet::new();
        set.insert(a.clone());
        assert!(set.contains(&b));
        assert!(!set.contains(&c));

        // A parsed offset hashes and compares like a constructed one.
        let parsed: Offset = "0000000000000001_0000000000000002".parse().unwrap();
        assert!(set.contains(&parsed));

        // The two sentinels are distinct keys; re-inserting one adds nothing.
        let mut set2 = HashSet::new();
        set2.insert(Offset::start());
        set2.insert(Offset::now());
        set2.insert(Offset::start());
        assert_eq!(set2.len(), 2);
        assert!(!set2.contains(&a));
    }

    /// `Display` prints exactly the textual form.
    #[test]
    fn test_offset_display() {
        let offset = Offset::new(1, 2);
        assert_eq!(format!("{offset}"), "0000000000000001_0000000000000002");

        let start = Offset::start();
        assert_eq!(format!("{start}"), "-1");

        let now = Offset::now();
        assert_eq!(format!("{now}"), "now");
    }
}