1use nu_protocol::Value;
2use tokio::sync::broadcast;
3
4#[derive(Clone, Debug)]
5pub struct BusEvent {
6 pub topic: String,
7 pub value: Value,
8}
9
10pub struct Bus {
11 sender: broadcast::Sender<BusEvent>,
12}
13
14impl Bus {
15 pub fn new(capacity: usize) -> Self {
16 let (tx, _rx) = broadcast::channel(capacity);
17 Self { sender: tx }
18 }
19
20 pub fn publish(&self, topic: impl Into<String>, value: Value) {
21 let _ = self.sender.send(BusEvent {
22 topic: topic.into(),
23 value,
24 });
25 }
26
27 pub fn subscribe(&self, pattern: Option<String>) -> BusSubscription {
28 BusSubscription {
29 rx: self.sender.subscribe(),
30 matcher: pattern.map(GlobMatcher::new),
31 }
32 }
33}
34
35pub struct BusSubscription {
36 rx: broadcast::Receiver<BusEvent>,
37 matcher: Option<GlobMatcher>,
38}
39
40impl BusSubscription {
41 pub async fn recv(&mut self) -> Option<BusEvent> {
42 loop {
43 match self.rx.recv().await {
44 Ok(ev) => {
45 if self.matches(&ev.topic) {
46 return Some(ev);
47 }
48 }
49 Err(broadcast::error::RecvError::Lagged(_)) => return None,
53 Err(broadcast::error::RecvError::Closed) => return None,
54 }
55 }
56 }
57
58 fn matches(&self, topic: &str) -> bool {
59 match &self.matcher {
60 None => true,
61 Some(m) => m.matches(topic),
62 }
63 }
64}
65
/// Compiled glob pattern in which `*` is the only wildcard.
///
/// `*` matches any run of characters, including the empty run and including
/// `.`. Every other character must match literally. The whole input must be
/// covered by the pattern, so a pattern without `*` is an exact match.
#[derive(Clone, Debug)]
pub struct GlobMatcher {
    /// Pattern segments in order. [`GlobMatcher::new`] never produces an
    /// empty literal, two adjacent literals, or two adjacent stars.
    parts: Vec<GlobPart>,
}

/// One segment of a compiled glob pattern.
#[derive(Clone, Debug)]
enum GlobPart {
    /// Text that must appear verbatim.
    Literal(String),
    /// Wildcard matching zero or more characters.
    Star,
}

impl GlobMatcher {
    /// Compiles `pattern` into a matcher.
    ///
    /// Runs of consecutive `*` collapse into a single wildcard. An empty
    /// pattern compiles to a matcher that accepts only the empty string.
    pub fn new(pattern: impl Into<String>) -> Self {
        let pattern = pattern.into();
        let mut parts = Vec::new();
        // Splitting on '*' yields the literal pieces. Every piece after the
        // first is preceded by a star in the pattern, and empty pieces (from
        // leading, trailing, or doubled stars) contribute no literal.
        for (idx, piece) in pattern.split('*').enumerate() {
            if idx > 0 && !matches!(parts.last(), Some(GlobPart::Star)) {
                parts.push(GlobPart::Star);
            }
            if !piece.is_empty() {
                parts.push(GlobPart::Literal(piece.to_owned()));
            }
        }
        Self { parts }
    }

    /// Returns `true` when the whole of `s` matches the pattern.
    pub fn matches(&self, s: &str) -> bool {
        match_parts(&self.parts, s)
    }
}

