Skip to main content

durable_streams_server/protocol/
offset.rs

1use crate::protocol::error::{Error, Result};
2use std::cmp::Ordering;
3use std::fmt;
4use std::hash::{Hash, Hasher};
5use std::str::FromStr;
6
7/// Offset value with validated format.
8///
9/// Canonical format: `{read_seq:016x}_{byte_offset:016x}`.
10/// Sentinels: `-1` (stream start), `now` (tail/live).
11#[derive(Debug, Clone)]
12pub enum Offset {
13    Start,
14    Now,
15    Concrete {
16        read_seq: u64,
17        byte_offset: u64,
18        raw: [u8; 33],
19    },
20}
21
22impl Offset {
23    /// Sentinel value for stream start
24    pub const START: &'static str = "-1";
25
26    /// Sentinel value for stream tail/live mode
27    pub const NOW: &'static str = "now";
28
29    /// Create a new offset from read sequence and byte offset.
30    #[must_use]
31    pub fn new(read_seq: u64, byte_offset: u64) -> Self {
32        Self::Concrete {
33            read_seq,
34            byte_offset,
35            raw: encode_offset(read_seq, byte_offset),
36        }
37    }
38
39    /// Create the stream start sentinel
40    #[must_use]
41    pub fn start() -> Self {
42        Self::Start
43    }
44
45    /// Create the tail/now sentinel
46    #[must_use]
47    pub fn now() -> Self {
48        Self::Now
49    }
50
51    /// Check if this is the start sentinel
52    #[must_use]
53    pub fn is_start(&self) -> bool {
54        matches!(self, Self::Start)
55    }
56
57    /// Check if this is the now/tail sentinel
58    #[must_use]
59    pub fn is_now(&self) -> bool {
60        matches!(self, Self::Now)
61    }
62
63    /// Check if this is a sentinel value (start or now)
64    #[must_use]
65    pub fn is_sentinel(&self) -> bool {
66        matches!(self, Self::Start | Self::Now)
67    }
68
69    /// Get the canonical offset string.
70    #[must_use]
71    pub fn as_str(&self) -> &str {
72        match self {
73            Self::Start => Self::START,
74            Self::Now => Self::NOW,
75            Self::Concrete { raw, .. } => {
76                // SAFETY: `raw` is always constructed from ASCII hex digits + `_`.
77                unsafe { std::str::from_utf8_unchecked(raw) }
78            }
79        }
80    }
81
82    /// Parse the offset into (`read_seq`, `byte_offset`) components.
83    ///
84    /// Returns `None` for sentinel values.
85    #[must_use]
86    pub fn parse_components(&self) -> Option<(u64, u64)> {
87        match self {
88            Self::Concrete {
89                read_seq,
90                byte_offset,
91                ..
92            } => Some((*read_seq, *byte_offset)),
93            Self::Start | Self::Now => None,
94        }
95    }
96}
97
98impl FromStr for Offset {
99    type Err = Error;
100
101    fn from_str(s: &str) -> Result<Self> {
102        if s == Self::START {
103            return Ok(Self::Start);
104        }
105        if s == Self::NOW {
106            return Ok(Self::Now);
107        }
108
109        let bytes = s.as_bytes();
110        if bytes.len() != 33 || bytes[16] != b'_' {
111            return Err(Error::InvalidOffset(format!(
112                "Expected format 'read_seq_byte_offset', got '{s}'"
113            )));
114        }
115
116        for (idx, b) in bytes.iter().copied().enumerate() {
117            if idx == 16 {
118                continue;
119            }
120            if b.is_ascii_uppercase() {
121                return Err(Error::InvalidOffset(format!(
122                    "Offset must use lowercase hex digits: '{s}'"
123                )));
124            }
125            if !b.is_ascii_digit() && !(b'a'..=b'f').contains(&b) {
126                let part_num = if idx < 16 { 1 } else { 2 };
127                return Err(Error::InvalidOffset(format!(
128                    "Invalid hex character in part {part_num} of '{s}'"
129                )));
130            }
131        }
132
133        let read_seq = decode_hex_16(&bytes[..16])
134            .ok_or_else(|| Error::InvalidOffset(format!("Failed to parse hex values in '{s}'")))?;
135        let byte_offset = decode_hex_16(&bytes[17..])
136            .ok_or_else(|| Error::InvalidOffset(format!("Failed to parse hex values in '{s}'")))?;
137
138        Ok(Self::Concrete {
139            read_seq,
140            byte_offset,
141            raw: encode_offset(read_seq, byte_offset),
142        })
143    }
144}
145
146impl fmt::Display for Offset {
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        f.write_str(self.as_str())
149    }
150}
151
152impl From<Offset> for String {
153    fn from(offset: Offset) -> Self {
154        offset.as_str().to_string()
155    }
156}
157
158impl PartialEq for Offset {
159    fn eq(&self, other: &Self) -> bool {
160        match (self.parse_components(), other.parse_components()) {
161            (Some(a), Some(b)) => a == b,
162            _ => self.as_str() == other.as_str(),
163        }
164    }
165}
166
167impl Eq for Offset {}
168
169impl PartialOrd for Offset {
170    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
171        Some(self.cmp(other))
172    }
173}
174
175impl Ord for Offset {
176    fn cmp(&self, other: &Self) -> Ordering {
177        match (self.parse_components(), other.parse_components()) {
178            (Some((a_rs, a_bo)), Some((b_rs, b_bo))) => (a_rs, a_bo).cmp(&(b_rs, b_bo)),
179            _ => self.as_str().cmp(other.as_str()),
180        }
181    }
182}
183
184impl Hash for Offset {
185    fn hash<H: Hasher>(&self, state: &mut H) {
186        if let Some((read_seq, byte_offset)) = self.parse_components() {
187            0u8.hash(state);
188            read_seq.hash(state);
189            byte_offset.hash(state);
190        } else {
191            1u8.hash(state);
192            self.as_str().hash(state);
193        }
194    }
195}
196
197fn encode_offset(read_seq: u64, byte_offset: u64) -> [u8; 33] {
198    const HEX: &[u8; 16] = b"0123456789abcdef";
199    let mut raw = [0u8; 33];
200
201    for (i, slot) in raw[..16].iter_mut().enumerate() {
202        let shift = (15 - i) * 4;
203        *slot = HEX[((read_seq >> shift) & 0xF) as usize];
204    }
205    raw[16] = b'_';
206    for (i, slot) in raw[17..].iter_mut().enumerate() {
207        let shift = (15 - i) * 4;
208        *slot = HEX[((byte_offset >> shift) & 0xF) as usize];
209    }
210
211    raw
212}
213
214fn decode_hex_16(bytes: &[u8]) -> Option<u64> {
215    if bytes.len() != 16 {
216        return None;
217    }
218
219    let mut value = 0u64;
220    for b in bytes {
221        let digit = match b {
222            b'0'..=b'9' => b - b'0',
223            b'a'..=b'f' => b - b'a' + 10,
224            _ => return None,
225        };
226        value = (value << 4) | u64::from(digit);
227    }
228    Some(value)
229}
230
231/// Serialize an `Offset` as its canonical string representation.
232///
233/// # Errors
234///
235/// Returns an error if the underlying serializer fails to serialize the
236/// string representation.
237pub fn serialize_offset<S: serde::Serializer>(
238    offset: &Offset,
239    s: S,
240) -> std::result::Result<S::Ok, S::Error> {
241    s.serialize_str(offset.as_str())
242}
243
244/// Deserialize an `Offset` from its canonical string representation.
245///
246/// # Errors
247///
248/// Returns an error if the underlying deserializer fails to produce a
249/// string, or if the string cannot be parsed as a valid `Offset`.
250pub fn deserialize_offset<'de, D: serde::Deserializer<'de>>(
251    d: D,
252) -> std::result::Result<Offset, D::Error> {
253    let raw = <String as serde::Deserialize>::deserialize(d)?;
254    raw.parse::<Offset>().map_err(serde::de::Error::custom)
255}
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260
261    #[test]
262    fn test_offset_new() {
263        let offset = Offset::new(0, 0);
264        assert_eq!(offset.as_str(), "0000000000000000_0000000000000000");
265
266        let offset = Offset::new(1, 42);
267        assert_eq!(offset.as_str(), "0000000000000001_000000000000002a");
268
269        let offset = Offset::new(u64::MAX, u64::MAX);
270        assert_eq!(offset.as_str(), "ffffffffffffffff_ffffffffffffffff");
271    }
272
273    #[test]
274    fn test_offset_sentinels() {
275        let start = Offset::start();
276        assert!(start.is_start());
277        assert!(start.is_sentinel());
278        assert!(!start.is_now());
279        assert_eq!(start.as_str(), "-1");
280
281        let now = Offset::now();
282        assert!(now.is_now());
283        assert!(now.is_sentinel());
284        assert!(!now.is_start());
285        assert_eq!(now.as_str(), "now");
286    }
287
288    #[test]
289    fn test_offset_parse_valid() {
290        let offset: Offset = "0000000000000000_0000000000000000".parse().unwrap();
291        assert_eq!(offset.as_str(), "0000000000000000_0000000000000000");
292
293        let offset: Offset = "0000000000000001_000000000000002a".parse().unwrap();
294        assert_eq!(offset.as_str(), "0000000000000001_000000000000002a");
295
296        let offset: Offset = "-1".parse().unwrap();
297        assert!(offset.is_start());
298
299        let offset: Offset = "now".parse().unwrap();
300        assert!(offset.is_now());
301    }
302
303    #[test]
304    fn test_offset_parse_invalid() {
305        // Wrong separator
306        assert!(
307            "0000000000000000-0000000000000000"
308                .parse::<Offset>()
309                .is_err()
310        );
311
312        // Too short
313        assert!("000_000".parse::<Offset>().is_err());
314
315        // Too long
316        assert!(
317            "00000000000000000_0000000000000000"
318                .parse::<Offset>()
319                .is_err()
320        );
321
322        // Uppercase (not canonical)
323        assert!(
324            "000000000000000A_0000000000000000"
325                .parse::<Offset>()
326                .is_err()
327        );
328
329        // Invalid hex
330        assert!(
331            "000000000000000g_0000000000000000"
332                .parse::<Offset>()
333                .is_err()
334        );
335
336        // Missing underscore
337        assert!(
338            "00000000000000000000000000000000"
339                .parse::<Offset>()
340                .is_err()
341        );
342
343        // Extra parts
344        assert!(
345            "0000000000000000_0000000000000000_0000000000000000"
346                .parse::<Offset>()
347                .is_err()
348        );
349    }
350
351    #[test]
352    fn test_offset_ordering() {
353        let offset1 = Offset::new(0, 0);
354        let offset2 = Offset::new(0, 1);
355        let offset3 = Offset::new(1, 0);
356        let offset4 = Offset::new(1, 1);
357
358        assert!(offset1 < offset2);
359        assert!(offset2 < offset3);
360        assert!(offset3 < offset4);
361        assert!(offset1 < offset4);
362
363        // Lexicographic ordering equals temporal ordering
364        assert_eq!(offset1.as_str() < offset2.as_str(), offset1 < offset2);
365    }
366
367    #[test]
368    fn test_offset_parse_components() {
369        let offset = Offset::new(42, 100);
370        let (read_seq, byte_offset) = offset.parse_components().unwrap();
371        assert_eq!(read_seq, 42);
372        assert_eq!(byte_offset, 100);
373
374        // Sentinels return None
375        assert!(Offset::start().parse_components().is_none());
376        assert!(Offset::now().parse_components().is_none());
377    }
378
379    #[test]
380    fn test_offset_sentinel_ordering() {
381        let start = Offset::start();
382        let now = Offset::now();
383        let zero = Offset::new(0, 0);
384        let mid = Offset::new(5, 10);
385
386        // "-1" < "0000..." (ASCII '-' < '0')
387        assert!(start < zero);
388        assert!(start < mid);
389
390        // "now" > "0000..." (ASCII 'n' > '0')
391        assert!(now > zero);
392        assert!(now > mid);
393
394        // Sentinels are not equal to each other
395        assert_ne!(start, now);
396
397        // Same sentinel is equal to itself
398        assert_eq!(Offset::start(), Offset::start());
399        assert_eq!(Offset::now(), Offset::now());
400    }
401
402    #[test]
403    fn test_offset_equality_and_hash() {
404        use std::collections::HashSet;
405
406        let a = Offset::new(1, 2);
407        let b = Offset::new(1, 2);
408        let c = Offset::new(1, 3);
409
410        assert_eq!(a, b);
411        assert_ne!(a, c);
412
413        // Equal offsets must produce the same hash (HashSet insertion)
414        let mut set = HashSet::new();
415        set.insert(a.as_str().to_string());
416        assert!(set.contains(b.as_str()));
417
418        // Sentinels hash consistently
419        let mut set2 = HashSet::new();
420        set2.insert(Offset::start().as_str().to_string());
421        set2.insert(Offset::now().as_str().to_string());
422        assert_eq!(set2.len(), 2);
423    }
424
425    #[test]
426    fn test_offset_display() {
427        let offset = Offset::new(1, 2);
428        assert_eq!(format!("{offset}"), "0000000000000001_0000000000000002");
429
430        let start = Offset::start();
431        assert_eq!(format!("{start}"), "-1");
432
433        let now = Offset::now();
434        assert_eq!(format!("{now}"), "now");
435    }
436}