reddb_server/runtime/
kv_watch.rs1use crate::replication::cdc::{CdcBuffer, KvWatchEvent};
2use crate::runtime::KvStatsCounters;
3use std::collections::VecDeque;
4use std::time::{Duration, Instant};
5
6const WATCH_BUFFER_CAPACITY: usize = 1024;
7const WATCH_POLL_BATCH: usize = WATCH_BUFFER_CAPACITY * 4;
8
/// Selects which keys inside the watched collection a stream reports on.
enum KvWatchMatch {
    /// Only events whose key is exactly equal to the contained string.
    Key(String),
    /// Every event whose key starts with the contained string.
    Prefix(String),
}
13
14pub struct KvWatchStream<'a> {
16 cdc: &'a CdcBuffer,
17 stats: &'a KvStatsCounters,
18 collection: String,
19 matcher: KvWatchMatch,
20 cursor_lsn: u64,
21 buffer: VecDeque<KvWatchEvent>,
22 dropped_event_count: u64,
23 idle_timeout: Duration,
24 last_activity: Instant,
25 active: bool,
26}
27
28impl<'a> KvWatchStream<'a> {
29 pub(crate) fn subscribe(
30 cdc: &'a CdcBuffer,
31 stats: &'a KvStatsCounters,
32 collection: impl Into<String>,
33 key: impl Into<String>,
34 from_lsn: Option<u64>,
35 idle_timeout_ms: u64,
36 ) -> Self {
37 Self::new(
38 cdc,
39 stats,
40 collection,
41 KvWatchMatch::Key(key.into()),
42 from_lsn,
43 idle_timeout_ms,
44 )
45 }
46
47 pub(crate) fn subscribe_prefix(
48 cdc: &'a CdcBuffer,
49 stats: &'a KvStatsCounters,
50 collection: impl Into<String>,
51 prefix: impl Into<String>,
52 from_lsn: Option<u64>,
53 idle_timeout_ms: u64,
54 ) -> Self {
55 Self::new(
56 cdc,
57 stats,
58 collection,
59 KvWatchMatch::Prefix(prefix.into()),
60 from_lsn,
61 idle_timeout_ms,
62 )
63 }
64
65 fn new(
66 cdc: &'a CdcBuffer,
67 stats: &'a KvStatsCounters,
68 collection: impl Into<String>,
69 matcher: KvWatchMatch,
70 from_lsn: Option<u64>,
71 idle_timeout_ms: u64,
72 ) -> Self {
73 stats.incr_watch_streams_active();
74 Self {
75 cursor_lsn: from_lsn.unwrap_or_else(|| cdc.current_lsn()),
76 cdc,
77 stats,
78 collection: collection.into(),
79 matcher,
80 buffer: VecDeque::with_capacity(WATCH_BUFFER_CAPACITY),
81 dropped_event_count: 0,
82 idle_timeout: Duration::from_millis(idle_timeout_ms.max(1)),
83 last_activity: Instant::now(),
84 active: true,
85 }
86 }
87
88 pub fn poll_next(&mut self) -> Option<KvWatchEvent> {
89 if !self.active {
90 return None;
91 }
92 if self.last_activity.elapsed() >= self.idle_timeout {
93 self.close();
94 return None;
95 }
96 self.last_activity = Instant::now();
97
98 if self.buffer.is_empty() {
99 self.fill_buffer();
100 }
101
102 self.buffer.pop_front().map(|mut event| {
103 event.dropped_event_count = self.dropped_event_count;
104 event
105 })
106 }
107
108 pub fn dropped_event_count(&self) -> u64 {
109 self.dropped_event_count
110 }
111
112 pub fn record_drop_count(&mut self, count: u64) {
113 self.dropped_event_count = self.dropped_event_count.saturating_add(count);
114 self.stats.add_watch_drops(count);
115 }
116
117 fn fill_buffer(&mut self) {
118 if let Some(oldest_lsn) = self.cdc.oldest_lsn() {
119 if self.cursor_lsn + 1 < oldest_lsn {
120 self.record_drop_count(oldest_lsn - self.cursor_lsn - 1);
121 self.cursor_lsn = oldest_lsn - 1;
122 }
123 }
124
125 for event in self.cdc.poll(self.cursor_lsn, WATCH_POLL_BATCH) {
126 self.cursor_lsn = event.lsn;
127 let Some(kv) = event.kv else {
128 continue;
129 };
130 if !self.matches(&kv) {
131 continue;
132 }
133 if self.buffer.len() >= WATCH_BUFFER_CAPACITY {
134 self.buffer.pop_front();
135 self.record_drop_count(1);
136 }
137 self.buffer.push_back(kv);
138 }
139 }
140
141 fn matches(&self, event: &KvWatchEvent) -> bool {
142 if event.collection != self.collection {
143 return false;
144 }
145 match &self.matcher {
146 KvWatchMatch::Key(key) => event.key == *key,
147 KvWatchMatch::Prefix(prefix) => event.key.starts_with(prefix),
148 }
149 }
150
151 fn close(&mut self) {
152 if self.active {
153 self.active = false;
154 self.stats.decr_watch_streams_active();
155 }
156 }
157}
158
159impl Drop for KvWatchStream<'_> {
160 fn drop(&mut self) {
161 self.close();
162 }
163}