// arcp_runtime/runtime/subscription.rs
use std::sync::Arc;
10
11use dashmap::DashMap;
12use tokio::sync::broadcast;
13
14use arcp_core::envelope::Envelope;
15use arcp_core::ids::{SessionId, SubscriptionId};
16use arcp_core::messages::SubscriptionFilter;
17
/// Capacity handed to `broadcast::channel` when the shared envelope bus is
/// created in `SubscriptionManager::new`.
const BROADCAST_CAPACITY: usize = 1024;
19
20#[derive(Clone)]
22pub struct SubscriptionManager {
23 inner: Arc<Inner>,
24}
25
26struct Inner {
27 bus: broadcast::Sender<Envelope>,
28 subs: DashMap<SubscriptionId, ActiveSubscription>,
29}
30
31#[derive(Clone)]
32struct ActiveSubscription {
33 #[allow(dead_code)]
36 filter: SubscriptionFilter,
37 session_id: SessionId,
39}
40
41impl std::fmt::Debug for SubscriptionManager {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("SubscriptionManager")
44 .field("active", &self.inner.subs.len())
45 .finish()
46 }
47}
48
49impl Default for SubscriptionManager {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl SubscriptionManager {
56 #[must_use]
58 pub fn new() -> Self {
59 let (bus, _drop_initial_receiver) = broadcast::channel(BROADCAST_CAPACITY);
60 Self {
61 inner: Arc::new(Inner {
62 bus,
63 subs: DashMap::new(),
64 }),
65 }
66 }
67
68 #[must_use]
71 pub fn publish(&self, envelope: &Envelope) -> usize {
72 self.inner.bus.send(envelope.clone()).unwrap_or(0)
75 }
76
77 #[must_use]
79 pub fn register(
80 &self,
81 filter: SubscriptionFilter,
82 session_id: SessionId,
83 ) -> (SubscriptionId, FilteredReceiver) {
84 let id = SubscriptionId::new();
85 let rx = self.inner.bus.subscribe();
86 self.inner.subs.insert(
87 id.clone(),
88 ActiveSubscription {
89 filter: filter.clone(),
90 session_id,
91 },
92 );
93 (id, FilteredReceiver { inner: rx, filter })
94 }
95
96 #[must_use]
98 pub fn unsubscribe(&self, id: &SubscriptionId) -> bool {
99 self.inner.subs.remove(id).is_some()
100 }
101
102 pub fn drop_session(&self, session_id: &SessionId) {
104 self.inner.subs.retain(|_, s| s.session_id != *session_id);
105 }
106
107 #[must_use]
109 pub fn len(&self) -> usize {
110 self.inner.subs.len()
111 }
112
113 #[must_use]
115 pub fn is_empty(&self) -> bool {
116 self.inner.subs.is_empty()
117 }
118}
119
120pub struct FilteredReceiver {
122 inner: broadcast::Receiver<Envelope>,
123 filter: SubscriptionFilter,
124}
125
126impl std::fmt::Debug for FilteredReceiver {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("FilteredReceiver").finish_non_exhaustive()
129 }
130}
131
132impl FilteredReceiver {
133 pub async fn next(&mut self) -> Option<Envelope> {
138 loop {
139 match self.inner.recv().await {
140 Ok(env) => {
141 if matches(&self.filter, &env) {
142 return Some(env);
143 }
144 }
145 Err(broadcast::error::RecvError::Lagged(_)) => {}
146 Err(broadcast::error::RecvError::Closed) => return None,
147 }
148 }
149 }
150}
151
152#[must_use]
155pub fn matches(filter: &SubscriptionFilter, envelope: &Envelope) -> bool {
156 if !filter.session_id.is_empty() {
157 let Some(s) = envelope.session_id.as_ref() else {
158 return false;
159 };
160 if !filter.session_id.contains(s) {
161 return false;
162 }
163 }
164 if !filter.trace_id.is_empty() {
165 let Some(t) = envelope.trace_id.as_ref() else {
166 return false;
167 };
168 if !filter.trace_id.contains(t) {
169 return false;
170 }
171 }
172 if !filter.job_id.is_empty() {
173 let Some(j) = envelope.job_id.as_ref() else {
174 return false;
175 };
176 if !filter.job_id.contains(j) {
177 return false;
178 }
179 }
180 if !filter.stream_id.is_empty() {
181 let Some(s) = envelope.stream_id.as_ref() else {
182 return false;
183 };
184 if !filter.stream_id.contains(s) {
185 return false;
186 }
187 }
188 if !filter.types.is_empty() {
189 let t = envelope.payload.type_name();
190 if !filter.types.iter().any(|filt| filt == t) {
191 return false;
192 }
193 }
194 if let Some(min) = filter.min_priority {
195 if priority_rank(envelope.priority) < priority_rank(min) {
196 return false;
197 }
198 }
199 true
200}
201
202const fn priority_rank(p: arcp_core::envelope::Priority) -> u8 {
203 match p {
204 arcp_core::envelope::Priority::Low => 0,
205 arcp_core::envelope::Priority::Normal => 1,
206 arcp_core::envelope::Priority::High => 2,
207 arcp_core::envelope::Priority::Critical => 3,
208 }
209}
210
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    clippy::panic,
    clippy::missing_panics_doc
)]
mod tests {
    use super::*;
    use arcp_core::envelope::Envelope;
    use arcp_core::ids::SessionId;
    use arcp_core::messages::{MessageType, PingPayload};

