Skip to main content

arcp_runtime/runtime/
subscription.rs

1//! Subscription manager (RFC §13).
2//!
3//! Phase 5 ships a lightweight implementation: every accepted envelope is
4//! published into a `tokio::sync::broadcast` channel. Subscriptions filter
5//! the live tail by type / `session_id` / `job_id`; backfill replays from
6//! the event log. Rich filter authorisation (PLAN.md §A4.10) is reserved
7//! for a follow-up phase.
8
9use std::sync::Arc;
10
11use dashmap::DashMap;
12use tokio::sync::broadcast;
13
14use arcp_core::envelope::Envelope;
15use arcp_core::ids::{SessionId, SubscriptionId};
16use arcp_core::messages::SubscriptionFilter;
17
18const BROADCAST_CAPACITY: usize = 1024;
19
20/// Map of active subscriptions, keyed by `SubscriptionId`.
21#[derive(Clone)]
22pub struct SubscriptionManager {
23    inner: Arc<Inner>,
24}
25
26struct Inner {
27    bus: broadcast::Sender<Envelope>,
28    subs: DashMap<SubscriptionId, ActiveSubscription>,
29}
30
31#[derive(Clone)]
32struct ActiveSubscription {
33    /// Filter, retained for re-binding scenarios — future phases will allow
34    /// querying filters and rebuilding receivers after replays.
35    #[allow(dead_code)]
36    filter: SubscriptionFilter,
37    /// Owning session — used for tear-down on session eviction.
38    session_id: SessionId,
39}
40
41impl std::fmt::Debug for SubscriptionManager {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("SubscriptionManager")
44            .field("active", &self.inner.subs.len())
45            .finish()
46    }
47}
48
49impl Default for SubscriptionManager {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl SubscriptionManager {
56    /// Construct a new manager.
57    #[must_use]
58    pub fn new() -> Self {
59        let (bus, _drop_initial_receiver) = broadcast::channel(BROADCAST_CAPACITY);
60        Self {
61            inner: Arc::new(Inner {
62                bus,
63                subs: DashMap::new(),
64            }),
65        }
66    }
67
68    /// Publish `envelope` to all subscribers; lossy under backpressure.
69    /// Returns the number of subscribers the message was delivered to.
70    #[must_use]
71    pub fn publish(&self, envelope: &Envelope) -> usize {
72        // broadcast::send returns the receiver count even when there are
73        // no live receivers (it returns Err in that case); collapse both.
74        self.inner.bus.send(envelope.clone()).unwrap_or(0)
75    }
76
77    /// Register a new subscription. Returns the new id and a receiver.
78    #[must_use]
79    pub fn register(
80        &self,
81        filter: SubscriptionFilter,
82        session_id: SessionId,
83    ) -> (SubscriptionId, FilteredReceiver) {
84        let id = SubscriptionId::new();
85        let rx = self.inner.bus.subscribe();
86        self.inner.subs.insert(
87            id.clone(),
88            ActiveSubscription {
89                filter: filter.clone(),
90                session_id,
91            },
92        );
93        (id, FilteredReceiver { inner: rx, filter })
94    }
95
96    /// Tear down a subscription. Returns whether it existed.
97    #[must_use]
98    pub fn unsubscribe(&self, id: &SubscriptionId) -> bool {
99        self.inner.subs.remove(id).is_some()
100    }
101
102    /// Drop every subscription owned by `session_id` (e.g. on eviction).
103    pub fn drop_session(&self, session_id: &SessionId) {
104        self.inner.subs.retain(|_, s| s.session_id != *session_id);
105    }
106
107    /// Number of active subscriptions.
108    #[must_use]
109    pub fn len(&self) -> usize {
110        self.inner.subs.len()
111    }
112
113    /// True if no active subscriptions exist.
114    #[must_use]
115    pub fn is_empty(&self) -> bool {
116        self.inner.subs.is_empty()
117    }
118}
119
120/// Receiver that yields envelopes matching a [`SubscriptionFilter`].
121pub struct FilteredReceiver {
122    inner: broadcast::Receiver<Envelope>,
123    filter: SubscriptionFilter,
124}
125
126impl std::fmt::Debug for FilteredReceiver {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        f.debug_struct("FilteredReceiver").finish_non_exhaustive()
129    }
130}
131
132impl FilteredReceiver {
133    /// Receive the next matching envelope. Skips over envelopes that don't
134    /// match the filter and over lagged broadcasts.
135    ///
136    /// Returns `None` when the underlying channel is closed.
137    pub async fn next(&mut self) -> Option<Envelope> {
138        loop {
139            match self.inner.recv().await {
140                Ok(env) => {
141                    if matches(&self.filter, &env) {
142                        return Some(env);
143                    }
144                }
145                Err(broadcast::error::RecvError::Lagged(_)) => {}
146                Err(broadcast::error::RecvError::Closed) => return None,
147            }
148        }
149    }
150}
151
152/// True if `envelope` satisfies the filter (AND across fields, OR within
153/// list-valued fields).
154#[must_use]
155pub fn matches(filter: &SubscriptionFilter, envelope: &Envelope) -> bool {
156    if !filter.session_id.is_empty() {
157        let Some(s) = envelope.session_id.as_ref() else {
158            return false;
159        };
160        if !filter.session_id.contains(s) {
161            return false;
162        }
163    }
164    if !filter.trace_id.is_empty() {
165        let Some(t) = envelope.trace_id.as_ref() else {
166            return false;
167        };
168        if !filter.trace_id.contains(t) {
169            return false;
170        }
171    }
172    if !filter.job_id.is_empty() {
173        let Some(j) = envelope.job_id.as_ref() else {
174            return false;
175        };
176        if !filter.job_id.contains(j) {
177            return false;
178        }
179    }
180    if !filter.stream_id.is_empty() {
181        let Some(s) = envelope.stream_id.as_ref() else {
182            return false;
183        };
184        if !filter.stream_id.contains(s) {
185            return false;
186        }
187    }
188    if !filter.types.is_empty() {
189        let t = envelope.payload.type_name();
190        if !filter.types.iter().any(|filt| filt == t) {
191            return false;
192        }
193    }
194    if let Some(min) = filter.min_priority {
195        if priority_rank(envelope.priority) < priority_rank(min) {
196            return false;
197        }
198    }
199    true
200}
201
202const fn priority_rank(p: arcp_core::envelope::Priority) -> u8 {
203    match p {
204        arcp_core::envelope::Priority::Low => 0,
205        arcp_core::envelope::Priority::Normal => 1,
206        arcp_core::envelope::Priority::High => 2,
207        arcp_core::envelope::Priority::Critical => 3,
208    }
209}
210
211#[cfg(test)]
212#[allow(
213    clippy::expect_used,
214    clippy::unwrap_used,
215    clippy::panic,
216    clippy::missing_panics_doc
217)]
218mod tests {
219    use super::*;
220    use arcp_core::envelope::Envelope;
221    use arcp_core::ids::SessionId;
222    use arcp_core::messages::{MessageType, PingPayload};
223
224    fn ping_for(session: &SessionId) -> Envelope {
225        let mut env = Envelope::new(MessageType::Ping(PingPayload::default()));
226        env.session_id = Some(session.clone());
227        env
228    }
229
230    #[tokio::test]
231    async fn subscription_filters_by_session_id() {
232        let mgr = SubscriptionManager::new();
233        let s1 = SessionId::new();
234        let s2 = SessionId::new();
235        let filter = SubscriptionFilter {
236            session_id: vec![s1.clone()],
237            ..SubscriptionFilter::default()
238        };
239        let (_id, mut rx) = mgr.register(filter, s1.clone());
240
241        let _ = mgr.publish(&ping_for(&s2)); // should be filtered out
242        let _ = mgr.publish(&ping_for(&s1)); // should pass
243
244        let env = tokio::time::timeout(std::time::Duration::from_millis(100), rx.next())
245            .await
246            .expect("timely")
247            .expect("envelope");
248        assert_eq!(env.session_id.as_ref(), Some(&s1));
249    }
250
251    #[tokio::test]
252    async fn unsubscribe_removes_entry() {
253        let mgr = SubscriptionManager::new();
254        let s = SessionId::new();
255        let (id, _rx) = mgr.register(SubscriptionFilter::default(), s);
256        assert_eq!(mgr.len(), 1);
257        assert!(mgr.unsubscribe(&id));
258        assert!(mgr.is_empty());
259    }
260
261    #[tokio::test]
262    async fn unsubscribe_returns_false_for_unknown_id() {
263        let mgr = SubscriptionManager::new();
264        let id = SubscriptionId::new();
265        assert!(!mgr.unsubscribe(&id));
266    }
267
268    #[tokio::test]
269    async fn drop_session_keeps_other_sessions() {
270        let mgr = SubscriptionManager::new();
271        let s1 = SessionId::new();
272        let s2 = SessionId::new();
273        let (_id1, _rx1) = mgr.register(SubscriptionFilter::default(), s1.clone());
274        let (_id2, _rx2) = mgr.register(SubscriptionFilter::default(), s2);
275        assert_eq!(mgr.len(), 2);
276        mgr.drop_session(&s1);
277        assert_eq!(mgr.len(), 1);
278    }
279
280    #[test]
281    fn matches_handles_every_field_combination() {
282        let session = SessionId::new();
283        let trace = arcp_core::ids::TraceId::new("t").expect("non-empty");
284        let job = arcp_core::ids::JobId::new();
285        let stream = arcp_core::ids::StreamId::new();
286
287        let mut env = ping_for(&session);
288        env.trace_id = Some(trace.clone());
289        env.job_id = Some(job.clone());
290        env.stream_id = Some(stream.clone());
291
292        let filter = SubscriptionFilter {
293            session_id: vec![session.clone()],
294            trace_id: vec![trace],
295            job_id: vec![job],
296            stream_id: vec![stream],
297            types: vec!["ping".into()],
298            min_priority: Some(arcp_core::envelope::Priority::Low),
299        };
300        assert!(matches(&filter, &env));
301
302        // No session id on envelope but filter requires one => no match.
303        let mut bare = Envelope::new(MessageType::Ping(PingPayload::default()));
304        bare.session_id = None;
305        let session_only = SubscriptionFilter {
306            session_id: vec![session],
307            ..SubscriptionFilter::default()
308        };
309        assert!(!matches(&session_only, &bare));
310    }
311
312    #[test]
313    fn debug_renders() {
314        let mgr = SubscriptionManager::new();
315        let _ = format!("{mgr:?}");
316        let s = SessionId::new();
317        let (_id, rx) = mgr.register(SubscriptionFilter::default(), s);
318        let _ = format!("{rx:?}");
319    }
320
321    #[tokio::test]
322    async fn closed_bus_makes_receiver_yield_none() {
323        let mgr = SubscriptionManager::new();
324        let s = SessionId::new();
325        let (_id, mut rx) = mgr.register(SubscriptionFilter::default(), s);
326        drop(mgr); // drop sender side
327        assert!(rx.next().await.is_none());
328    }
329}