//! Validated stream offset type.
//!
//! Source path: `durable_streams_server/protocol/offset.rs`

1use crate::protocol::error::{Error, Result};
2use std::cmp::Ordering;
3use std::fmt;
4use std::hash::{Hash, Hasher};
5use std::str::FromStr;
6
/// A position within a stream, either symbolic or fully resolved.
///
/// Textual forms:
/// - `-1` — the [`Offset::Start`] sentinel (stream start),
/// - `now` — the [`Offset::Now`] sentinel (tail/live),
/// - `{read_seq:016x}_{byte_offset:016x}` — an [`Offset::Concrete`] position:
///   two zero-padded, lowercase, 16-digit hex numbers joined by `_`.
#[derive(Debug, Clone)]
pub enum Offset {
    /// Sentinel for the start of the stream; rendered as `-1`.
    Start,
    /// Sentinel for the tail of the stream (live mode); rendered as `now`.
    Now,
    /// A fully specified position.
    Concrete {
        /// Read sequence number (first hex group of the canonical form).
        read_seq: u64,
        /// Byte offset (second hex group of the canonical form).
        byte_offset: u64,
        /// Cached canonical text for the two numbers above: 33 ASCII bytes.
        raw: [u8; 33],
    },
}
21
22impl Offset {
23    /// Sentinel value for stream start
24    pub const START: &'static str = "-1";
25
26    /// Sentinel value for stream tail/live mode
27    pub const NOW: &'static str = "now";
28
29    /// Create a new offset from read sequence and byte offset.
30    #[must_use]
31    pub fn new(read_seq: u64, byte_offset: u64) -> Self {
32        Self::Concrete {
33            read_seq,
34            byte_offset,
35            raw: encode_offset(read_seq, byte_offset),
36        }
37    }
38
39    /// Create the stream start sentinel
40    #[must_use]
41    pub fn start() -> Self {
42        Self::Start
43    }
44
45    /// Create the tail/now sentinel
46    #[must_use]
47    pub fn now() -> Self {
48        Self::Now
49    }
50
51    /// Check if this is the start sentinel
52    #[must_use]
53    pub fn is_start(&self) -> bool {
54        matches!(self, Self::Start)
55    }
56
57    /// Check if this is the now/tail sentinel
58    #[must_use]
59    pub fn is_now(&self) -> bool {
60        matches!(self, Self::Now)
61    }
62
63    /// Check if this is a sentinel value (start or now)
64    #[must_use]
65    pub fn is_sentinel(&self) -> bool {
66        matches!(self, Self::Start | Self::Now)
67    }
68
69    /// Get the canonical offset string.
70    #[must_use]
71    pub fn as_str(&self) -> &str {
72        match self {
73            Self::Start => Self::START,
74            Self::Now => Self::NOW,
75            Self::Concrete { raw, .. } => {
76                // SAFETY: `raw` is always constructed from ASCII hex digits + `_`.
77                unsafe { std::str::from_utf8_unchecked(raw) }
78            }
79        }
80    }
81
82    /// Parse the offset into (`read_seq`, `byte_offset`) components.
83    ///
84    /// Returns `None` for sentinel values.
85    #[must_use]
86    pub fn parse_components(&self) -> Option<(u64, u64)> {
87        match self {
88            Self::Concrete {
89                read_seq,
90                byte_offset,
91                ..
92            } => Some((*read_seq, *byte_offset)),
93            Self::Start | Self::Now => None,
94        }
95    }
96}
97
98impl FromStr for Offset {
99    type Err = Error;
100
101    fn from_str(s: &str) -> Result<Self> {
102        if s == Self::START {
103            return Ok(Self::Start);
104        }
105        if s == Self::NOW {
106            return Ok(Self::Now);
107        }
108
109        let bytes = s.as_bytes();
110        if bytes.len() != 33 || bytes[16] != b'_' {
111            return Err(Error::InvalidOffset(format!(
112                "Expected format 'read_seq_byte_offset', got '{s}'"
113            )));
114        }
115
116        for (idx, b) in bytes.iter().copied().enumerate() {
117            if idx == 16 {
118                continue;
119            }
120            if b.is_ascii_uppercase() {
121                return Err(Error::InvalidOffset(format!(
122                    "Offset must use lowercase hex digits: '{s}'"
123                )));
124            }
125            if !b.is_ascii_digit() && !(b'a'..=b'f').contains(&b) {
126                let part_num = if idx < 16 { 1 } else { 2 };
127                return Err(Error::InvalidOffset(format!(
128                    "Invalid hex character in part {part_num} of '{s}'"
129                )));
130            }
131        }
132
133        let read_seq = decode_hex_16(&bytes[..16])
134            .ok_or_else(|| Error::InvalidOffset(format!("Failed to parse hex values in '{s}'")))?;
135        let byte_offset = decode_hex_16(&bytes[17..])
136            .ok_or_else(|| Error::InvalidOffset(format!("Failed to parse hex values in '{s}'")))?;
137
138        Ok(Self::Concrete {
139            read_seq,
140            byte_offset,
141            raw: encode_offset(read_seq, byte_offset),
142        })
143    }
144}
145
146impl fmt::Display for Offset {
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        f.write_str(self.as_str())
149    }
150}
151
152impl From<Offset> for String {
153    fn from(offset: Offset) -> Self {
154        offset.as_str().to_string()
155    }
156}
157
158impl PartialEq for Offset {
159    fn eq(&self, other: &Self) -> bool {
160        match (self.parse_components(), other.parse_components()) {
161            (Some(a), Some(b)) => a == b,
162            _ => self.as_str() == other.as_str(),
163        }
164    }
165}
166
167impl Eq for Offset {}
168
169impl PartialOrd for Offset {
170    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
171        Some(self.cmp(other))
172    }
173}
174
175impl Ord for Offset {
176    fn cmp(&self, other: &Self) -> Ordering {
177        match (self.parse_components(), other.parse_components()) {
178            (Some((a_rs, a_bo)), Some((b_rs, b_bo))) => (a_rs, a_bo).cmp(&(b_rs, b_bo)),
179            _ => self.as_str().cmp(other.as_str()),
180        }
181    }
182}
183
184impl Hash for Offset {
185    fn hash<H: Hasher>(&self, state: &mut H) {
186        if let Some((read_seq, byte_offset)) = self.parse_components() {
187            0u8.hash(state);
188            read_seq.hash(state);
189            byte_offset.hash(state);
190        } else {
191            1u8.hash(state);
192            self.as_str().hash(state);
193        }
194    }
195}
196
/// Encodes `read_seq` and `byte_offset` as `{read_seq:016x}_{byte_offset:016x}`.
///
/// The result is exactly 33 ASCII bytes: 16 lowercase hex digits, `_`, and 16
/// more lowercase hex digits, each number zero-padded and most significant
/// digit first.
fn encode_offset(read_seq: u64, byte_offset: u64) -> [u8; 33] {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    /// Fills `dest` with the hex digits of `value`, least significant digit last.
    fn write_hex(mut value: u64, dest: &mut [u8]) {
        for cell in dest.iter_mut().rev() {
            // Masked to 4 bits, so the cast cannot truncate.
            *cell = DIGITS[(value & 0xF) as usize];
            value >>= 4;
        }
    }

    // Start from all separators; only index 16 keeps its `_` after both fills.
    let mut encoded = [b'_'; 33];
    let (seq_digits, tail) = encoded.split_at_mut(16);
    write_hex(read_seq, seq_digits);
    write_hex(byte_offset, &mut tail[1..]);
    encoded
}
213
/// Decodes exactly 16 lowercase hex digits into a `u64`.
///
/// Returns `None` when `bytes` is not 16 bytes long or contains anything other
/// than `0-9` / `a-f` (uppercase digits are rejected).
fn decode_hex_16(bytes: &[u8]) -> Option<u64> {
    if bytes.len() != 16 {
        return None;
    }

    // Fold most significant digit first; 16 nibbles fill the 64 bits exactly.
    bytes.iter().try_fold(0u64, |acc, &byte| {
        let nibble = match byte {
            b'0'..=b'9' => byte - b'0',
            b'a'..=b'f' => byte - b'a' + 10,
            _ => return None,
        };
        Some((acc << 4) | u64::from(nibble))
    })
}
230
#[cfg(test)]
mod tests {
    use super::*;

