Skip to main content

exoware_server/
stream.rs

1//! Live stream coordination for `store.stream.v1`.
2//!
3//! A [`StreamNotifier`] tracks the highest published batch sequence and wakes
4//! subscribers. Each subscriber then pulls batches from the log at its own
5//! pace, so live delivery is naturally paced by client reads instead of an
6//! internal per-subscriber backlog.
7//!
8//! `StreamNotifier` is an in-process coordination primitive. Split deployments
9//! need a separate remote notification path that advances a local notifier after
10//! the query worker can serve the announced batches.
11
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14
15use bytes::Bytes;
16use connectrpc::ConnectError;
17use exoware_sdk::common::KvEntry;
18use exoware_sdk::keys::KeyCodec;
19use exoware_sdk::match_key::compile_payload_regex;
20use exoware_sdk::stream_filter::{validate_filter, CompiledBytesFilters, StreamFilter};
21use regex::bytes::Regex;
22use tokio::sync::Notify;
23
/// Value placed in `ErrorInfo.domain` for every error raised by the stream service.
pub const STREAM_ERROR_DOMAIN: &str = "store.stream";

/// Value placed in `ErrorInfo.reason` when a `since_sequence_number` or a
/// `Get(seq)` names a batch that has already been pruned from the log.
pub const REASON_BATCH_EVICTED: &str = "BATCH_EVICTED";

/// Value placed in `ErrorInfo.reason` when a `Get(seq)` names a sequence number
/// larger than any sequence number issued so far.
pub const REASON_BATCH_NOT_FOUND: &str = "BATCH_NOT_FOUND";

