use thiserror::Error;
use crate::caql::Query;
#[derive(Debug, Error)]
pub enum TopicParseError {
#[error("empty topic")]
Empty,
#[error("invalid regex segment at index {index}: {source}")]
BadRegex {
index: usize,
#[source]
source: regex::Error,
},
#[error("caql filter: {0}")]
Caql(#[from] crate::caql::CaqlError),
}
#[derive(Debug, Clone)]
pub enum Segment {
Literal(String),
SingleWild,
MultiWild,
Regex(regex::Regex),
}
#[derive(Debug, Clone)]
pub struct TopicPattern {
pub segments: Vec<Segment>,
pub filter: Option<Query>,
pub raw: String,
}
impl TopicPattern {
pub fn parse(input: &str) -> Result<Self, TopicParseError> {
if input.is_empty() {
return Err(TopicParseError::Empty);
}
let raw = input.to_string();
let (path, filter) = match input.find('?') {
Some(i) => {
let q = crate::caql::parse(&input[i + 1..])?;
(&input[..i], Some(q))
}
None => (input, None),
};
let mut segments = Vec::new();
for (idx, seg) in path.split('/').enumerate() {
if seg == "*" {
segments.push(Segment::SingleWild);
} else if seg == "**" {
segments.push(Segment::MultiWild);
} else if seg.starts_with('{') && seg.ends_with('}') && seg.len() >= 2 {
let inner = &seg[1..seg.len() - 1];
let re = regex::Regex::new(inner).map_err(|e| TopicParseError::BadRegex {
index: idx,
source: e,
})?;
segments.push(Segment::Regex(re));
} else {
segments.push(Segment::Literal(seg.to_string()));
}
}
Ok(Self {
segments,
filter,
raw,
})
}
pub fn matches_topic(&self, topic: &str) -> bool {
let parts: Vec<&str> = topic.split('/').collect();
matches_at(&self.segments, &parts)
}
pub fn matches_event(&self, topic: &str, payload: &serde_json::Value) -> bool {
if !self.matches_topic(topic) {
return false;
}
match &self.filter {
None => true,
Some(q) => crate::caql::matches(q, payload).unwrap_or(false),
}
}
}
fn matches_at(pat: &[Segment], parts: &[&str]) -> bool {
match pat.first() {
None => parts.is_empty(),
Some(Segment::MultiWild) => {
if pat.len() == 1 {
return !parts.is_empty();
}
for consume in 1..=parts.len() {
if matches_at(&pat[1..], &parts[consume..]) {
return true;
}
}
false
}
Some(_) if parts.is_empty() => false,
Some(Segment::SingleWild) => matches_at(&pat[1..], &parts[1..]),
Some(Segment::Literal(s)) => parts[0] == s && matches_at(&pat[1..], &parts[1..]),
Some(Segment::Regex(re)) => re.is_match(parts[0]) && matches_at(&pat[1..], &parts[1..]),
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn literal() {
let p = TopicPattern::parse("hub/led/abc/state").unwrap();
assert!(p.matches_topic("hub/led/abc/state"));
assert!(!p.matches_topic("hub/led/xyz/state"));
}
#[test]
fn single_wild() {
let p = TopicPattern::parse("hub/led/*/state").unwrap();
assert!(p.matches_topic("hub/led/abc/state"));
assert!(p.matches_topic("hub/led/xyz/state"));
assert!(!p.matches_topic("hub/led/abc/temperature"));
assert!(!p.matches_topic("hub/led/abc/inner/state"));
}
#[test]
fn multi_wild() {
let p = TopicPattern::parse("hub/**/state").unwrap();
assert!(p.matches_topic("hub/led/abc/state"));
assert!(p.matches_topic("hub/zone1/zone2/led/abc/state"));
assert!(!p.matches_topic("hub/state")); assert!(!p.matches_topic("hub/led/abc/temperature"));
}
#[test]
fn trailing_multi_wild() {
let p = TopicPattern::parse("hub/**").unwrap();
assert!(p.matches_topic("hub/led/abc/state"));
assert!(p.matches_topic("hub/x"));
assert!(!p.matches_topic("hub"));
}
#[test]
fn regex_segment() {
let p = TopicPattern::parse("{^hub.+$}/led/abc/state").unwrap();
assert!(p.matches_topic("hubA/led/abc/state"));
assert!(p.matches_topic("hub-cloud/led/abc/state"));
assert!(!p.matches_topic("hub/led/abc/state"));
}
#[test]
fn caql_filter() {
let p = TopicPattern::parse("hub/sensor/*/temp?where data > 85").unwrap();
assert!(p.matches_event("hub/sensor/abc/temp", &json!({"data": 90})));
assert!(!p.matches_event("hub/sensor/abc/temp", &json!({"data": 50})));
assert!(!p.matches_event("hub/sensor/abc/humidity", &json!({"data": 90})));
}
}