    /// `Offset::new` renders both components as zero-padded lowercase hex.
    #[test]
    fn test_offset_new() {
        let offset = Offset::new(0, 0);
        assert_eq!(offset.as_str(), "0000000000000000_0000000000000000");

        let offset = Offset::new(1, 42);
        assert_eq!(offset.as_str(), "0000000000000001_000000000000002a");

        let offset = Offset::new(u64::MAX, u64::MAX);
        assert_eq!(offset.as_str(), "ffffffffffffffff_ffffffffffffffff");
    }

    /// Sentinel constructors, predicates and text forms.
    #[test]
    fn test_offset_sentinels() {
        let start = Offset::start();
        assert!(start.is_start());
        assert!(start.is_sentinel());
        assert!(!start.is_now());
        assert_eq!(start.as_str(), "-1");

        let now = Offset::now();
        assert!(now.is_now());
        assert!(now.is_sentinel());
        assert!(!now.is_start());
        assert_eq!(now.as_str(), "now");
    }

    /// Canonical strings and sentinels parse successfully.
    #[test]
    fn test_offset_parse_valid() {
        let offset: Offset = "0000000000000000_0000000000000000".parse().unwrap();
        assert_eq!(offset.as_str(), "0000000000000000_0000000000000000");

        let offset: Offset = "0000000000000001_000000000000002a".parse().unwrap();
        assert_eq!(offset.as_str(), "0000000000000001_000000000000002a");
        assert_eq!(offset.parse_components(), Some((1, 42)));

        let offset: Offset = "-1".parse().unwrap();
        assert!(offset.is_start());

        let offset: Offset = "now".parse().unwrap();
        assert!(offset.is_now());
    }

