Skip to main content

pushwire_client/
cursor.rs

1use dashmap::DashMap;
2use pushwire_core::ChannelKind;
3use std::collections::HashMap;
4
5/// Result of advancing a cursor.
6#[derive(Debug, Clone, PartialEq, Eq)]
7pub enum CursorResult {
8    /// Cursor advanced sequentially (expected value).
9    Sequential,
10    /// Gap detected — frames were missed.
11    GapDetected { expected: u64, got: u64 },
12    /// Duplicate cursor — frame already seen, skip.
13    Duplicate,
14}
15
16/// Per-channel cursor tracker with gap detection.
17pub struct CursorTracker<C: ChannelKind> {
18    cursors: DashMap<C, u64>,
19}
20
21impl<C: ChannelKind> CursorTracker<C> {
22    pub fn new() -> Self {
23        Self {
24            cursors: DashMap::new(),
25        }
26    }
27
28    /// Process an incoming cursor value. Returns whether it was sequential,
29    /// a gap, or a duplicate.
30    pub fn advance(&self, channel: C, cursor: u64) -> CursorResult {
31        let prev = self.cursors.get(&channel).map(|v| *v).unwrap_or(0);
32        if cursor == prev + 1 || prev == 0 {
33            self.cursors.insert(channel, cursor);
34            CursorResult::Sequential
35        } else if cursor > prev + 1 {
36            self.cursors.insert(channel, cursor);
37            CursorResult::GapDetected {
38                expected: prev + 1,
39                got: cursor,
40            }
41        } else {
42            CursorResult::Duplicate
43        }
44    }
45
46    /// Export all cursors for resume handshake.
47    pub fn export(&self) -> HashMap<C, u64> {
48        self.cursors
49            .iter()
50            .map(|entry| (*entry.key(), *entry.value()))
51            .collect()
52    }
53
54    /// Reset all cursors (e.g., on full reconnect without resume).
55    pub fn reset(&self) {
56        self.cursors.clear();
57    }
58}
59
60impl<C: ChannelKind> Default for CursorTracker<C> {
61    fn default() -> Self {
62        Self::new()
63    }
64}