Skip to main content

antenna_client_shared/
dispatcher.rs

1use antenna_protocol::{PeerID, UserMsgPayload};
2use anyhow::Result;
3use std::collections::{HashMap, HashSet};
4
5/// Driver-emitted event — the data side of a notification.
6///
7/// Drivers emit `EventType` values into [`RtcCallbacks::emit`], which fans
8/// them out to user-registered [`Event`] subscriptions.
9#[derive(Clone)]
10pub enum EventType<Msg: UserMsgPayload> {
11    Connected,
12    UserMessage(PeerID, Msg),
13    Disconnected,
14    PeerConnected(PeerID),
15    PeerDisconnected(PeerID),
16    PeerDropped(PeerID),
17    Available,
18    Unavailable,
19}
20
21/// Callback for events that carry no payload (`Connected`, `Available`, …).
22pub struct NoArgCallback(Box<dyn Fn() -> Result<()> + Send + Sync>);
23
24impl NoArgCallback {
25    pub fn from_fn<F>(f: F) -> Self
26    where
27        F: Fn() -> Result<()> + Send + Sync + 'static,
28    {
29        Self(Box::new(f))
30    }
31
32    pub fn call(&self) -> Result<()> {
33        (self.0)()
34    }
35}
36
37impl From<fn()> for NoArgCallback {
38    fn from(f: fn()) -> Self {
39        Self(Box::new(move || {
40            f();
41            Ok(())
42        }))
43    }
44}
45
46type PeerCallbackFn = dyn Fn(&PeerID) -> Result<()> + Send + Sync;
47/// Callback for peer-scoped events (`PeerConnected`, `PeerDisconnected`, `PeerLost`).
48pub struct PeerCallback(Box<PeerCallbackFn>);
49
50impl PeerCallback {
51    pub fn from_fn<F>(f: F) -> Self
52    where
53        F: Fn(&PeerID) -> Result<()> + Send + Sync + 'static,
54    {
55        Self(Box::new(f))
56    }
57
58    pub fn call(&self, peer: &PeerID) -> Result<()> {
59        (self.0)(peer)
60    }
61}
62
63impl From<fn(PeerID)> for PeerCallback {
64    fn from(f: fn(PeerID)) -> Self {
65        Self(Box::new(move |peer| {
66            f(peer.clone());
67            Ok(())
68        }))
69    }
70}
71
72type MessageCallbackFn<Msg> = dyn Fn(&PeerID, &Msg) -> Result<()> + Send + Sync;
73/// Callback for `UserMessage` events — receives the sender [`PeerID`] and message body.
74pub struct MessageCallback<Msg: UserMsgPayload>(Box<MessageCallbackFn<Msg>>);
75
76impl<Msg: UserMsgPayload> MessageCallback<Msg> {
77    pub fn from_fn<F>(f: F) -> Self
78    where
79        F: Fn(&PeerID, &Msg) -> Result<()> + Send + Sync + 'static,
80    {
81        Self(Box::new(f))
82    }
83
84    pub fn call(&self, peer: &PeerID, data: &Msg) -> Result<()> {
85        (self.0)(peer, data)
86    }
87}
88
89impl<Msg: UserMsgPayload + 'static> From<fn(PeerID, Msg)> for MessageCallback<Msg> {
90    fn from(f: fn(PeerID, Msg)) -> Self {
91        Self(Box::new(move |peer, data| {
92            f(peer.clone(), data.clone());
93            Ok(())
94        }))
95    }
96}
97
98/// User-facing subscription — pairs an event kind with the callback to run.
99///
100/// Pass to `Peer::subscribe` to register; the returned id is what
101/// `Peer::unsubscribe` consumes.
102pub enum Event<Msg: UserMsgPayload> {
103    /// Local node connected to its first peer.
104    Connected(NoArgCallback),
105    /// Message arrived from a remote peer.
106    UserMessage(MessageCallback<Msg>),
107    /// Local node left the mesh.
108    Disconnected(NoArgCallback),
109    /// Remote peer joined the mesh.
110    PeerConnected(PeerCallback),
111    /// Remote peer left gracefully.
112    PeerDisconnected(PeerCallback),
113    /// Remote peer dropped abruptly (reconnect will be attempted).
114    PeerLost(PeerCallback),
115    /// All in-progress relay handshakes settled — node is fully meshed.
116    Available(NoArgCallback),
117    /// At least one relay handshake is in progress, or no peers connected.
118    Unavailable(NoArgCallback),
119}
120
121#[derive(Clone, Copy, PartialEq, Eq, Hash)]
122enum SubscriptionKind {
123    Connected,
124    UserMessage,
125    Disconnected,
126    PeerConnected,
127    PeerDisconnected,
128    PeerLost,
129    Available,
130    Unavailable,
131}
132
133impl<Msg: UserMsgPayload> Event<Msg> {
134    fn kind(&self) -> SubscriptionKind {
135        match self {
136            Self::Connected(_) => SubscriptionKind::Connected,
137            Self::UserMessage(_) => SubscriptionKind::UserMessage,
138            Self::Disconnected(_) => SubscriptionKind::Disconnected,
139            Self::PeerConnected(_) => SubscriptionKind::PeerConnected,
140            Self::PeerDisconnected(_) => SubscriptionKind::PeerDisconnected,
141            Self::PeerLost(_) => SubscriptionKind::PeerLost,
142            Self::Available(_) => SubscriptionKind::Available,
143            Self::Unavailable(_) => SubscriptionKind::Unavailable,
144        }
145    }
146}
147
148fn event_kind<Msg: UserMsgPayload>(event: &EventType<Msg>) -> SubscriptionKind {
149    match event {
150        EventType::Connected => SubscriptionKind::Connected,
151        EventType::UserMessage(_, _) => SubscriptionKind::UserMessage,
152        EventType::Disconnected => SubscriptionKind::Disconnected,
153        EventType::PeerConnected(_) => SubscriptionKind::PeerConnected,
154        EventType::PeerDisconnected(_) => SubscriptionKind::PeerDisconnected,
155        EventType::PeerDropped(_) => SubscriptionKind::PeerLost,
156        EventType::Available => SubscriptionKind::Available,
157        EventType::Unavailable => SubscriptionKind::Unavailable,
158    }
159}
160
161/// Subscription registry that backs `Peer::subscribe` / `Peer::unsubscribe`.
162///
163/// Drivers own one instance and call [`Self::emit`] on every protocol event;
164/// it routes to all matching user callbacks.
165pub struct RtcCallbacks<Msg: UserMsgPayload> {
166    next_callback_id: u64,
167    subscriptions: HashMap<u64, Event<Msg>>,
168    subscriptions_by_kind: HashMap<SubscriptionKind, HashSet<u64>>,
169}
170
171impl<Msg: UserMsgPayload> Default for RtcCallbacks<Msg> {
172    fn default() -> Self {
173        Self::new()
174    }
175}
176
177impl<Msg: UserMsgPayload> RtcCallbacks<Msg> {
178    pub fn new() -> Self {
179        Self {
180            next_callback_id: 1,
181            subscriptions: HashMap::new(),
182            subscriptions_by_kind: HashMap::new(),
183        }
184    }
185
186    fn next_id(&mut self) -> u64 {
187        let id = self.next_callback_id;
188        self.next_callback_id += 1;
189        id
190    }
191
192    pub fn subscribe(&mut self, subscription: Event<Msg>) -> u64 {
193        let id = self.next_id();
194        let kind = subscription.kind();
195
196        self.subscriptions.insert(id, subscription);
197        self.subscriptions_by_kind
198            .entry(kind)
199            .or_default()
200            .insert(id);
201
202        id
203    }
204
205    pub fn unsubscribe(&mut self, id: u64) -> bool {
206        let Some(subscription) = self.subscriptions.remove(&id) else {
207            return false;
208        };
209
210        let kind = subscription.kind();
211        if let Some(ids) = self.subscriptions_by_kind.get_mut(&kind) {
212            ids.remove(&id);
213            if ids.is_empty() {
214                self.subscriptions_by_kind.remove(&kind);
215            }
216        }
217        true
218    }
219}
220
221impl<Msg: UserMsgPayload> RtcCallbacks<Msg> {
222    pub fn emit(&self, event: EventType<Msg>) -> Result<()> {
223        let kind = event_kind(&event);
224        let Some(ids) = self.subscriptions_by_kind.get(&kind) else {
225            return Ok(());
226        };
227
228        let ids: Vec<u64> = ids.iter().copied().collect();
229
230        for id in ids {
231            let Some(subscription) = self.subscriptions.get(&id) else {
232                continue;
233            };
234            match (subscription, &event) {
235                (Event::Connected(cb), EventType::Connected) => cb.call()?,
236                (Event::UserMessage(cb), EventType::UserMessage(peer, data)) => {
237                    cb.call(peer, data)?
238                }
239                (Event::Disconnected(cb), EventType::Disconnected) => cb.call()?,
240                (Event::PeerConnected(cb), EventType::PeerConnected(peer)) => cb.call(peer)?,
241                (Event::PeerDisconnected(cb), EventType::PeerDisconnected(peer)) => {
242                    cb.call(peer)?
243                }
244                (Event::PeerLost(cb), EventType::PeerDropped(peer)) => cb.call(peer)?,
245                (Event::Available(cb), EventType::Available) => cb.call()?,
246                (Event::Unavailable(cb), EventType::Unavailable) => cb.call()?,
247                _ => {}
248            }
249        }
250
251        Ok(())
252    }
253}