durable_streams_server/protocol/cursor.rs
1// Cursor generation for long-poll/SSE responses.
2//
3// Cursors are opaque strings that clients echo back in subsequent
4// requests via the `cursor` query parameter. They enable CDN request
5// collapsing by identifying the stream position.
6//
7// The value packs (read_seq, byte_offset) into a single integer that
8// fits within JavaScript's MAX_SAFE_INTEGER (2^53 - 1) so conformance
9// tests can compare cursors via parseInt().
10//
11// Each emitted cursor is strictly greater than the previous one,
12// even when the stream position hasn't changed. This satisfies the
13// "cursor collision with jitter" conformance tests while keeping the
14// offset-derived value as a floor so cursors advance with position.
15
16use std::sync::atomic::{AtomicU64, Ordering};
17
18/// Bits reserved for the byte-offset component.
19///
20/// 27 bits gives a range of 128 MB — well above the per-stream limit.
21const BYTE_OFFSET_BITS: u32 = 27;
22
23/// Global monotonic cursor counter. Each `generate()` call returns a
24/// value strictly greater than any previously returned cursor.
25static LAST_CURSOR: AtomicU64 = AtomicU64::new(0);
26
27/// Generate a monotonically increasing cursor from stream position.
28///
29/// The offset-derived value acts as a floor — when the stream advances
30/// the cursor jumps to at least the new position. Between advances the
31/// cursor still increments so consecutive responses at the same offset
32/// are distinguishable (required for jitter detection).
33///
34/// The value is a decimal integer that fits within JavaScript's
35/// `Number.MAX_SAFE_INTEGER`.
36#[must_use]
37pub fn generate(next_offset: &crate::protocol::offset::Offset) -> String {
38 if let Some((read_seq, byte_offset)) = next_offset.parse_components() {
39 let offset_value = (read_seq << BYTE_OFFSET_BITS) | byte_offset;
40 loop {
41 let last = LAST_CURSOR.load(Ordering::Relaxed);
42 let next = offset_value.max(last + 1);
43 if LAST_CURSOR
44 .compare_exchange_weak(last, next, Ordering::Relaxed, Ordering::Relaxed)
45 .is_ok()
46 {
47 return next.to_string();
48 }
49 }
50 }
51
52 // `next_offset` should never be a sentinel, but keep a deterministic fallback.
53 "0".to_string()
54}
55
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::offset::Offset;

    /// Generates a cursor for `offset` and parses it back into a number so
    /// tests can compare cursors numerically.
    fn numeric_cursor(offset: &Offset) -> u64 {
        generate(offset).parse().unwrap()
    }

    #[test]
    fn test_cursor_is_digits_only() {
        // The cursor is sent on the wire as a plain decimal string.
        let rendered = generate(&Offset::new(3, 26));
        assert!(rendered.bytes().all(|b| b.is_ascii_digit()));
    }

    #[test]
    fn test_cursor_increases_for_same_offset() {
        // Two calls at an unchanged position must still yield strictly
        // increasing cursors (jitter detection).
        let position = Offset::new(7, 42);
        let earlier = numeric_cursor(&position);
        let later = numeric_cursor(&position);
        assert!(later > earlier);
    }

    #[test]
    fn test_cursor_changes_with_offset() {
        // Different positions never share a cursor string.
        let first = generate(&Offset::new(1, 2));
        let second = generate(&Offset::new(1, 3));
        assert_ne!(first, second);
    }

    #[test]
    fn test_cursor_is_monotonic() {
        // Cursors generated for increasing offsets increase numerically.
        let positions = [Offset::new(1, 5), Offset::new(1, 10), Offset::new(2, 0)];
        let cursors: Vec<u64> = positions.iter().map(numeric_cursor).collect();
        assert!(cursors.windows(2).all(|pair| pair[1] > pair[0]));
    }

    #[test]
    fn test_cursor_fits_in_js_max_safe_integer() {
        // Typical values: read_seq up to ~67M, byte_offset up to 10MB.
        let limit = 1_u64 << 53;
        let value = numeric_cursor(&Offset::new(67_000_000, 10_485_760));
        assert!(value < limit);
    }
}
105}