Skip to main content

pushwire_client/
subscription.rs

1//! Channel subscription management.
2//!
3//! Tracks which channels the client is subscribed to and builds the
4//! `SystemOp::Subscribe` / `SystemOp::Unsubscribe` messages.
5
6use dashmap::DashSet;
7use pushwire_core::{ChannelKind, SystemOp};
8
9/// Tracks active channel subscriptions.
10pub(crate) struct SubscriptionTracker<C: ChannelKind> {
11    channels: DashSet<C>,
12}
13
14impl<C: ChannelKind> SubscriptionTracker<C> {
15    pub(crate) fn new() -> Self {
16        Self {
17            channels: DashSet::new(),
18        }
19    }
20
21    /// Add channels to the subscription set. Returns the `Subscribe` system op
22    /// if any channels were newly added.
23    pub(crate) fn subscribe(&self, channels: &[C]) -> Option<SystemOp<C>> {
24        let mut added = Vec::new();
25        for ch in channels {
26            if self.channels.insert(*ch) {
27                added.push(*ch);
28            }
29        }
30        if added.is_empty() {
31            None
32        } else {
33            Some(SystemOp::Subscribe { channels: added })
34        }
35    }
36
37    /// Remove channels from the subscription set. Returns the `Unsubscribe`
38    /// system op if any channels were actually removed.
39    pub(crate) fn unsubscribe(&self, channels: &[C]) -> Option<SystemOp<C>> {
40        let mut removed = Vec::new();
41        for ch in channels {
42            if self.channels.remove(ch).is_some() {
43                removed.push(*ch);
44            }
45        }
46        if removed.is_empty() {
47            None
48        } else {
49            Some(SystemOp::Unsubscribe { channels: removed })
50        }
51    }
52
53    /// All currently subscribed channels.
54    pub(crate) fn active(&self) -> Vec<C> {
55        self.channels.iter().map(|c| *c).collect()
56    }
57}