Skip to main content

astra_core/
watch.rs

1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4
5use parking_lot::RwLock;
6use tokio::sync::broadcast;
7use tracing::debug;
8
9use crate::config::WatchBacklogMode;
10
/// The kind of mutation a [`WatchEvent`] reports.
///
/// Derives `PartialEq`/`Eq` so callers can compare kinds directly
/// (`kind == WatchEventKind::Delete`, `assert_eq!`) instead of pattern
/// matching for every check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEventKind {
    /// The key was written.
    Put,
    /// The key was removed.
    Delete,
}
16
17#[derive(Debug, Clone)]
18pub struct WatchEvent {
19    pub kind: WatchEventKind,
20    pub key: Vec<u8>,
21    pub value: Arc<[u8]>,
22    pub prev_value: Arc<[u8]>,
23    pub create_revision: i64,
24    pub mod_revision: i64,
25    pub version: i64,
26    pub lease: i64,
27}
28
/// Describes which keys (and which revisions) a watcher is interested in.
#[derive(Debug, Clone)]
pub struct WatchFilter {
    /// The exact key to watch, or the inclusive lower bound of the watched
    /// range when `range_end` is non-empty.
    pub key: Vec<u8>,
    /// Exclusive upper bound of the watched range.
    ///
    /// * empty  – watch `key` only;
    /// * `[0]`  – no upper bound: every key `>= key` (so an empty `key`
    ///   together with `[0]` watches everything);
    /// * otherwise – the half-open range `[key, range_end)`.
    pub range_end: Vec<u8>,
    /// Lowest modification revision of interest. [`WatchFilter::matches`]
    /// does not look at it; it is left for the caller to apply.
    pub start_revision: i64,
}

impl WatchFilter {
    /// Returns `true` when `candidate` falls inside the watched key set.
    ///
    /// Comparison is plain lexicographic byte ordering. Only the key is
    /// examined; `start_revision` is not part of this check.
    pub fn matches(&self, candidate: &[u8]) -> bool {
        let start = self.key.as_slice();
        match self.range_end.as_slice() {
            // No range end: single-key watch.
            [] => candidate == start,
            // A lone NUL byte is the "open-ended" sentinel. It must not be
            // used as a literal upper bound: nothing that is `>= key` can
            // sort below `[0]`, so doing so would match no key at all.
            [0] => candidate >= start,
            // Ordinary half-open range [key, range_end).
            end => candidate >= start && candidate < end,
        }
    }
}
49
50pub struct WatchSubscription {
51    pub backlog: Vec<Arc<WatchEvent>>,
52    pub receiver: broadcast::Receiver<Arc<WatchEvent>>,
53}
54
55#[derive(Debug)]
56pub struct WatchRing {
57    ring: RwLock<VecDeque<Arc<WatchEvent>>>,
58    ring_bytes: AtomicUsize,
59    capacity: usize,
60    tx: broadcast::Sender<Arc<WatchEvent>>,
61    backlog_mode: WatchBacklogMode,
62    dropped_no_subscriber: AtomicUsize,
63}
64
65impl WatchRing {
66    pub fn new(
67        ring_capacity: usize,
68        broadcast_capacity: usize,
69        backlog_mode: WatchBacklogMode,
70    ) -> Self {
71        let (tx, _rx) = broadcast::channel(broadcast_capacity.max(32));
72        Self {
73            ring: RwLock::new(VecDeque::with_capacity(ring_capacity)),
74            ring_bytes: AtomicUsize::new(0),
75            capacity: ring_capacity,
76            tx,
77            backlog_mode,
78            dropped_no_subscriber: AtomicUsize::new(0),
79        }
80    }
81
82    pub fn publish(&self, event: WatchEvent) {
83        let event = Arc::new(event);
84        let subscribers = self.tx.receiver_count();
85        let keep_backlog = match self.backlog_mode {
86            WatchBacklogMode::Strict => true,
87            WatchBacklogMode::Relaxed => subscribers > 0,
88        };
89
90        if keep_backlog {
91            let mut ring = self.ring.write();
92            let mut ring_bytes = self.ring_bytes.load(Ordering::Relaxed);
93            while ring.len() >= self.capacity && !ring.is_empty() {
94                if let Some(ev) = ring.pop_front() {
95                    ring_bytes = ring_bytes.saturating_sub(event_bytes(&ev));
96                }
97            }
98
99            ring.push_back(event.clone());
100            ring_bytes = ring_bytes.saturating_add(event_bytes(&event));
101            self.ring_bytes.store(ring_bytes, Ordering::Relaxed);
102        } else if subscribers == 0 {
103            let dropped = self.dropped_no_subscriber.fetch_add(1, Ordering::Relaxed) + 1;
104            if dropped % 4096 == 0 {
105                debug!(
106                    dropped,
107                    ring_len = self.ring.read().len(),
108                    ring_bytes = self.ring_bytes.load(Ordering::Relaxed),
109                    "watch event dropped because no subscribers and backlog_mode=relaxed"
110                );
111            }
112        }
113
114        if subscribers > 0 {
115            let _ = self.tx.send(event);
116        }
117    }
118
119    pub fn subscribe(&self, filter: &WatchFilter) -> WatchSubscription {
120        let backlog = {
121            let ring = self.ring.read();
122            ring.iter()
123                .filter(|ev| {
124                    ev.mod_revision >= filter.start_revision && filter.matches(ev.key.as_slice())
125                })
126                .cloned()
127                .collect::<Vec<_>>()
128        };
129
130        WatchSubscription {
131            backlog,
132            receiver: self.tx.subscribe(),
133        }
134    }
135}
136
137fn event_bytes(event: &WatchEvent) -> usize {
138    event
139        .key
140        .len()
141        .saturating_add(event.value.len())
142        .saturating_add(event.prev_value.len())
143        .saturating_add(64)
144}