Skip to main content

durable_streams_server/protocol/
cursor.rs

1// Cursor generation for long-poll/SSE responses.
2//
3// Cursors are opaque strings that clients echo back in subsequent
4// requests via the `cursor` query parameter. They enable CDN request
5// collapsing by identifying the stream position.
6//
7// The value packs (read_seq, byte_offset) into a single integer that
8// fits within JavaScript's MAX_SAFE_INTEGER (2^53 - 1) so conformance
9// tests can compare cursors via parseInt().
10//
11// Each emitted cursor is strictly greater than the previous one,
12// even when the stream position hasn't changed. This satisfies the
13// "cursor collision with jitter" conformance tests while keeping the
14// offset-derived value as a floor so cursors advance with position.
15
16use std::sync::atomic::{AtomicU64, Ordering};
17
18/// Bits reserved for the byte-offset component.
19///
20/// 27 bits gives a range of 128 MB — well above the per-stream limit.
21const BYTE_OFFSET_BITS: u32 = 27;
22
23/// Global monotonic cursor counter. Each `generate()` call returns a
24/// value strictly greater than any previously returned cursor.
25static LAST_CURSOR: AtomicU64 = AtomicU64::new(0);
26
27/// Generate a monotonically increasing cursor from stream position.
28///
29/// The offset-derived value acts as a floor — when the stream advances
30/// the cursor jumps to at least the new position. Between advances the
31/// cursor still increments so consecutive responses at the same offset
32/// are distinguishable (required for jitter detection).
33///
34/// The value is a decimal integer that fits within JavaScript's
35/// `Number.MAX_SAFE_INTEGER`.
36#[must_use]
37pub fn generate(next_offset: &crate::protocol::offset::Offset) -> String {
38    if let Some((read_seq, byte_offset)) = next_offset.parse_components() {
39        let offset_value = (read_seq << BYTE_OFFSET_BITS) | byte_offset;
40        loop {
41            let last = LAST_CURSOR.load(Ordering::Relaxed);
42            let next = offset_value.max(last + 1);
43            if LAST_CURSOR
44                .compare_exchange_weak(last, next, Ordering::Relaxed, Ordering::Relaxed)
45                .is_ok()
46            {
47                return next.to_string();
48            }
49        }
50    }
51
52    // `next_offset` should never be a sentinel, but keep a deterministic fallback.
53    "0".to_string()
54}
55
56#[cfg(test)]
57mod tests {
58    use super::*;
59    use crate::protocol::offset::Offset;
60
61    #[test]
62    fn test_cursor_is_digits_only() {
63        let offset = Offset::new(3, 26);
64        let cursor = generate(&offset);
65        assert!(cursor.chars().all(|c| c.is_ascii_digit()));
66    }
67
68    #[test]
69    fn test_cursor_increases_for_same_offset() {
70        // Consecutive calls at the same offset must still produce
71        // strictly increasing cursors (jitter detection).
72        let offset = Offset::new(7, 42);
73        let c1: u64 = generate(&offset).parse().unwrap();
74        let c2: u64 = generate(&offset).parse().unwrap();
75        assert!(c2 > c1);
76    }
77
78    #[test]
79    fn test_cursor_changes_with_offset() {
80        let a = Offset::new(1, 2);
81        let b = Offset::new(1, 3);
82        assert_ne!(generate(&a), generate(&b));
83    }
84
85    #[test]
86    fn test_cursor_is_monotonic() {
87        // Offsets increase → cursors increase (numerically)
88        let a = Offset::new(1, 5);
89        let b = Offset::new(1, 10);
90        let c = Offset::new(2, 0);
91        let ca: u64 = generate(&a).parse().unwrap();
92        let cb: u64 = generate(&b).parse().unwrap();
93        let cc: u64 = generate(&c).parse().unwrap();
94        assert!(cb > ca);
95        assert!(cc > cb);
96    }
97
98    #[test]
99    fn test_cursor_fits_in_js_max_safe_integer() {
100        // Typical values: read_seq up to ~67M, byte_offset up to 10MB
101        let offset = Offset::new(67_000_000, 10_485_760);
102        let cursor: u64 = generate(&offset).parse().unwrap();
103        assert!(cursor < (1_u64 << 53));
104    }
105}