Skip to main content

http_nu/
bus.rs

1use nu_protocol::Value;
2use tokio::sync::broadcast;
3
4#[derive(Clone, Debug)]
5pub struct BusEvent {
6    pub topic: String,
7    pub value: Value,
8}
9
10pub struct Bus {
11    sender: broadcast::Sender<BusEvent>,
12}
13
14impl Bus {
15    pub fn new(capacity: usize) -> Self {
16        let (tx, _rx) = broadcast::channel(capacity);
17        Self { sender: tx }
18    }
19
20    pub fn publish(&self, topic: impl Into<String>, value: Value) {
21        let _ = self.sender.send(BusEvent {
22            topic: topic.into(),
23            value,
24        });
25    }
26
27    pub fn subscribe(&self, pattern: Option<String>) -> BusSubscription {
28        BusSubscription {
29            rx: self.sender.subscribe(),
30            matcher: pattern.map(GlobMatcher::new),
31        }
32    }
33}
34
35pub struct BusSubscription {
36    rx: broadcast::Receiver<BusEvent>,
37    matcher: Option<GlobMatcher>,
38}
39
40impl BusSubscription {
41    pub async fn recv(&mut self) -> Option<BusEvent> {
42        loop {
43            match self.rx.recv().await {
44                Ok(ev) => {
45                    if self.matches(&ev.topic) {
46                        return Some(ev);
47                    }
48                }
49                // On overflow we terminate the subscription rather than skip
50                // events. UI state may be inconsistent if events were missed;
51                // better to end the stream and let the client reconnect.
52                Err(broadcast::error::RecvError::Lagged(_)) => return None,
53                Err(broadcast::error::RecvError::Closed) => return None,
54            }
55        }
56    }
57
58    fn matches(&self, topic: &str) -> bool {
59        match &self.matcher {
60            None => true,
61            Some(m) => m.matches(topic),
62        }
63    }
64}
65
/// Compiled topic pattern in which `*` is the only special character.
///
/// A `*` matches any run of characters (including none, and including `.`);
/// everything else must match literally. The pattern always has to cover the
/// whole input — there is no implicit prefix or suffix matching — and there is
/// no escape syntax for a literal `*`.
#[derive(Clone, Debug)]
pub struct GlobMatcher {
    parts: Vec<GlobPart>,
}

/// One element of a compiled pattern.
#[derive(Clone, Debug)]
enum GlobPart {
    /// Text that must appear verbatim.
    Literal(String),
    /// Wildcard matching any (possibly empty) run of characters.
    Star,
}

impl GlobMatcher {
    /// Compiles `pattern` into a matcher.
    ///
    /// Runs of consecutive `*` collapse into a single wildcard and empty
    /// literal segments are dropped, so `a**b` compiles exactly like `a*b`.
    pub fn new(pattern: impl Into<String>) -> Self {
        let pattern = pattern.into();
        let mut parts = Vec::new();
        // Every boundary between two `split('*')` segments is one `*` in the
        // pattern, so each segment after the first is preceded by a wildcard.
        for (idx, segment) in pattern.split('*').enumerate() {
            if idx > 0 && !matches!(parts.last(), Some(GlobPart::Star)) {
                parts.push(GlobPart::Star);
            }
            if !segment.is_empty() {
                parts.push(GlobPart::Literal(segment.to_owned()));
            }
        }
        Self { parts }
    }

    /// Returns `true` when the whole of `s` matches the pattern.
    pub fn matches(&self, s: &str) -> bool {
        match_parts(&self.parts, s)
    }
}

