Skip to main content

reddb_server/runtime/
kv_watch.rs

1use crate::replication::cdc::{CdcBuffer, KvWatchEvent};
2use crate::runtime::KvStatsCounters;
3use std::collections::VecDeque;
4use std::time::{Duration, Instant};
5
/// Maximum number of matching events a watch stream keeps in its local
/// buffer before the oldest buffered event is evicted and counted as dropped.
const WATCH_BUFFER_CAPACITY: usize = 1024;
/// Upper bound on CDC events requested per buffer refill. It is larger than
/// the buffer so a refill can skip past runs of non-matching events.
const WATCH_POLL_BATCH: usize = 4 * WATCH_BUFFER_CAPACITY;
8
/// Selects which keys of the watched collection a stream reports.
enum KvWatchMatch {
    /// Only events whose key equals this string exactly.
    Key(String),
    /// Every event whose key starts with this string.
    Prefix(String),
}
13
14/// CDC-backed WATCH cursor for one normal KV key or key prefix.
15pub struct KvWatchStream<'a> {
16    cdc: &'a CdcBuffer,
17    stats: &'a KvStatsCounters,
18    collection: String,
19    matcher: KvWatchMatch,
20    cursor_lsn: u64,
21    buffer: VecDeque<KvWatchEvent>,
22    dropped_event_count: u64,
23    idle_timeout: Duration,
24    last_activity: Instant,
25    active: bool,
26}
27
28impl<'a> KvWatchStream<'a> {
29    pub(crate) fn subscribe(
30        cdc: &'a CdcBuffer,
31        stats: &'a KvStatsCounters,
32        collection: impl Into<String>,
33        key: impl Into<String>,
34        from_lsn: Option<u64>,
35        idle_timeout_ms: u64,
36    ) -> Self {
37        Self::new(
38            cdc,
39            stats,
40            collection,
41            KvWatchMatch::Key(key.into()),
42            from_lsn,
43            idle_timeout_ms,
44        )
45    }
46
47    pub(crate) fn subscribe_prefix(
48        cdc: &'a CdcBuffer,
49        stats: &'a KvStatsCounters,
50        collection: impl Into<String>,
51        prefix: impl Into<String>,
52        from_lsn: Option<u64>,
53        idle_timeout_ms: u64,
54    ) -> Self {
55        Self::new(
56            cdc,
57            stats,
58            collection,
59            KvWatchMatch::Prefix(prefix.into()),
60            from_lsn,
61            idle_timeout_ms,
62        )
63    }
64
65    fn new(
66        cdc: &'a CdcBuffer,
67        stats: &'a KvStatsCounters,
68        collection: impl Into<String>,
69        matcher: KvWatchMatch,
70        from_lsn: Option<u64>,
71        idle_timeout_ms: u64,
72    ) -> Self {
73        stats.incr_watch_streams_active();
74        Self {
75            cursor_lsn: from_lsn.unwrap_or_else(|| cdc.current_lsn()),
76            cdc,
77            stats,
78            collection: collection.into(),
79            matcher,
80            buffer: VecDeque::with_capacity(WATCH_BUFFER_CAPACITY),
81            dropped_event_count: 0,
82            idle_timeout: Duration::from_millis(idle_timeout_ms.max(1)),
83            last_activity: Instant::now(),
84            active: true,
85        }
86    }
87
88    pub fn poll_next(&mut self) -> Option<KvWatchEvent> {
89        if !self.active {
90            return None;
91        }
92        if self.last_activity.elapsed() >= self.idle_timeout {
93            self.close();
94            return None;
95        }
96        self.last_activity = Instant::now();
97
98        if self.buffer.is_empty() {
99            self.fill_buffer();
100        }
101
102        self.buffer.pop_front().map(|mut event| {
103            event.dropped_event_count = self.dropped_event_count;
104            event
105        })
106    }
107
108    pub fn dropped_event_count(&self) -> u64 {
109        self.dropped_event_count
110    }
111
112    pub fn record_drop_count(&mut self, count: u64) {
113        self.dropped_event_count = self.dropped_event_count.saturating_add(count);
114        self.stats.add_watch_drops(count);
115    }
116
117    fn fill_buffer(&mut self) {
118        if let Some(oldest_lsn) = self.cdc.oldest_lsn() {
119            if self.cursor_lsn + 1 < oldest_lsn {
120                self.record_drop_count(oldest_lsn - self.cursor_lsn - 1);
121                self.cursor_lsn = oldest_lsn - 1;
122            }
123        }
124
125        for event in self.cdc.poll(self.cursor_lsn, WATCH_POLL_BATCH) {
126            self.cursor_lsn = event.lsn;
127            let Some(kv) = event.kv else {
128                continue;
129            };
130            if !self.matches(&kv) {
131                continue;
132            }
133            if self.buffer.len() >= WATCH_BUFFER_CAPACITY {
134                self.buffer.pop_front();
135                self.record_drop_count(1);
136            }
137            self.buffer.push_back(kv);
138        }
139    }
140
141    fn matches(&self, event: &KvWatchEvent) -> bool {
142        if event.collection != self.collection {
143            return false;
144        }
145        match &self.matcher {
146            KvWatchMatch::Key(key) => event.key == *key,
147            KvWatchMatch::Prefix(prefix) => event.key.starts_with(prefix),
148        }
149    }
150
151    fn close(&mut self) {
152        if self.active {
153            self.active = false;
154            self.stats.decr_watch_streams_active();
155        }
156    }
157}
158
159impl Drop for KvWatchStream<'_> {
160    fn drop(&mut self) {
161        self.close();
162    }
163}