/// Metadata key attached to `BATCH_EVICTED` errors; its value is the lowest
/// sequence number the log still retains.
pub const METADATA_OLDEST_RETAINED: &str = "oldest_retained";
34
35#[derive(Clone)]
36pub(crate) struct CompiledKeyMatcher {
37    codec: KeyCodec,
38    regex: Regex,
39}
40
41#[derive(Clone)]
42pub(crate) struct CompiledMatchers {
43    pub keys: Vec<CompiledKeyMatcher>,
44    pub values: Option<CompiledBytesFilters>,
45}
46
47/// Validate and compile a `StreamFilter`. Shared between replay and live
48/// delivery so both paths match identically and regexes are compiled once per
49/// subscribe.
50pub(crate) fn compile_matchers(filter: &StreamFilter) -> Result<CompiledMatchers, ConnectError> {
51    validate_filter(filter).map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
52    let keys = filter
53        .match_keys
54        .iter()
55        .map(|mk| {
56            let regex = compile_payload_regex(&mk.payload_regex)
57                .map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
58            Ok(CompiledKeyMatcher {
59                codec: KeyCodec::new(mk.reserved_bits, mk.prefix),
60                regex,
61            })
62        })
63        .collect::<Result<Vec<_>, ConnectError>>()?;
64    let values = CompiledBytesFilters::compile(&filter.value_filters)
65        .map_err(|e| ConnectError::invalid_argument(format!("invalid value_filter: {e}")))?;
66    Ok(CompiledMatchers { keys, values })
67}
68
69/// Apply a compiled filter to a batch. First-match-wins per `(key, value)`.
70pub(crate) fn apply_filter(matchers: &CompiledMatchers, kvs: &[(Bytes, Bytes)]) -> Vec<KvEntry> {
71    let mut out = Vec::with_capacity(kvs.len());
72    'outer: for (k, v) in kvs {
73        let value_ok = matchers.values.as_ref().is_none_or(|m| m.matches(v));
74        if !value_ok {
75            continue;
76        }
77        for matcher in &matchers.keys {
78            if !matcher.codec.matches(k) {
79                continue;
80            }
81            let payload_len = matcher.codec.payload_capacity_bytes_for_key_len(k.len());
82            let Ok(payload) = matcher.codec.read_payload(k, 0, payload_len) else {
83                continue;
84            };
85            if matcher.regex.is_match(&payload) {
86                out.push(KvEntry {
87                    key: k.to_vec(),
88                    value: v.clone(),
89                    ..Default::default()
90                });
91                continue 'outer;
92            }
93        }
94    }
95    out
96}
97
98#[derive(Clone)]
99pub struct StreamNotification {
100    pub current_sequence: u64,
101    pub notify: Arc<Notify>,
102}
103
104// TODO (#56): Add a separate remote stream notification abstraction for split deployments.
105/// In-process notification capability for stream subscribers.
106pub trait StreamNotifier: Send + Sync + 'static {
107    /// Atomically snapshot the visible batch frontier and return a notifier
108    /// that wakes when the frontier may have advanced.
109    fn subscribe(&self) -> StreamNotification;
110
111    /// Highest batch sequence currently visible to live subscribers.
112    fn current_sequence(&self) -> u64;
113
114    /// Announce that batches through `seq` may now be visible.
115    fn advance(&self, seq: u64);
116}
117
118pub struct StreamHub {
119    published_sequence: AtomicU64,
120    notify: Arc<Notify>,
121}
122
123impl StreamHub {
124    pub fn new(initial_sequence: u64) -> Self {
125        Self {
126            published_sequence: AtomicU64::new(initial_sequence),
127            notify: Arc::new(Notify::new()),
128        }
129    }
130
131    /// Announce a newly committed batch sequence to subscribers.
132    pub fn publish(&self, seq: u64) {
133        self.advance(seq);
134    }
135}
136
137impl StreamNotifier for StreamHub {
138    fn subscribe(&self) -> StreamNotification {
139        StreamNotification {
140            current_sequence: self.published_sequence.load(Ordering::Acquire),
141            notify: self.notify.clone(),
142        }
143    }
144
145    fn current_sequence(&self) -> u64 {
146        self.published_sequence.load(Ordering::Acquire)
147    }
148
149    fn advance(&self, seq: u64) {
150        self.published_sequence.fetch_max(seq, Ordering::SeqCst);
151        self.notify.notify_waiters();
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use exoware_sdk::kv_codec::Utf8;
    use exoware_sdk::match_key::MatchKey;
    use exoware_sdk::stream_filter::BytesFilter;

    /// Filter with a single key matcher (4 reserved bits, `prefix`, payload
    /// `regex`) combined with the given value filters.
    fn filter_with_values(
        prefix: u16,
        regex: &str,
        value_filters: Vec<BytesFilter>,
    ) -> StreamFilter {
        let match_key = MatchKey {
            reserved_bits: 4,
            prefix,
            payload_regex: Utf8::from(regex),
        };
        StreamFilter {
            match_keys: vec![match_key],
            value_filters,
        }
    }

    /// Key-only variant of `filter_with_values` (no value filters).
    fn filter(prefix: u16, regex: &str) -> StreamFilter {
        filter_with_values(prefix, regex, Vec::new())
    }

    /// Encode `payload` under `family` using the same 4-reserved-bit layout as
    /// the filter helpers above.
    fn key(family: u8, payload: &[u8]) -> Bytes {
        let codec = KeyCodec::new(4, u16::from(family));
        let encoded = codec.encode(payload).unwrap();
        Bytes::copy_from_slice(encoded.as_ref())
    }

    #[test]
    fn publish_sequence_is_monotonic() {
        let hub = StreamHub::new(7);
        assert_eq!(hub.current_sequence(), 7);
        // A lower sequence must not move the frontier backwards.
        for (published, expected) in [(3, 7), (9, 9)] {
            hub.publish(published);
            assert_eq!(hub.current_sequence(), expected);
        }
    }

    #[test]
    fn subscribe_snapshots_current_sequence() {
        let hub = StreamHub::new(11);
        assert_eq!(hub.subscribe().current_sequence, 11);
    }

    #[test]
    fn apply_filter_still_selects_matching_entries() {
        let matchers = compile_matchers(&filter(1, "(?s).*")).unwrap();
        let hit = (key(1, b"hit"), Bytes::from_static(b"v1"));
        let miss = (key(2, b"miss"), Bytes::from_static(b"v2"));
        let entries = apply_filter(&matchers, &[hit, miss]);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_ref(), b"v1");
    }

    #[test]
    fn subscribe_rejects_invalid_filter() {
        let no_match_keys = StreamFilter {
            match_keys: Vec::new(),
            value_filters: Vec::new(),
        };
        assert!(compile_matchers(&no_match_keys).is_err());
    }

    #[test]
    fn value_filter_intersects_with_key_filter() {
        let keep_only = vec![BytesFilter::Regex("^keep$".into())];
        let matchers = compile_matchers(&filter_with_values(1, "(?s).*", keep_only)).unwrap();
        let kvs = [
            (key(1, b"a"), Bytes::from_static(b"keep")),
            (key(1, b"b"), Bytes::from_static(b"drop")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_ref(), b"keep");
    }

    #[test]
    fn value_filter_exact_match() {
        let exact = vec![BytesFilter::Exact(Bytes::from_static(b"target"))];
        let matchers = compile_matchers(&filter_with_values(1, "(?s).*", exact)).unwrap();
        let kvs = [
            (key(1, b"a"), Bytes::from_static(b"target")),
            (key(1, b"b"), Bytes::from_static(b"other")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_ref(), b"target");
    }

    #[test]
    fn value_filter_empty_accepts_all_matching_keys() {
        let matchers = compile_matchers(&filter(1, "(?s).*")).unwrap();
        let kvs = [
            (key(1, b"a"), Bytes::from_static(b"one")),
            (key(1, b"b"), Bytes::from_static(b"two")),
        ];
        assert_eq!(apply_filter(&matchers, &kvs).len(), 2);
    }
}