1use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12use bytes::Bytes;
13use connectrpc::ConnectError;
14use exoware_sdk::common::KvEntry;
15use exoware_sdk::keys::KeyCodec;
16use exoware_sdk::match_key::compile_payload_regex;
17use exoware_sdk::stream_filter::{validate_filter, CompiledBytesFilters, StreamFilter};
18use regex::bytes::Regex;
19use tokio::sync::Notify;
20
/// Error domain string for errors produced by the stream service.
///
/// NOTE(review): presumably used as the `domain` of a structured error detail
/// alongside the `REASON_*` constants below — confirm at the call sites.
pub const STREAM_ERROR_DOMAIN: &str = "store.stream";

/// Error reason reported when a requested batch has been evicted.
///
/// NOTE(review): presumably paired with [`METADATA_OLDEST_RETAINED`] so clients
/// can learn where retention now begins — confirm at the call sites.
pub const REASON_BATCH_EVICTED: &str = "BATCH_EVICTED";

/// Error reason reported when a requested batch cannot be found.
pub const REASON_BATCH_NOT_FOUND: &str = "BATCH_NOT_FOUND";

/// Error-metadata key whose value identifies the oldest retained batch/sequence.
///
/// NOTE(review): the exact value format (e.g. decimal sequence number) is set by
/// the code that emits it — confirm there.
pub const METADATA_OLDEST_RETAINED: &str = "oldest_retained";
31
32#[derive(Clone)]
33pub(crate) struct CompiledKeyMatcher {
34 codec: KeyCodec,
35 regex: Regex,
36}
37
38#[derive(Clone)]
39pub(crate) struct CompiledMatchers {
40 pub keys: Vec<CompiledKeyMatcher>,
41 pub values: Option<CompiledBytesFilters>,
42}
43
44pub(crate) fn compile_matchers(filter: &StreamFilter) -> Result<CompiledMatchers, ConnectError> {
48 validate_filter(filter).map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
49 let keys = filter
50 .match_keys
51 .iter()
52 .map(|mk| {
53 let regex = compile_payload_regex(&mk.payload_regex)
54 .map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
55 Ok(CompiledKeyMatcher {
56 codec: KeyCodec::new(mk.reserved_bits, mk.prefix),
57 regex,
58 })
59 })
60 .collect::<Result<Vec<_>, ConnectError>>()?;
61 let values = CompiledBytesFilters::compile(&filter.value_filters)
62 .map_err(|e| ConnectError::invalid_argument(format!("invalid value_filter: {e}")))?;
63 Ok(CompiledMatchers { keys, values })
64}
65
66pub(crate) fn apply_filter(matchers: &CompiledMatchers, kvs: &[(Bytes, Bytes)]) -> Vec<KvEntry> {
68 let mut out = Vec::with_capacity(kvs.len());
69 'outer: for (k, v) in kvs {
70 let value_ok = matchers.values.as_ref().is_none_or(|m| m.matches(v));
71 if !value_ok {
72 continue;
73 }
74 for matcher in &matchers.keys {
75 if !matcher.codec.matches(k) {
76 continue;
77 }
78 let payload_len = matcher.codec.payload_capacity_bytes_for_key_len(k.len());
79 let Ok(payload) = matcher.codec.read_payload(k, 0, payload_len) else {
80 continue;
81 };
82 if matcher.regex.is_match(&payload) {
83 out.push(KvEntry {
84 key: k.to_vec(),
85 value: v.to_vec(),
86 ..Default::default()
87 });
88 continue 'outer;
89 }
90 }
91 }
92 out
93}
94
95pub struct StreamHub {
96 published_sequence: AtomicU64,
97 notify: Arc<Notify>,
98}
99
100impl StreamHub {
101 pub fn new(initial_sequence: u64) -> Self {
102 Self {
103 published_sequence: AtomicU64::new(initial_sequence),
104 notify: Arc::new(Notify::new()),
105 }
106 }
107
108 pub(crate) fn subscribe(
112 &self,
113 filter: StreamFilter,
114 ) -> Result<(CompiledMatchers, u64, Arc<Notify>), ConnectError> {
115 let matchers = compile_matchers(&filter)?;
116 let floor = self.published_sequence.load(Ordering::Acquire);
117 Ok((matchers, floor, self.notify.clone()))
118 }
119
120 pub fn publish(&self, seq: u64) {
122 self.published_sequence.fetch_max(seq, Ordering::SeqCst);
123 self.notify.notify_waiters();
124 }
125
126 pub(crate) fn current_sequence(&self) -> u64 {
127 self.published_sequence.load(Ordering::Acquire)
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use exoware_sdk::kv_codec::Utf8;
    use exoware_sdk::match_key::MatchKey;
    use exoware_sdk::stream_filter::BytesFilter;

    /// Builds a `MatchKey` over 4 reserved bits with the given prefix and payload regex.
    fn match_key(prefix: u16, regex: &str) -> MatchKey {
        MatchKey {
            reserved_bits: 4,
            prefix,
            payload_regex: Utf8::from(regex),
        }
    }

    /// Single-key-matcher filter with no value filters.
    fn filter(prefix: u16, regex: &str) -> StreamFilter {
        filter_with_values(prefix, regex, vec![])
    }

    /// Single-key-matcher filter with the given value filters.
    fn filter_with_values(
        prefix: u16,
        regex: &str,
        value_filters: Vec<BytesFilter>,
    ) -> StreamFilter {
        StreamFilter {
            match_keys: vec![match_key(prefix, regex)],
            value_filters,
        }
    }

    /// Encodes `payload` under a 4-reserved-bit codec whose prefix is `family`.
    fn key(family: u8, payload: &[u8]) -> Bytes {
        let codec = KeyCodec::new(4, u16::from(family));
        let key = codec.encode(payload).unwrap();
        Bytes::copy_from_slice(key.as_ref())
    }

    #[test]
    fn publish_sequence_is_monotonic() {
        let hub = StreamHub::new(7);
        assert_eq!(hub.current_sequence(), 7);
        hub.publish(3);
        assert_eq!(hub.current_sequence(), 7);
        hub.publish(9);
        assert_eq!(hub.current_sequence(), 9);
    }

    #[test]
    fn subscribe_snapshots_current_sequence() {
        let hub = StreamHub::new(11);
        let (_matchers, floor, _notify) = hub.subscribe(filter(1, "(?s).*")).unwrap();
        assert_eq!(floor, 11);
    }

    #[test]
    fn apply_filter_still_selects_matching_entries() {
        let matchers = compile_matchers(&filter(1, "(?s).*")).unwrap();
        let kvs = vec![
            (key(1, b"hit"), Bytes::from_static(b"v1")),
            (key(2, b"miss"), Bytes::from_static(b"v2")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_slice(), b"v1");
    }

    #[test]
    fn apply_filter_matches_any_key_matcher() {
        let multi = StreamFilter {
            match_keys: vec![match_key(1, "(?s).*"), match_key(2, "(?s).*")],
            value_filters: vec![],
        };
        let matchers = compile_matchers(&multi).unwrap();
        let kvs = vec![
            (key(1, b"a"), Bytes::from_static(b"v1")),
            (key(2, b"b"), Bytes::from_static(b"v2")),
            (key(3, b"c"), Bytes::from_static(b"v3")),
        ];
        let values: Vec<Vec<u8>> = apply_filter(&matchers, &kvs)
            .into_iter()
            .map(|entry| entry.value)
            .collect();
        assert_eq!(values, vec![b"v1".to_vec(), b"v2".to_vec()]);
    }

    #[test]
    fn subscribe_rejects_invalid_filter() {
        let hub = StreamHub::new(0);
        let bad = StreamFilter {
            match_keys: vec![],
            value_filters: vec![],
        };
        assert!(hub.subscribe(bad).is_err());
    }

    #[test]
    fn compile_matchers_rejects_invalid_payload_regex() {
        assert!(compile_matchers(&filter(1, "(")).is_err());
    }

    #[test]
    fn value_filter_intersects_with_key_filter() {
        let matchers = compile_matchers(&filter_with_values(
            1,
            "(?s).*",
            vec![BytesFilter::Regex("^keep$".into())],
        ))
        .unwrap();
        let kvs = vec![
            (key(1, b"a"), Bytes::from_static(b"keep")),
            (key(1, b"b"), Bytes::from_static(b"drop")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_slice(), b"keep");
    }

    #[test]
    fn value_filter_exact_match() {
        let matchers = compile_matchers(&filter_with_values(
            1,
            "(?s).*",
            vec![BytesFilter::Exact(b"target".to_vec())],
        ))
        .unwrap();
        let kvs = vec![
            (key(1, b"a"), Bytes::from_static(b"target")),
            (key(1, b"b"), Bytes::from_static(b"other")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].value.as_slice(), b"target");
    }

    #[test]
    fn value_filter_empty_accepts_all_matching_keys() {
        let matchers = compile_matchers(&filter(1, "(?s).*")).unwrap();
        let kvs = vec![
            (key(1, b"a"), Bytes::from_static(b"one")),
            (key(1, b"b"), Bytes::from_static(b"two")),
        ];
        let entries = apply_filter(&matchers, &kvs);
        assert_eq!(entries.len(), 2);
    }
}