exoware_sdk/
stream_filter.rs1use std::collections::BTreeSet;
12
13use anyhow::ensure;
14use regex::bytes::Regex;
15
16use crate::keys::KeyCodec;
17use crate::match_key::MatchKey;
18
/// Maximum number of entries allowed in `StreamFilter::match_keys`,
/// enforced by [`validate_filter`].
pub const MAX_MATCH_KEYS_PER_FILTER: usize = 16;

/// Maximum number of entries allowed in `StreamFilter::value_filters`,
/// enforced by [`validate_filter`].
pub const MAX_VALUE_FILTERS_PER_FILTER: usize = 16;
21
/// A single predicate over a byte string.
///
/// [`CompiledBytesFilters`] evaluates each variant as described on the variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BytesFilter {
    /// Matches only a byte string exactly equal to the contained bytes.
    Exact(Vec<u8>),
    /// Matches any byte string that starts with the contained bytes
    /// (so an empty prefix matches every input).
    Prefix(Vec<u8>),
    /// Matches any byte string in which the pattern, compiled as a
    /// `regex::bytes::Regex`, finds a match anywhere; the pattern is not
    /// implicitly anchored. Blank (whitespace-only) patterns are rejected.
    Regex(String),
}
30
31#[derive(Clone, Debug, PartialEq, Eq)]
32pub struct StreamFilter {
33 pub match_keys: Vec<MatchKey>,
34 pub value_filters: Vec<BytesFilter>,
35}
36
37#[derive(Clone, Debug)]
42pub struct CompiledBytesFilters {
43 exacts: BTreeSet<Vec<u8>>,
44 prefixes: Vec<Vec<u8>>,
45 regexes: Vec<Regex>,
46}
47
48impl CompiledBytesFilters {
49 pub fn compile(filters: &[BytesFilter]) -> Result<Option<Self>, String> {
52 if filters.is_empty() {
53 return Ok(None);
54 }
55 let mut exacts = BTreeSet::<Vec<u8>>::new();
56 let mut prefixes = Vec::new();
57 let mut regexes = Vec::new();
58 for filter in filters {
59 match filter {
60 BytesFilter::Exact(bytes) => {
61 exacts.insert(bytes.clone());
62 }
63 BytesFilter::Prefix(bytes) => prefixes.push(bytes.clone()),
64 BytesFilter::Regex(pattern) => {
65 if pattern.trim().is_empty() {
66 return Err("regex filter must not be empty".to_string());
67 }
68 regexes.push(
69 Regex::new(pattern)
70 .map_err(|e| format!("invalid regex `{pattern}`: {e}"))?,
71 );
72 }
73 }
74 }
75 Ok(Some(Self {
76 exacts,
77 prefixes,
78 regexes,
79 }))
80 }
81
82 pub fn matches(&self, bytes: &[u8]) -> bool {
83 self.exacts.contains(bytes)
84 || self.prefixes.iter().any(|p| bytes.starts_with(p))
85 || self.regexes.iter().any(|r| r.is_match(bytes))
86 }
87}
88
89pub fn validate_filter(filter: &StreamFilter) -> anyhow::Result<()> {
92 ensure!(
93 !filter.match_keys.is_empty(),
94 "stream filter must contain at least one match_key"
95 );
96 ensure!(
97 filter.match_keys.len() <= MAX_MATCH_KEYS_PER_FILTER,
98 "stream filter capped at {MAX_MATCH_KEYS_PER_FILTER} match_keys"
99 );
100 ensure!(
101 filter.value_filters.len() <= MAX_VALUE_FILTERS_PER_FILTER,
102 "stream filter capped at {MAX_VALUE_FILTERS_PER_FILTER} value_filters"
103 );
104 for mk in &filter.match_keys {
105 std::panic::catch_unwind(|| KeyCodec::new(mk.reserved_bits, mk.prefix)).map_err(|_| {
107 anyhow::anyhow!(
108 "invalid (reserved_bits={}, prefix={}) — see KeyCodec::new",
109 mk.reserved_bits,
110 mk.prefix
111 )
112 })?;
113 ensure!(
114 !mk.payload_regex.trim().is_empty(),
115 "match_key payload_regex must not be empty"
116 );
117 }
118 for vf in &filter.value_filters {
119 match vf {
120 BytesFilter::Regex(r) => {
121 ensure!(!r.trim().is_empty(), "value_filter regex must not be empty")
122 }
123 BytesFilter::Exact(_) | BytesFilter::Prefix(_) => {}
124 }
125 }
126 Ok(())
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv_codec::Utf8;

    /// Builds a match key with 4 reserved bits, the given prefix, and a
    /// match-everything payload regex.
    fn mk(prefix: u16) -> MatchKey {
        MatchKey {
            reserved_bits: 4,
            prefix,
            payload_regex: Utf8::from("(?s).*"),
        }
    }

    #[test]
    fn accepts_one_match_key() {
        let f = StreamFilter {
            match_keys: vec![mk(1)],
            value_filters: vec![],
        };
        validate_filter(&f).unwrap();
    }

    #[test]
    fn rejects_empty() {
        let f = StreamFilter {
            match_keys: vec![],
            value_filters: vec![],
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn rejects_too_many() {
        let f = StreamFilter {
            match_keys: (0..(MAX_MATCH_KEYS_PER_FILTER as u16 + 1))
                .map(mk)
                .collect(),
            value_filters: vec![],
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn rejects_empty_regex() {
        let f = StreamFilter {
            match_keys: vec![MatchKey {
                reserved_bits: 4,
                prefix: 1,
                payload_regex: Utf8::from(""),
            }],
            value_filters: vec![],
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn rejects_too_many_value_filters() {
        let f = StreamFilter {
            match_keys: vec![mk(1)],
            value_filters: (0..=MAX_VALUE_FILTERS_PER_FILTER)
                .map(|i| BytesFilter::Exact(vec![i as u8]))
                .collect(),
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn rejects_blank_value_filter_regex() {
        let f = StreamFilter {
            match_keys: vec![mk(1)],
            value_filters: vec![BytesFilter::Regex("   ".to_string())],
        };
        assert!(validate_filter(&f).is_err());
    }

    #[test]
    fn compile_empty_is_none() {
        assert!(CompiledBytesFilters::compile(&[]).unwrap().is_none());
    }

    #[test]
    fn compiled_filters_match_any_variant() {
        let compiled = CompiledBytesFilters::compile(&[
            BytesFilter::Exact(b"exact".to_vec()),
            BytesFilter::Prefix(b"pre".to_vec()),
            BytesFilter::Regex("^r[0-9]+$".to_string()),
        ])
        .unwrap()
        .expect("non-empty filters compile to Some");
        assert!(compiled.matches(b"exact"));
        assert!(!compiled.matches(b"exactly"));
        assert!(compiled.matches(b"prefix"));
        assert!(compiled.matches(b"r42"));
        assert!(!compiled.matches(b"r4x"));
        assert!(!compiled.matches(b""));
    }

    #[test]
    fn compile_rejects_blank_and_invalid_regex() {
        assert!(CompiledBytesFilters::compile(&[BytesFilter::Regex(" ".to_string())]).is_err());
        assert!(CompiledBytesFilters::compile(&[BytesFilter::Regex("(".to_string())]).is_err());
    }
}