Skip to main content

exoware_sdk/
stream_filter.rs

1//! Validated filter for `store.stream.v1.Subscribe`.
2//!
3//! The filter is a list of `MatchKey`s with OR semantics: a row is delivered
4//! if any match_key's (reserved_bits, prefix) selects its family AND its
5//! payload_regex matches the key's payload bytes. The list is capped at 16 to
6//! keep server-side regex compile cost predictable. When `value_filters` is
7//! non-empty, rows that pass the key filter must also satisfy any one
8//! `BytesFilter` in the value list (OR within the value list; AND between key
9//! and value filters).
10
11use std::collections::BTreeSet;
12
13use anyhow::ensure;
14use regex::bytes::Regex;
15
16use crate::keys::KeyCodec;
17use crate::match_key::MatchKey;
18
/// Largest number of `match_keys` a [`StreamFilter`] may carry;
/// [`validate_filter`] rejects longer lists.
pub const MAX_MATCH_KEYS_PER_FILTER: usize = 16;

/// Largest number of `value_filters` a [`StreamFilter`] may carry;
/// [`validate_filter`] rejects longer lists.
pub const MAX_VALUE_FILTERS_PER_FILTER: usize = 16;
21
/// One predicate over a row's raw value bytes.
///
/// The wire shape mirrors `store.common.v1.BytesFilter`. A filter is only a
/// description; [`CompiledBytesFilters::compile`] turns a list of them into
/// something that can be evaluated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BytesFilter {
    /// Matches when the value is byte-for-byte equal to the payload.
    Exact(Vec<u8>),
    /// Matches when the value starts with the payload.
    Prefix(Vec<u8>),
    /// Matches when the value matches this regex pattern. The pattern is kept
    /// as text here and compiled later.
    Regex(String),
}
30
31#[derive(Clone, Debug, PartialEq, Eq)]
32pub struct StreamFilter {
33    pub match_keys: Vec<MatchKey>,
34    pub value_filters: Vec<BytesFilter>,
35}
36
37/// A compiled bundle of `BytesFilter`s ready to evaluate against a byte
38/// slice. OR semantics: `matches` returns true when any contained filter
39/// matches the input. An empty bundle matches nothing; callers should model
40/// "no filter configured" as `Option::None` and skip the match.
41#[derive(Clone, Debug)]
42pub struct CompiledBytesFilters {
43    exacts: BTreeSet<Vec<u8>>,
44    prefixes: Vec<Vec<u8>>,
45    regexes: Vec<Regex>,
46}
47
48impl CompiledBytesFilters {
49    /// Compile filters from the domain `BytesFilter` representation. Returns
50    /// `Ok(None)` when the input is empty (caller should skip matching).
51    pub fn compile(filters: &[BytesFilter]) -> Result<Option<Self>, String> {
52        if filters.is_empty() {
53            return Ok(None);
54        }
55        let mut exacts = BTreeSet::<Vec<u8>>::new();
56        let mut prefixes = Vec::new();
57        let mut regexes = Vec::new();
58        for filter in filters {
59            match filter {
60                BytesFilter::Exact(bytes) => {
61                    exacts.insert(bytes.clone());
62                }
63                BytesFilter::Prefix(bytes) => prefixes.push(bytes.clone()),
64                BytesFilter::Regex(pattern) => {
65                    if pattern.trim().is_empty() {
66                        return Err("regex filter must not be empty".to_string());
67                    }
68                    regexes.push(
69                        Regex::new(pattern)
70                            .map_err(|e| format!("invalid regex `{pattern}`: {e}"))?,
71                    );
72                }
73            }
74        }
75        Ok(Some(Self {
76            exacts,
77            prefixes,
78            regexes,
79        }))
80    }
81
82    pub fn matches(&self, bytes: &[u8]) -> bool {
83        self.exacts.contains(bytes)
84            || self.prefixes.iter().any(|p| bytes.starts_with(p))
85            || self.regexes.iter().any(|r| r.is_match(bytes))
86    }
87}
88
89/// Shape-only validation: bounds, family validity, non-empty regex string.
90/// Does NOT compile the regex — the server compiles once per subscribe.
91pub fn validate_filter(filter: &StreamFilter) -> anyhow::Result<()> {
92    ensure!(
93        !filter.match_keys.is_empty(),
94        "stream filter must contain at least one match_key"
95    );
96    ensure!(
97        filter.match_keys.len() <= MAX_MATCH_KEYS_PER_FILTER,
98        "stream filter capped at {MAX_MATCH_KEYS_PER_FILTER} match_keys"
99    );
100    ensure!(
101        filter.value_filters.len() <= MAX_VALUE_FILTERS_PER_FILTER,
102        "stream filter capped at {MAX_VALUE_FILTERS_PER_FILTER} value_filters"
103    );
104    for mk in &filter.match_keys {
105        // Panics on invalid (reserved_bits, prefix); translate to Err.
106        std::panic::catch_unwind(|| KeyCodec::new(mk.reserved_bits, mk.prefix)).map_err(|_| {
107            anyhow::anyhow!(
108                "invalid (reserved_bits={}, prefix={}) — see KeyCodec::new",
109                mk.reserved_bits,
110                mk.prefix
111            )
112        })?;
113        ensure!(
114            !mk.payload_regex.trim().is_empty(),
115            "match_key payload_regex must not be empty"
116        );
117    }
118    for vf in &filter.value_filters {
119        match vf {
120            BytesFilter::Regex(r) => {
121                ensure!(!r.trim().is_empty(), "value_filter regex must not be empty")
122            }
123            BytesFilter::Exact(_) | BytesFilter::Prefix(_) => {}
124        }
125    }
126    Ok(())
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_codec::Utf8;

    /// Builds a match key with `reserved_bits = 4`, the given prefix, and a
    /// match-everything payload regex.
    fn mk(prefix: u16) -> MatchKey {
        MatchKey {
            reserved_bits: 4,
            prefix,
            payload_regex: Utf8::from("(?s).*"),
        }
    }

    /// Builds a filter with one known-good match key and the given value
    /// filters, so a validation failure can only come from the value list.
    fn with_value_filters(value_filters: Vec<BytesFilter>) -> StreamFilter {
        StreamFilter {
            match_keys: vec![mk(1)],
            value_filters,
        }
    }

    #[test]
    fn accepts_one_match_key() {
        let f = StreamFilter {
            match_keys: vec![mk(1)],
            value_filters: vec![],
        };
        validate_filter(&f).unwrap();
    }

    #[test]
    fn rejects_empty() {
        let f = StreamFilter {
            match_keys: vec![],
            value_filters: vec![],
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn rejects_too_many() {
        let f = StreamFilter {
            match_keys: (0..(MAX_MATCH_KEYS_PER_FILTER as u16 + 1))
                .map(mk)
                .collect(),
            value_filters: vec![],
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn rejects_empty_regex() {
        let f = StreamFilter {
            match_keys: vec![MatchKey {
                reserved_bits: 4,
                prefix: 1,
                payload_regex: Utf8::from(""),
            }],
            value_filters: vec![],
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn rejects_whitespace_only_regex() {
        let f = StreamFilter {
            match_keys: vec![MatchKey {
                reserved_bits: 4,
                prefix: 1,
                payload_regex: Utf8::from("   "),
            }],
            value_filters: vec![],
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn accepts_value_filters() {
        let f = with_value_filters(vec![
            BytesFilter::Exact(b"abc".to_vec()),
            BytesFilter::Prefix(b"ab".to_vec()),
            BytesFilter::Regex("^a".to_string()),
        ]);
        validate_filter(&f).unwrap();
    }

    #[test]
    fn rejects_too_many_value_filters() {
        let f = with_value_filters(vec![
            BytesFilter::Exact(vec![0]);
            MAX_VALUE_FILTERS_PER_FILTER + 1
        ]);
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn rejects_blank_value_regex() {
        let f = with_value_filters(vec![BytesFilter::Regex("  ".to_string())]);
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn compile_of_empty_list_is_none() {
        assert!(CompiledBytesFilters::compile(&[]).unwrap().is_none());
    }

    #[test]
    fn compile_rejects_blank_and_invalid_regex() {
        let blank = [BytesFilter::Regex("   ".to_string())];
        assert_eq!(
            CompiledBytesFilters::compile(&blank).unwrap_err(),
            "regex filter must not be empty"
        );

        let unbalanced = [BytesFilter::Regex("(".to_string())];
        assert!(CompiledBytesFilters::compile(&unbalanced).is_err());
    }

    #[test]
    fn compiled_filters_use_or_semantics() {
        let compiled = CompiledBytesFilters::compile(&[
            BytesFilter::Exact(b"abc".to_vec()),
            BytesFilter::Prefix(b"xy".to_vec()),
            BytesFilter::Regex("^[0-9]+$".to_string()),
        ])
        .unwrap()
        .expect("non-empty input must produce a bundle");

        assert!(compiled.matches(b"abc"));
        assert!(compiled.matches(b"xyz"));
        assert!(compiled.matches(b"123"));
        assert!(!compiled.matches(b"abcd"));
        assert!(!compiled.matches(b""));
    }
}
183}