/// Matches `s` in full against `parts`.
///
/// The matcher is iterative and only ever re-expands the most recent
/// wildcard: once the literals between two wildcards have been found at their
/// leftmost position, a later position can never help, because the following
/// wildcard can absorb the difference. This keeps the cost roughly
/// proportional to `s.len()` times the pattern length, where naive recursive
/// backtracking over every wildcard is exponential in the number of `*`
/// (for example `*a*a*a*b` against a long run of `a`).
fn match_parts(parts: &[GlobPart], s: &str) -> bool {
    let mut part_idx = 0;
    // Byte offset into `s`; always on a char boundary.
    let mut pos = 0;
    // For the most recent wildcard: (index of the part that follows it,
    // byte offset at which that wildcard currently ends).
    let mut resume: Option<(usize, usize)> = None;

    loop {
        let advanced = match parts.get(part_idx) {
            Some(GlobPart::Star) => {
                part_idx += 1;
                if part_idx == parts.len() {
                    // A trailing wildcard swallows whatever is left.
                    return true;
                }
                resume = Some((part_idx, pos));
                true
            }
            Some(GlobPart::Literal(lit)) => {
                if s[pos..].starts_with(lit.as_str()) {
                    pos += lit.len();
                    part_idx += 1;
                    true
                } else {
                    false
                }
            }
            None => {
                if pos == s.len() {
                    return true;
                }
                // Pattern exhausted with input left over: counts as a mismatch.
                false
            }
        };
        if advanced {
            continue;
        }

        // Mismatch: let the most recent wildcard take one more character and
        // retry everything after it. Without a wildcard, or with no input
        // left for it to take, the match fails.
        match resume {
            None => return false,
            Some((after_star, star_end)) => match s[star_end..].chars().next() {
                None => return false,
                Some(ch) => {
                    let next = star_end + ch.len_utf8();
                    resume = Some((after_star, next));
                    part_idx = after_star;
                    pos = next;
                }
            },
        }
    }
}
125
#[cfg(test)]
mod tests {
    use super::*;
    use nu_protocol::Span;

    /// Builds a string `Value` tagged with the test-data span.
    fn text(s: &str) -> Value {
        Value::string(s, Span::test_data())
    }

    #[tokio::test]
    async fn sub_no_pattern_receives_everything() {
        let bus = Bus::new(64);
        let mut sub = bus.subscribe(None);

        let sent = [("a.foo", "1"), ("b.bar", "2")];
        for &(topic, payload) in &sent {
            bus.publish(topic, text(payload));
        }

        // Events must come back in publication order, payloads intact.
        for &(topic, payload) in &sent {
            let event = sub.recv().await.unwrap();
            assert_eq!(event.topic, topic);
            assert_eq!(event.value.as_str().unwrap(), payload);
        }
    }

    #[tokio::test]
    async fn sub_glob_filters_to_matching_topics() {
        let bus = Bus::new(64);
        let mut sub = bus.subscribe(Some("tab-abc.*".into()));

        let published = [
            ("tab-abc.compose.close", "yes"),
            ("tab-xyz.compose.close", "no"),
            ("tab-abc.editor.open", "yes"),
        ];
        for &(topic, payload) in &published {
            bus.publish(topic, text(payload));
        }

        // Only the two `tab-abc.` topics should come through, in order.
        for &expected in &["tab-abc.compose.close", "tab-abc.editor.open"] {
            let event = sub.recv().await.unwrap();
            assert_eq!(event.topic, expected);
        }
    }

    #[tokio::test]
    async fn sub_terminates_on_lag() {
        let bus = Bus::new(2);
        let mut sub = bus.subscribe(None);

        // Publish well past the channel capacity before reading anything.
        (0..10).for_each(|i| bus.publish("t", text(&i.to_string())));

        assert!(sub.recv().await.is_none());
    }

    #[test]
    fn glob_star_matches_dotted_segments() {
        let m = GlobMatcher::new("a.*");
        for &hit in &["a.b", "a.b.c", "a."] {
            assert!(m.matches(hit));
        }
        for &miss in &["a", "b.a"] {
            assert!(!m.matches(miss));
        }
    }

    #[test]
    fn glob_bare_star_matches_anything() {
        let m = GlobMatcher::new("*");
        for &hit in &["", "anything", "with.dots.too"] {
            assert!(m.matches(hit));
        }
    }

    #[test]
    fn glob_literal_only_exact_match() {
        let m = GlobMatcher::new("exact.topic");
        assert!(m.matches("exact.topic"));
        for &miss in &["exact.topic.suffix", "prefix.exact.topic"] {
            assert!(!m.matches(miss));
        }
    }
}