use thiserror::Error;
use crate::query::{Query, QueryError};
#[derive(Debug, Error)]
pub enum TopicParseError {
#[error("empty topic")]
Empty,
#[error("invalid regex segment at index {index}: {source}")]
BadRegex {
index: usize,
#[source]
source: regex::Error,
},
#[error("caql filter: {0}")]
Caql(#[from] QueryError),
#[error(
"topic suffix `?{0}` does not include a `ql=` parameter; \
the only filter parameter recognized today is `ql=<caql>`"
)]
MissingQlParam(String),
}
#[derive(Debug, Clone)]
pub enum Segment {
Literal(String),
SingleWild,
MultiWild,
Regex(regex::Regex),
}
#[derive(Debug, Clone)]
pub struct TopicPattern {
pub segments: Vec<Segment>,
pub filter: Option<Query>,
pub raw: String,
}
impl TopicPattern {
pub fn parse(input: &str) -> Result<Self, TopicParseError> {
if input.is_empty() {
return Err(TopicParseError::Empty);
}
let raw = input.to_string();
let (path, filter) = match input.find('?') {
Some(i) => {
let suffix = &input[i + 1..];
let mut ql_value: Option<String> = None;
for (k, v) in url::form_urlencoded::parse(suffix.as_bytes()) {
if k == "ql" {
ql_value = Some(v.into_owned());
break;
}
}
let q = match ql_value {
Some(v) => crate::caql::parse(&v)?,
None => return Err(TopicParseError::MissingQlParam(suffix.to_string())),
};
(&input[..i], Some(q))
}
None => (input, None),
};
let mut segments = Vec::new();
for (idx, seg) in path.split('/').enumerate() {
if seg == "*" {
segments.push(Segment::SingleWild);
} else if seg == "**" {
segments.push(Segment::MultiWild);
} else if seg.starts_with('{') && seg.ends_with('}') && seg.len() >= 2 {
let inner = &seg[1..seg.len() - 1];
let re = regex::Regex::new(inner).map_err(|e| TopicParseError::BadRegex {
index: idx,
source: e,
})?;
segments.push(Segment::Regex(re));
} else {
segments.push(Segment::Literal(seg.to_string()));
}
}
Ok(Self {
segments,
filter,
raw,
})
}
pub fn matches_topic(&self, topic: &str) -> bool {
let parts: Vec<&str> = topic.split('/').collect();
matches_at(&self.segments, &parts)
}
pub fn matches_event(&self, topic: &str, payload: &serde_json::Value) -> bool {
if !self.matches_topic(topic) {
return false;
}
match &self.filter {
None => true,
Some(q) => crate::query::matches(q, payload).unwrap_or(false),
}
}
}
fn matches_at(pat: &[Segment], parts: &[&str]) -> bool {
match pat.first() {
None => parts.is_empty(),
Some(Segment::MultiWild) => {
if pat.len() == 1 {
return !parts.is_empty();
}
for consume in 1..=parts.len() {
if matches_at(&pat[1..], &parts[consume..]) {
return true;
}
}
false
}
Some(_) if parts.is_empty() => false,
Some(Segment::SingleWild) => matches_at(&pat[1..], &parts[1..]),
Some(Segment::Literal(s)) => parts[0] == s && matches_at(&pat[1..], &parts[1..]),
Some(Segment::Regex(re)) => re.is_match(parts[0]) && matches_at(&pat[1..], &parts[1..]),
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn literal() {
let p = TopicPattern::parse("hub/led/abc/state").unwrap();
assert!(p.matches_topic("hub/led/abc/state"));
assert!(!p.matches_topic("hub/led/xyz/state"));
}
#[test]
fn single_wild() {
let p = TopicPattern::parse("hub/led/*/state").unwrap();
assert!(p.matches_topic("hub/led/abc/state"));
assert!(p.matches_topic("hub/led/xyz/state"));
assert!(!p.matches_topic("hub/led/abc/temperature"));
assert!(!p.matches_topic("hub/led/abc/inner/state"));
}
#[test]
fn multi_wild() {
let p = TopicPattern::parse("hub/**/state").unwrap();
assert!(p.matches_topic("hub/led/abc/state"));
assert!(p.matches_topic("hub/zone1/zone2/led/abc/state"));
assert!(!p.matches_topic("hub/state")); assert!(!p.matches_topic("hub/led/abc/temperature"));
}
#[test]
fn trailing_multi_wild() {
let p = TopicPattern::parse("hub/**").unwrap();
assert!(p.matches_topic("hub/led/abc/state"));
assert!(p.matches_topic("hub/x"));
assert!(!p.matches_topic("hub"));
}
#[test]
fn regex_segment() {
let p = TopicPattern::parse("{^hub.+$}/led/abc/state").unwrap();
assert!(p.matches_topic("hubA/led/abc/state"));
assert!(p.matches_topic("hub-cloud/led/abc/state"));
assert!(!p.matches_topic("hub/led/abc/state"));
}
#[test]
fn caql_filter() {
let p = TopicPattern::parse("hub/sensor/*/temp?ql=where data > 85").unwrap();
assert!(p.matches_event("hub/sensor/abc/temp", &json!({"data": 90})));
assert!(!p.matches_event("hub/sensor/abc/temp", &json!({"data": 50})));
assert!(!p.matches_event("hub/sensor/abc/humidity", &json!({"data": 90})));
}
#[test]
fn topic_filter_field_type_is_query_query() {
let p = TopicPattern::parse("hub/x?ql=where data > 1").unwrap();
let _: Option<crate::query::Query> = p.filter;
}
#[test]
fn topic_filter_syntax_is_url_query_string_with_ql_param() {
TopicPattern::parse("hub/sensor/*/temp?ql=where data > 85")
.expect("`?ql=<caql>` is the supported suffix");
assert!(
TopicPattern::parse("hub/sensor/*/temp?where data > 85").is_err(),
"bare `?<caql>` is rejected — clients must use `?ql=<caql>`"
);
}
#[test]
fn topic_filter_with_url_encoded_value() {
let raw = format!(
"hub/sensor/*/state?ql={}",
url::form_urlencoded::byte_serialize(b"where data = \"on\"").collect::<String>()
);
let p = TopicPattern::parse(&raw).unwrap();
assert!(p.matches_event("hub/sensor/abc/state", &json!({"data": "on"})));
assert!(!p.matches_event("hub/sensor/abc/state", &json!({"data": "off"})));
}
#[test]
fn topic_filter_extra_params_are_ignored() {
let p = TopicPattern::parse("hub/x?ql=where data > 1&limit=10").unwrap();
assert!(p.filter.is_some());
}
#[test]
fn topic_filter_uses_query_evaluator_for_contains() {
use crate::query::{FieldPath, Literal, Predicate, Projection, Query};
let p = TopicPattern {
segments: vec![Segment::Literal("hub".into()), Segment::Literal("x".into())],
filter: Some(Query {
projection: Projection::All,
predicate: Predicate::contains(
FieldPath::parse("labels"),
Literal::String("urgent".into()),
),
}),
raw: "hub/x".into(),
};
assert!(p.matches_event("hub/x", &json!({"labels": ["urgent", "cron"]})));
assert!(!p.matches_event("hub/x", &json!({"labels": ["cron"]})));
}
}