1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4
5use parking_lot::RwLock;
6use tokio::sync::broadcast;
7use tracing::debug;
8
9use crate::config::WatchBacklogMode;
10
/// The kind of change a [`WatchEvent`] reports.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare kinds directly
/// instead of pattern-matching just to test for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEventKind {
    /// Presumably reported when a key is written; the producer is not visible here.
    Put,
    /// Presumably reported when a key is removed; the producer is not visible here.
    Delete,
}
16
17#[derive(Debug, Clone)]
18pub struct WatchEvent {
19 pub kind: WatchEventKind,
20 pub key: Vec<u8>,
21 pub value: Arc<[u8]>,
22 pub prev_value: Arc<[u8]>,
23 pub create_revision: i64,
24 pub mod_revision: i64,
25 pub version: i64,
26 pub lease: i64,
27}
28
/// Selects which events a watcher is interested in.
///
/// The key range follows the convention of etcd's watch API, which the field names
/// mirror:
///
/// * empty `range_end` – only `key` itself;
/// * `range_end == [0]` – every key greater than or equal to `key`
///   (so an empty `key` selects every key);
/// * anything else – the half-open interval `[key, range_end)`.
#[derive(Debug, Clone)]
pub struct WatchFilter {
    /// Start of the range, or the exact key when `range_end` is empty.
    pub key: Vec<u8>,
    /// Exclusive end of the range; empty and `[0]` have the special meanings above.
    pub range_end: Vec<u8>,
    /// Lowest `mod_revision` of interest. `matches` does not consult it; it is
    /// applied by `WatchRing::subscribe` when the backlog is assembled.
    pub start_revision: i64,
}

impl WatchFilter {
    /// Returns `true` when `candidate` falls inside this filter's key range.
    ///
    /// Only the key range is checked; `start_revision` is ignored here.
    pub fn matches(&self, candidate: &[u8]) -> bool {
        let start = self.key.as_slice();

        match self.range_end.as_slice() {
            // No range end: exact-key watch.
            [] => candidate == start,
            // A single NUL byte means "no upper bound". Comparing against it
            // literally would reject every key at or after a non-empty `start`,
            // so it must be special-cased for all keys, not only the empty one.
            [0] => candidate >= start,
            // Ordinary half-open range [start, end).
            end => candidate >= start && candidate < end,
        }
    }
}
49
50pub struct WatchSubscription {
51 pub backlog: Vec<Arc<WatchEvent>>,
52 pub receiver: broadcast::Receiver<Arc<WatchEvent>>,
53}
54
55#[derive(Debug)]
56pub struct WatchRing {
57 ring: RwLock<VecDeque<Arc<WatchEvent>>>,
58 ring_bytes: AtomicUsize,
59 capacity: usize,
60 tx: broadcast::Sender<Arc<WatchEvent>>,
61 backlog_mode: WatchBacklogMode,
62 dropped_no_subscriber: AtomicUsize,
63}
64
65impl WatchRing {
66 pub fn new(
67 ring_capacity: usize,
68 broadcast_capacity: usize,
69 backlog_mode: WatchBacklogMode,
70 ) -> Self {
71 let (tx, _rx) = broadcast::channel(broadcast_capacity.max(32));
72 Self {
73 ring: RwLock::new(VecDeque::with_capacity(ring_capacity)),
74 ring_bytes: AtomicUsize::new(0),
75 capacity: ring_capacity,
76 tx,
77 backlog_mode,
78 dropped_no_subscriber: AtomicUsize::new(0),
79 }
80 }
81
82 pub fn publish(&self, event: WatchEvent) {
83 let event = Arc::new(event);
84 let subscribers = self.tx.receiver_count();
85 let keep_backlog = match self.backlog_mode {
86 WatchBacklogMode::Strict => true,
87 WatchBacklogMode::Relaxed => subscribers > 0,
88 };
89
90 if keep_backlog {
91 let mut ring = self.ring.write();
92 let mut ring_bytes = self.ring_bytes.load(Ordering::Relaxed);
93 while ring.len() >= self.capacity && !ring.is_empty() {
94 if let Some(ev) = ring.pop_front() {
95 ring_bytes = ring_bytes.saturating_sub(event_bytes(&ev));
96 }
97 }
98
99 ring.push_back(event.clone());
100 ring_bytes = ring_bytes.saturating_add(event_bytes(&event));
101 self.ring_bytes.store(ring_bytes, Ordering::Relaxed);
102 } else if subscribers == 0 {
103 let dropped = self.dropped_no_subscriber.fetch_add(1, Ordering::Relaxed) + 1;
104 if dropped % 4096 == 0 {
105 debug!(
106 dropped,
107 ring_len = self.ring.read().len(),
108 ring_bytes = self.ring_bytes.load(Ordering::Relaxed),
109 "watch event dropped because no subscribers and backlog_mode=relaxed"
110 );
111 }
112 }
113
114 if subscribers > 0 {
115 let _ = self.tx.send(event);
116 }
117 }
118
119 pub fn subscribe(&self, filter: &WatchFilter) -> WatchSubscription {
120 let backlog = {
121 let ring = self.ring.read();
122 ring.iter()
123 .filter(|ev| {
124 ev.mod_revision >= filter.start_revision && filter.matches(ev.key.as_slice())
125 })
126 .cloned()
127 .collect::<Vec<_>>()
128 };
129
130 WatchSubscription {
131 backlog,
132 receiver: self.tx.subscribe(),
133 }
134 }
135}
136
137fn event_bytes(event: &WatchEvent) -> usize {
138 event
139 .key
140 .len()
141 .saturating_add(event.value.len())
142 .saturating_add(event.prev_value.len())
143 .saturating_add(64)
144}