1use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14
15use bytes::Bytes;
16use connectrpc::ConnectError;
17use exoware_sdk::common::KvEntry;
18use exoware_sdk::keys::KeyCodec;
19use exoware_sdk::match_key::compile_payload_regex;
20use exoware_sdk::stream_filter::{validate_filter, CompiledBytesFilters, StreamFilter};
21use regex::bytes::Regex;
22use tokio::sync::Notify;
23
// NOTE(review): none of these constants is consumed in the visible part of the
// file; the descriptions below are inferred from the names — confirm at the call sites.

/// Domain string for stream-related errors (`"store.stream"`).
pub const STREAM_ERROR_DOMAIN: &str = "store.stream";
/// Reason code `"BATCH_EVICTED"`; presumably reported when a requested batch is no longer retained.
pub const REASON_BATCH_EVICTED: &str = "BATCH_EVICTED";
/// Reason code `"BATCH_NOT_FOUND"`; presumably reported when a requested batch is unknown.
pub const REASON_BATCH_NOT_FOUND: &str = "BATCH_NOT_FOUND";
/// Metadata key `"oldest_retained"`; presumably carries the oldest sequence still retained.
pub const METADATA_OLDEST_RETAINED: &str = "oldest_retained";
34
35#[derive(Clone)]
36pub(crate) struct CompiledKeyMatcher {
37 codec: KeyCodec,
38 regex: Regex,
39}
40
41#[derive(Clone)]
42pub(crate) struct CompiledMatchers {
43 pub keys: Vec<CompiledKeyMatcher>,
44 pub values: Option<CompiledBytesFilters>,
45}
46
47pub(crate) fn compile_matchers(filter: &StreamFilter) -> Result<CompiledMatchers, ConnectError> {
51 validate_filter(filter).map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
52 let keys = filter
53 .match_keys
54 .iter()
55 .map(|mk| {
56 let regex = compile_payload_regex(&mk.payload_regex)
57 .map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
58 Ok(CompiledKeyMatcher {
59 codec: KeyCodec::new(mk.reserved_bits, mk.prefix),
60 regex,
61 })
62 })
63 .collect::<Result<Vec<_>, ConnectError>>()?;
64 let values = CompiledBytesFilters::compile(&filter.value_filters)
65 .map_err(|e| ConnectError::invalid_argument(format!("invalid value_filter: {e}")))?;
66 Ok(CompiledMatchers { keys, values })
67}
68
69pub(crate) fn apply_filter(matchers: &CompiledMatchers, kvs: &[(Bytes, Bytes)]) -> Vec<KvEntry> {
71 let mut out = Vec::with_capacity(kvs.len());
72 'outer: for (k, v) in kvs {
73 let value_ok = matchers.values.as_ref().is_none_or(|m| m.matches(v));
74 if !value_ok {
75 continue;
76 }
77 for matcher in &matchers.keys {
78 if !matcher.codec.matches(k) {
79 continue;
80 }
81 let payload_len = matcher.codec.payload_capacity_bytes_for_key_len(k.len());
82 let Ok(payload) = matcher.codec.read_payload(k, 0, payload_len) else {
83 continue;
84 };
85 if matcher.regex.is_match(&payload) {
86 out.push(KvEntry {
87 key: k.to_vec(),
88 value: v.clone(),
89 ..Default::default()
90 });
91 continue 'outer;
92 }
93 }
94 }
95 out
96}
97
98#[derive(Clone)]
99pub struct StreamNotification {
100 pub current_sequence: u64,
101 pub notify: Arc<Notify>,
102}
103
104pub trait StreamNotifier: Send + Sync + 'static {
107 fn subscribe(&self) -> StreamNotification;
110
111 fn current_sequence(&self) -> u64;
113
114 fn advance(&self, seq: u64);
116}
117
118pub struct StreamHub {
119 published_sequence: AtomicU64,
120 notify: Arc<Notify>,
121}
122
123impl StreamHub {
124 pub fn new(initial_sequence: u64) -> Self {
125 Self {
126 published_sequence: AtomicU64::new(initial_sequence),
127 notify: Arc::new(Notify::new()),
128 }
129 }
130
131 pub fn publish(&self, seq: u64) {
133 self.advance(seq);
134 }
135}
136
137impl StreamNotifier for StreamHub {
138 fn subscribe(&self) -> StreamNotification {
139 StreamNotification {
140 current_sequence: self.published_sequence.load(Ordering::Acquire),
141 notify: self.notify.clone(),
142 }
143 }
144
145 fn current_sequence(&self) -> u64 {
146 self.published_sequence.load(Ordering::Acquire)
147 }
148
149 fn advance(&self, seq: u64) {
150 self.published_sequence.fetch_max(seq, Ordering::SeqCst);
151 self.notify.notify_waiters();
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use exoware_sdk::kv_codec::Utf8;
    use exoware_sdk::match_key::MatchKey;
    use exoware_sdk::stream_filter::BytesFilter;

    /// Payload regex shared by the tests that do not care about payload contents.
    const ANY_PAYLOAD: &str = "(?s).*";

    /// Builds a filter with a single match key (4 reserved bits) and the given
    /// value filters.
    fn filter_with_values(
        prefix: u16,
        regex: &str,
        value_filters: Vec<BytesFilter>,
    ) -> StreamFilter {
        let match_key = MatchKey {
            reserved_bits: 4,
            prefix,
            payload_regex: Utf8::from(regex),
        };
        StreamFilter {
            match_keys: vec![match_key],
            value_filters,
        }
    }

    /// Builds a single-match-key filter without any value filters.
    fn filter(prefix: u16, regex: &str) -> StreamFilter {
        filter_with_values(prefix, regex, Vec::new())
    }

    /// Encodes `payload` under key family `family` using a 4-reserved-bit codec.
    fn key(family: u8, payload: &[u8]) -> Bytes {
        let encoded = KeyCodec::new(4, u16::from(family))
            .encode(payload)
            .unwrap();
        Bytes::copy_from_slice(encoded.as_ref())
    }

    /// Compiles `filter`, panicking if compilation fails.
    fn compiled(filter: StreamFilter) -> CompiledMatchers {
        compile_matchers(&filter).unwrap()
    }

    #[test]
    fn publish_sequence_is_monotonic() {
        let hub = StreamHub::new(7);
        assert_eq!(hub.current_sequence(), 7);
        // Publishing below the current value is ignored; publishing above raises it.
        for (published, expected) in [(3, 7), (9, 9)] {
            hub.publish(published);
            assert_eq!(hub.current_sequence(), expected);
        }
    }

    #[test]
    fn subscribe_snapshots_current_sequence() {
        let subscription = StreamHub::new(11).subscribe();
        assert_eq!(subscription.current_sequence, 11);
    }

    #[test]
    fn apply_filter_still_selects_matching_entries() {
        let matchers = compiled(filter(1, ANY_PAYLOAD));
        let kvs = [
            (key(1, b"hit"), Bytes::from_static(b"v1")),
            (key(2, b"miss"), Bytes::from_static(b"v2")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_ref(), b"v1");
    }

    #[test]
    fn subscribe_rejects_invalid_filter() {
        // A filter without any match keys must fail to compile.
        let empty = StreamFilter {
            match_keys: Vec::new(),
            value_filters: Vec::new(),
        };
        assert!(compile_matchers(&empty).is_err());
    }

    #[test]
    fn value_filter_intersects_with_key_filter() {
        let value_filters = vec![BytesFilter::Regex("^keep$".into())];
        let matchers = compiled(filter_with_values(1, ANY_PAYLOAD, value_filters));
        let kvs = [
            (key(1, b"a"), Bytes::from_static(b"keep")),
            (key(1, b"b"), Bytes::from_static(b"drop")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_ref(), b"keep");
    }

    #[test]
    fn value_filter_exact_match() {
        let value_filters = vec![BytesFilter::Exact(Bytes::from_static(b"target"))];
        let matchers = compiled(filter_with_values(1, ANY_PAYLOAD, value_filters));
        let kvs = [
            (key(1, b"a"), Bytes::from_static(b"target")),
            (key(1, b"b"), Bytes::from_static(b"other")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_ref(), b"target");
    }

    #[test]
    fn value_filter_empty_accepts_all_matching_keys() {
        let matchers = compiled(filter(1, ANY_PAYLOAD));
        let kvs = [
            (key(1, b"a"), Bytes::from_static(b"one")),
            (key(1, b"b"), Bytes::from_static(b"two")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 2);
    }
}