    /// Builds a default ping envelope tagged with `session`.
    fn ping_for(session: &SessionId) -> Envelope {
        let payload = MessageType::Ping(PingPayload::default());
        let mut envelope = Envelope::new(payload);
        envelope.session_id = Some(session.clone());
        envelope
    }

    #[tokio::test]
    async fn subscription_filters_by_session_id() {
        let manager = SubscriptionManager::new();
        let wanted = SessionId::new();
        let other = SessionId::new();
        let filter = SubscriptionFilter {
            session_id: vec![wanted.clone()],
            ..SubscriptionFilter::default()
        };
        let (_id, mut receiver) = manager.register(filter, wanted.clone());

        // The envelope for the other session is published first and must be
        // skipped; only the one for the wanted session may come through.
        let _ = manager.publish(&ping_for(&other));
        let _ = manager.publish(&ping_for(&wanted));

        let deadline = std::time::Duration::from_millis(100);
        let received = tokio::time::timeout(deadline, receiver.next())
            .await
            .expect("timely")
            .expect("envelope");
        assert_eq!(received.session_id.as_ref(), Some(&wanted));
    }

    #[tokio::test]
    async fn unsubscribe_removes_entry() {
        let manager = SubscriptionManager::new();
        let session = SessionId::new();
        let (id, _rx) = manager.register(SubscriptionFilter::default(), session);
        assert_eq!(manager.len(), 1);
        assert!(manager.unsubscribe(&id));
        assert!(manager.is_empty());
    }

    #[tokio::test]
    async fn unsubscribe_returns_false_for_unknown_id() {
        let manager = SubscriptionManager::new();
        let never_registered = SubscriptionId::new();
        assert!(!manager.unsubscribe(&never_registered));
    }

    #[tokio::test]
    async fn drop_session_keeps_other_sessions() {
        let manager = SubscriptionManager::new();
        let dropped = SessionId::new();
        let kept = SessionId::new();
        let (_id1, _rx1) = manager.register(SubscriptionFilter::default(), dropped.clone());
        let (_id2, _rx2) = manager.register(SubscriptionFilter::default(), kept);
        assert_eq!(manager.len(), 2);
        manager.drop_session(&dropped);
        assert_eq!(manager.len(), 1);
    }

    #[test]
    fn matches_handles_every_field_combination() {
        let session = SessionId::new();
        let trace = arcp_core::ids::TraceId::new("t").expect("non-empty");
        let job = arcp_core::ids::JobId::new();
        let stream = arcp_core::ids::StreamId::new();

        // An envelope carrying every id, checked against a filter that
        // constrains every field.
        let mut full = ping_for(&session);
        full.trace_id = Some(trace.clone());
        full.job_id = Some(job.clone());
        full.stream_id = Some(stream.clone());

        let everything = SubscriptionFilter {
            session_id: vec![session.clone()],
            trace_id: vec![trace],
            job_id: vec![job],
            stream_id: vec![stream],
            types: vec!["ping".into()],
            min_priority: Some(arcp_core::envelope::Priority::Low),
        };
        assert!(matches(&everything, &full));

        // An envelope without a session id must fail a session constraint.
        let mut bare = Envelope::new(MessageType::Ping(PingPayload::default()));
        bare.session_id = None;
        let session_only = SubscriptionFilter {
            session_id: vec![session],
            ..SubscriptionFilter::default()
        };
        assert!(!matches(&session_only, &bare));
    }

    #[test]
    fn debug_renders() {
        let manager = SubscriptionManager::new();
        let _ = format!("{manager:?}");
        let session = SessionId::new();
        let (_id, receiver) = manager.register(SubscriptionFilter::default(), session);
        let _ = format!("{receiver:?}");
    }

    #[tokio::test]
    async fn closed_bus_makes_receiver_yield_none() {
        let manager = SubscriptionManager::new();
        let session = SessionId::new();
        let (_id, mut receiver) = manager.register(SubscriptionFilter::default(), session);
        // Dropping the only manager handle drops the sender, closing the bus.
        drop(manager);
        assert!(receiver.next().await.is_none());
    }
}