/// Matches `s` against `parts` in a single left-to-right pass.
///
/// The previous recursive version retried every start position at every
/// star, which is exponential in the number of stars for inputs such as
/// `*a*a*a*b` against a long run of `a`. Because `*` is the only wildcard,
/// greedy matching is sufficient: a literal not preceded by a star must sit
/// at the current position, a literal between stars can take its leftmost
/// occurrence (which leaves the most input for later segments), and a final
/// literal preceded by a star must be a suffix of what is left.
///
/// Relies on `parts` having no two adjacent literals, which holds for every
/// sequence built by [`GlobMatcher::new`].
fn match_parts(parts: &[GlobPart], s: &str) -> bool {
    let mut remaining = s;
    // True while the most recent segment was a star that may still absorb input.
    let mut after_star = false;

    for (idx, part) in parts.iter().enumerate() {
        let literal = match part {
            GlobPart::Star => {
                after_star = true;
                continue;
            }
            GlobPart::Literal(literal) => literal.as_str(),
        };

        if !after_star {
            // Anchored: the literal must start exactly here.
            match remaining.strip_prefix(literal) {
                Some(tail) => remaining = tail,
                None => return false,
            }
        } else if idx + 1 == parts.len() {
            // Last segment after a star: the star absorbs everything before
            // the suffix.
            return remaining.ends_with(literal);
        } else {
            // `find` returns a byte offset on a char boundary, so slicing
            // past the match is always valid.
            match remaining.find(literal) {
                Some(at) => remaining = &remaining[at + literal.len()..],
                None => return false,
            }
        }
        after_star = false;
    }

    // A trailing star swallows whatever is left; otherwise nothing may remain.
    after_star || remaining.is_empty()
}
125
#[cfg(test)]
mod tests {
    use super::*;
    use nu_protocol::Span;

    /// Builds a string `Value` carrying the test span.
    fn str_val(text: &str) -> Value {
        Value::string(text, Span::test_data())
    }

    /// A subscription without a pattern sees every event, in publish order.
    #[tokio::test]
    async fn sub_no_pattern_receives_everything() {
        let published = [("a.foo", "1"), ("b.bar", "2")];

        let bus = Bus::new(64);
        let mut sub = bus.subscribe(None);
        for (topic, payload) in published {
            bus.publish(topic, str_val(payload));
        }

        for (topic, payload) in published {
            let event = sub.recv().await.unwrap();
            assert_eq!(event.topic, topic);
            assert_eq!(event.value.as_str().unwrap(), payload);
        }
    }

    /// A glob subscription skips events whose topic does not match.
    #[tokio::test]
    async fn sub_glob_filters_to_matching_topics() {
        let bus = Bus::new(64);
        let mut sub = bus.subscribe(Some(String::from("tab-abc.*")));

        bus.publish("tab-abc.compose.close", str_val("yes"));
        bus.publish("tab-xyz.compose.close", str_val("no"));
        bus.publish("tab-abc.editor.open", str_val("yes"));

        for expected in ["tab-abc.compose.close", "tab-abc.editor.open"] {
            let event = sub.recv().await.unwrap();
            assert_eq!(event.topic, expected);
        }
    }

    /// Overflowing the channel before reading ends the subscription.
    #[tokio::test]
    async fn sub_terminates_on_lag() {
        let bus = Bus::new(2);
        let mut sub = bus.subscribe(None);
        for n in 0..10 {
            let payload = n.to_string();
            bus.publish("t", str_val(&payload));
        }
        assert!(sub.recv().await.is_none());
    }

    /// `*` is not segment-aware: it crosses `.` boundaries and may be empty.
    #[test]
    fn glob_star_matches_dotted_segments() {
        let glob = GlobMatcher::new("a.*");
        for accepted in ["a.b", "a.b.c", "a."] {
            assert!(glob.matches(accepted));
        }
        for rejected in ["a", "b.a"] {
            assert!(!glob.matches(rejected));
        }
    }

    /// A lone `*` accepts any topic, including the empty one.
    #[test]
    fn glob_bare_star_matches_anything() {
        let glob = GlobMatcher::new("*");
        for accepted in ["", "anything", "with.dots.too"] {
            assert!(glob.matches(accepted));
        }
    }

    /// Without a wildcard the pattern must equal the topic exactly.
    #[test]
    fn glob_literal_only_exact_match() {
        let glob = GlobMatcher::new("exact.topic");
        assert!(glob.matches("exact.topic"));
        for rejected in ["exact.topic.suffix", "prefix.exact.topic"] {
            assert!(!glob.matches(rejected));
        }
    }
}