    /// Anything that is not a sentinel or the strict canonical form is rejected.
    #[test]
    fn test_offset_parse_invalid() {
        // Wrong separator
        assert!(
            "0000000000000000-0000000000000000"
                .parse::<Offset>()
                .is_err()
        );

        // Too short
        assert!("000_000".parse::<Offset>().is_err());

        // Too long
        assert!(
            "00000000000000000_0000000000000000"
                .parse::<Offset>()
                .is_err()
        );

        // Uppercase (not canonical)
        assert!(
            "000000000000000A_0000000000000000"
                .parse::<Offset>()
                .is_err()
        );

        // Invalid hex
        assert!(
            "000000000000000g_0000000000000000"
                .parse::<Offset>()
                .is_err()
        );

        // Missing underscore
        assert!(
            "00000000000000000000000000000000"
                .parse::<Offset>()
                .is_err()
        );

        // Extra parts
        assert!(
            "0000000000000000_0000000000000000_0000000000000000"
                .parse::<Offset>()
                .is_err()
        );
    }

    /// Concrete offsets order by `read_seq` first, then by `byte_offset`.
    #[test]
    fn test_offset_ordering() {
        let offset1 = Offset::new(0, 0);
        let offset2 = Offset::new(0, 1);
        let offset3 = Offset::new(1, 0);
        let offset4 = Offset::new(1, 1);

        assert!(offset1 < offset2);
        assert!(offset2 < offset3);
        assert!(offset3 < offset4);
        assert!(offset1 < offset4);

        // Lexicographic ordering of the text equals `Ord` for every pair, not
        // just a single hand-picked one.
        let all = [&offset1, &offset2, &offset3, &offset4];
        for &lhs in &all {
            for &rhs in &all {
                assert_eq!(lhs.as_str().cmp(rhs.as_str()), lhs.cmp(rhs));
            }
        }
    }

    /// Components round-trip for concrete offsets and are absent for sentinels.
    #[test]
    fn test_offset_parse_components() {
        let offset = Offset::new(42, 100);
        let (read_seq, byte_offset) = offset.parse_components().unwrap();
        assert_eq!(read_seq, 42);
        assert_eq!(byte_offset, 100);

        // Sentinels return None
        assert!(Offset::start().parse_components().is_none());
        assert!(Offset::now().parse_components().is_none());
    }

    /// Sentinels bracket every concrete offset.
    #[test]
    fn test_offset_sentinel_ordering() {
        let start = Offset::start();
        let now = Offset::now();
        let zero = Offset::new(0, 0);
        let mid = Offset::new(5, 10);

        // "-1" < "0000..." (ASCII '-' < '0')
        assert!(start < zero);
        assert!(start < mid);

        // "now" > "0000..." (ASCII 'n' > '0')
        assert!(now > zero);
        assert!(now > mid);

        // Sentinels are not equal to each other
        assert_ne!(start, now);

        // Same sentinel is equal to itself
        assert_eq!(Offset::start(), Offset::start());
        assert_eq!(Offset::now(), Offset::now());
    }

    /// Equal offsets are interchangeable as hash-set keys.
    #[test]
    fn test_offset_equality_and_hash() {
        use std::collections::HashSet;

        let a = Offset::new(1, 2);
        let b = Offset::new(1, 2);
        let c = Offset::new(1, 3);

        assert_eq!(a, b);
        assert_ne!(a, c);

        // Store `Offset` values themselves so that `impl Hash for Offset` is what
        // gets exercised; hashing their strings would only test `String`.
        let mut set = HashSet::new();
        set.insert(a);
        assert!(set.contains(&b));
        assert!(!set.contains(&c));

        // A parsed offset must land in the same bucket as a constructed one.
        let parsed: Offset = "0000000000000001_0000000000000002".parse().unwrap();
        assert!(set.contains(&parsed));

        // Sentinels hash consistently and stay distinct from each other.
        let mut sentinels = HashSet::new();
        sentinels.insert(Offset::start());
        sentinels.insert(Offset::now());
        sentinels.insert(Offset::start());
        assert_eq!(sentinels.len(), 2);
        assert!(sentinels.contains(&Offset::now()));
    }

    /// `Display` prints the canonical text.
    #[test]
    fn test_offset_display() {
        let offset = Offset::new(1, 2);
        assert_eq!(format!("{offset}"), "0000000000000001_0000000000000002");

        let start = Offset::start();
        assert_eq!(format!("{start}"), "-1");

        let now = Offset::now();
        assert_eq!(format!("{now}"), "now");
    }
}