Skip to main content

exoware_server/
stream.rs

//! Live stream coordination for `store.stream.v1`.
//!
//! `StreamHub::publish` is called synchronously after `StoreEngine::put_batch`
//! returns `Ok`. The hub only tracks the highest published batch sequence and
//! wakes subscribers; each subscriber then pulls batches from the engine at its
//! own pace, so live delivery is naturally paced by client reads instead of an
//! internal per-subscriber backlog.
8
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12use bytes::Bytes;
13use connectrpc::ConnectError;
14use exoware_sdk::common::KvEntry;
15use exoware_sdk::keys::KeyCodec;
16use exoware_sdk::match_key::compile_payload_regex;
17use exoware_sdk::stream_filter::{validate_filter, CompiledBytesFilters, StreamFilter};
18use regex::bytes::Regex;
19use tokio::sync::Notify;
20
/// Value placed in `ErrorInfo.domain` for every error raised by the stream
/// service.
pub const STREAM_ERROR_DOMAIN: &str = "store.stream";
/// `ErrorInfo.reason` reported when a `since_sequence_number` or `Get(seq)`
/// points at a batch that has already been pruned from the batch log.
pub const REASON_BATCH_EVICTED: &str = "BATCH_EVICTED";
/// `ErrorInfo.reason` reported when a `Get(seq)` asks for a sequence number
/// higher than any that has ever been issued.
pub const REASON_BATCH_NOT_FOUND: &str = "BATCH_NOT_FOUND";
/// Metadata key attached to `BATCH_EVICTED` errors; its value carries the
/// lowest sequence number that is still retained.
pub const METADATA_OLDEST_RETAINED: &str = "oldest_retained";
31
32#[derive(Clone)]
33pub(crate) struct CompiledKeyMatcher {
34    codec: KeyCodec,
35    regex: Regex,
36}
37
38#[derive(Clone)]
39pub(crate) struct CompiledMatchers {
40    pub keys: Vec<CompiledKeyMatcher>,
41    pub values: Option<CompiledBytesFilters>,
42}
43
44/// Validate and compile a `StreamFilter`. Shared between replay and live
45/// delivery so both paths match identically and regexes are compiled once per
46/// subscribe.
47pub(crate) fn compile_matchers(filter: &StreamFilter) -> Result<CompiledMatchers, ConnectError> {
48    validate_filter(filter).map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
49    let keys = filter
50        .match_keys
51        .iter()
52        .map(|mk| {
53            let regex = compile_payload_regex(&mk.payload_regex)
54                .map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
55            Ok(CompiledKeyMatcher {
56                codec: KeyCodec::new(mk.reserved_bits, mk.prefix),
57                regex,
58            })
59        })
60        .collect::<Result<Vec<_>, ConnectError>>()?;
61    let values = CompiledBytesFilters::compile(&filter.value_filters)
62        .map_err(|e| ConnectError::invalid_argument(format!("invalid value_filter: {e}")))?;
63    Ok(CompiledMatchers { keys, values })
64}
65
66/// Apply a compiled filter to a batch. First-match-wins per `(key, value)`.
67pub(crate) fn apply_filter(matchers: &CompiledMatchers, kvs: &[(Bytes, Bytes)]) -> Vec<KvEntry> {
68    let mut out = Vec::with_capacity(kvs.len());
69    'outer: for (k, v) in kvs {
70        let value_ok = matchers.values.as_ref().is_none_or(|m| m.matches(v));
71        if !value_ok {
72            continue;
73        }
74        for matcher in &matchers.keys {
75            if !matcher.codec.matches(k) {
76                continue;
77            }
78            let payload_len = matcher.codec.payload_capacity_bytes_for_key_len(k.len());
79            let Ok(payload) = matcher.codec.read_payload(k, 0, payload_len) else {
80                continue;
81            };
82            if matcher.regex.is_match(&payload) {
83                out.push(KvEntry {
84                    key: k.to_vec(),
85                    value: v.to_vec(),
86                    ..Default::default()
87                });
88                continue 'outer;
89            }
90        }
91    }
92    out
93}
94
95pub struct StreamHub {
96    published_sequence: AtomicU64,
97    notify: Arc<Notify>,
98}
99
100impl StreamHub {
101    pub fn new(initial_sequence: u64) -> Self {
102        Self {
103            published_sequence: AtomicU64::new(initial_sequence),
104            notify: Arc::new(Notify::new()),
105        }
106    }
107
108    /// Compile the filter and atomically snapshot the highest published batch
109    /// sequence that should be considered "already visible" to this
110    /// subscription. Later publishes wake the returned notifier.
111    pub(crate) fn subscribe(
112        &self,
113        filter: StreamFilter,
114    ) -> Result<(CompiledMatchers, u64, Arc<Notify>), ConnectError> {
115        let matchers = compile_matchers(&filter)?;
116        let floor = self.published_sequence.load(Ordering::Acquire);
117        Ok((matchers, floor, self.notify.clone()))
118    }
119
120    /// Announce a newly committed batch sequence to subscribers.
121    pub fn publish(&self, seq: u64) {
122        self.published_sequence.fetch_max(seq, Ordering::SeqCst);
123        self.notify.notify_waiters();
124    }
125
126    pub(crate) fn current_sequence(&self) -> u64 {
127        self.published_sequence.load(Ordering::Acquire)
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use exoware_sdk::kv_codec::Utf8;
    use exoware_sdk::match_key::MatchKey;
    use exoware_sdk::stream_filter::BytesFilter;

    /// Payload regex used by tests that only care about the key family.
    const ANY_PAYLOAD: &str = "(?s).*";

    /// Builds a single-rule filter (4 reserved bits) with the given value
    /// filters.
    fn filter_with_values(
        prefix: u16,
        regex: &str,
        value_filters: Vec<BytesFilter>,
    ) -> StreamFilter {
        let rule = MatchKey {
            reserved_bits: 4,
            prefix,
            payload_regex: Utf8::from(regex),
        };
        StreamFilter {
            match_keys: vec![rule],
            value_filters,
        }
    }

    /// Builds a single-rule filter with no value filters.
    fn filter(prefix: u16, regex: &str) -> StreamFilter {
        filter_with_values(prefix, regex, Vec::new())
    }

    /// Encodes `payload` under key family `family` using 4 reserved bits.
    fn key(family: u8, payload: &[u8]) -> Bytes {
        let encoded = KeyCodec::new(4, u16::from(family))
            .encode(payload)
            .unwrap();
        Bytes::copy_from_slice(encoded.as_ref())
    }

    /// Builds one `(key, value)` pair for a batch fixture.
    fn kv(family: u8, payload: &[u8], value: &'static [u8]) -> (Bytes, Bytes) {
        (key(family, payload), Bytes::from_static(value))
    }

    /// Compiles `filter` and applies it to `kvs`.
    fn select(filter: &StreamFilter, kvs: &[(Bytes, Bytes)]) -> Vec<KvEntry> {
        let matchers = compile_matchers(filter).unwrap();
        apply_filter(&matchers, kvs)
    }

    #[test]
    fn publish_sequence_is_monotonic() {
        let hub = StreamHub::new(7);
        assert_eq!(hub.current_sequence(), 7);

        // Publishing 3 must not lower the sequence; publishing 9 raises it.
        for (published, expected) in [(3, 7), (9, 9)] {
            hub.publish(published);
            assert_eq!(hub.current_sequence(), expected);
        }
    }

    #[test]
    fn subscribe_snapshots_current_sequence() {
        let hub = StreamHub::new(11);
        let (_matchers, floor, _notify) = hub.subscribe(filter(1, ANY_PAYLOAD)).unwrap();
        assert_eq!(floor, 11);
    }

    #[test]
    fn apply_filter_still_selects_matching_entries() {
        let batch = [kv(1, b"hit", b"v1"), kv(2, b"miss", b"v2")];
        let entries = select(&filter(1, ANY_PAYLOAD), &batch);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_slice(), b"v1");
    }

    #[test]
    fn subscribe_rejects_invalid_filter() {
        let hub = StreamHub::new(0);
        let empty = StreamFilter {
            match_keys: vec![],
            value_filters: vec![],
        };
        assert!(hub.subscribe(empty).is_err());
    }

    #[test]
    fn value_filter_intersects_with_key_filter() {
        let spec = filter_with_values(
            1,
            ANY_PAYLOAD,
            vec![BytesFilter::Regex("^keep$".into())],
        );
        let batch = [kv(1, b"a", b"keep"), kv(1, b"b", b"drop")];
        let entries = select(&spec, &batch);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_slice(), b"keep");
    }

    #[test]
    fn value_filter_exact_match() {
        let spec = filter_with_values(
            1,
            ANY_PAYLOAD,
            vec![BytesFilter::Exact(b"target".to_vec())],
        );
        let batch = [kv(1, b"a", b"target"), kv(1, b"b", b"other")];
        let entries = select(&spec, &batch);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_slice(), b"target");
    }

    #[test]
    fn value_filter_empty_accepts_all_matching_keys() {
        let batch = [kv(1, b"a", b"one"), kv(1, b"b", b"two")];
        let entries = select(&filter(1, ANY_PAYLOAD), &batch);
        assert_eq!(entries.len(), 2);
    }
}