Skip to main content

mqdb_core/
subscription.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::events::ChangeEvent;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
8#[serde(rename_all = "snake_case")]
9pub enum SubscriptionMode {
10    #[default]
11    Broadcast,
12    LoadBalanced,
13    Ordered,
14}
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct Subscription {
18    pub id: String,
19    pub pattern: String,
20    pub entity: Option<String>,
21    #[serde(default)]
22    pub share_group: Option<String>,
23    #[serde(default)]
24    pub mode: SubscriptionMode,
25}
26
27impl Subscription {
28    #[allow(clippy::must_use_candidate)]
29    pub fn new(id: String, pattern: String, entity: Option<String>) -> Self {
30        Self {
31            id,
32            pattern,
33            entity,
34            share_group: None,
35            mode: SubscriptionMode::default(),
36        }
37    }
38
39    #[must_use]
40    pub fn with_share_group(mut self, group: String, mode: SubscriptionMode) -> Self {
41        self.share_group = Some(group);
42        self.mode = mode;
43        self
44    }
45
46    #[must_use]
47    pub fn matches(&self, event: &ChangeEvent) -> bool {
48        if let Some(ref entity) = self.entity
49            && entity != &event.entity
50        {
51            return false;
52        }
53
54        match_pattern(&self.pattern, &event.entity, &event.id)
55    }
56}
57
58#[must_use]
59pub fn match_pattern(pattern: &str, entity: &str, id: &str) -> bool {
60    let path = format!("{entity}/{id}");
61    match_wildcard(pattern, &path)
62}
63
64#[must_use]
65pub fn match_wildcard(pattern: &str, path: &str) -> bool {
66    let pattern_parts: Vec<&str> = pattern.split('/').collect();
67    let path_parts: Vec<&str> = path.split('/').collect();
68
69    match_parts(&pattern_parts, &path_parts, 0, 0)
70}
71
72fn match_parts(pattern: &[&str], path: &[&str], p_idx: usize, path_idx: usize) -> bool {
73    if p_idx >= pattern.len() {
74        return path_idx >= path.len();
75    }
76
77    let current = pattern[p_idx];
78
79    if current == "#" {
80        if p_idx == pattern.len() - 1 {
81            return true;
82        }
83        for i in path_idx..=path.len() {
84            if match_parts(pattern, path, p_idx + 1, i) {
85                return true;
86            }
87        }
88        return false;
89    }
90
91    if path_idx >= path.len() {
92        return false;
93    }
94
95    if current == "+" || current == path[path_idx] {
96        return match_parts(pattern, path, p_idx + 1, path_idx + 1);
97    }
98
99    false
100}
101
102#[cfg(test)]
103mod tests {
104    use super::*;
105
106    #[test]
107    fn test_wildcard_matching() {
108        assert!(match_wildcard("users/+", "users/123"));
109        assert!(match_wildcard("users/#", "users/123"));
110        assert!(match_wildcard("users/#", "users/123/profile"));
111        assert!(match_wildcard("+/123", "users/123"));
112        assert!(!match_wildcard("users/+", "users/123/profile"));
113        assert!(!match_wildcard("posts/+", "users/123"));
114    }
115
116    #[test]
117    fn test_subscription_matches() {
118        let sub = Subscription::new("sub1".into(), "users/+".into(), Some("users".into()));
119
120        let event = ChangeEvent::create(
121            "users".into(),
122            "123".into(),
123            serde_json::json!({"name": "test"}),
124        );
125
126        assert!(sub.matches(&event));
127    }
128}