1use antenna_protocol::{PeerID, UserMsgPayload};
2use anyhow::Result;
3use std::collections::{HashMap, HashSet};
4
5#[derive(Clone)]
10pub enum EventType<Msg: UserMsgPayload> {
11 Connected,
12 UserMessage(PeerID, Msg),
13 Disconnected,
14 PeerConnected(PeerID),
15 PeerDisconnected(PeerID),
16 PeerDropped(PeerID),
17 Available,
18 Unavailable,
19}
20
21pub struct NoArgCallback(Box<dyn Fn() -> Result<()> + Send + Sync>);
23
24impl NoArgCallback {
25 pub fn from_fn<F>(f: F) -> Self
26 where
27 F: Fn() -> Result<()> + Send + Sync + 'static,
28 {
29 Self(Box::new(f))
30 }
31
32 pub fn call(&self) -> Result<()> {
33 (self.0)()
34 }
35}
36
37impl From<fn()> for NoArgCallback {
38 fn from(f: fn()) -> Self {
39 Self(Box::new(move || {
40 f();
41 Ok(())
42 }))
43 }
44}
45
46type PeerCallbackFn = dyn Fn(&PeerID) -> Result<()> + Send + Sync;
47pub struct PeerCallback(Box<PeerCallbackFn>);
49
50impl PeerCallback {
51 pub fn from_fn<F>(f: F) -> Self
52 where
53 F: Fn(&PeerID) -> Result<()> + Send + Sync + 'static,
54 {
55 Self(Box::new(f))
56 }
57
58 pub fn call(&self, peer: &PeerID) -> Result<()> {
59 (self.0)(peer)
60 }
61}
62
63impl From<fn(PeerID)> for PeerCallback {
64 fn from(f: fn(PeerID)) -> Self {
65 Self(Box::new(move |peer| {
66 f(peer.clone());
67 Ok(())
68 }))
69 }
70}
71
72type MessageCallbackFn<Msg> = dyn Fn(&PeerID, &Msg) -> Result<()> + Send + Sync;
73pub struct MessageCallback<Msg: UserMsgPayload>(Box<MessageCallbackFn<Msg>>);
75
76impl<Msg: UserMsgPayload> MessageCallback<Msg> {
77 pub fn from_fn<F>(f: F) -> Self
78 where
79 F: Fn(&PeerID, &Msg) -> Result<()> + Send + Sync + 'static,
80 {
81 Self(Box::new(f))
82 }
83
84 pub fn call(&self, peer: &PeerID, data: &Msg) -> Result<()> {
85 (self.0)(peer, data)
86 }
87}
88
89impl<Msg: UserMsgPayload + 'static> From<fn(PeerID, Msg)> for MessageCallback<Msg> {
90 fn from(f: fn(PeerID, Msg)) -> Self {
91 Self(Box::new(move |peer, data| {
92 f(peer.clone(), data.clone());
93 Ok(())
94 }))
95 }
96}
97
98pub enum Event<Msg: UserMsgPayload> {
103 Connected(NoArgCallback),
105 UserMessage(MessageCallback<Msg>),
107 Disconnected(NoArgCallback),
109 PeerConnected(PeerCallback),
111 PeerDisconnected(PeerCallback),
113 PeerLost(PeerCallback),
115 Available(NoArgCallback),
117 Unavailable(NoArgCallback),
119}
120
/// Payload-free discriminant used as the key for grouping subscriptions.
///
/// Both a subscription (`Event`) and an emitted event (`EventType`) are
/// reduced to this value so they can be matched against each other.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum SubscriptionKind {
    /// Key for `Event::Connected` / `EventType::Connected`.
    Connected,
    /// Key for `Event::UserMessage` / `EventType::UserMessage`.
    UserMessage,
    /// Key for `Event::Disconnected` / `EventType::Disconnected`.
    Disconnected,
    /// Key for `Event::PeerConnected` / `EventType::PeerConnected`.
    PeerConnected,
    /// Key for `Event::PeerDisconnected` / `EventType::PeerDisconnected`.
    PeerDisconnected,
    /// Key for `Event::PeerLost` / `EventType::PeerDropped`.
    PeerLost,
    /// Key for `Event::Available` / `EventType::Available`.
    Available,
    /// Key for `Event::Unavailable` / `EventType::Unavailable`.
    Unavailable,
}
132
133impl<Msg: UserMsgPayload> Event<Msg> {
134 fn kind(&self) -> SubscriptionKind {
135 match self {
136 Self::Connected(_) => SubscriptionKind::Connected,
137 Self::UserMessage(_) => SubscriptionKind::UserMessage,
138 Self::Disconnected(_) => SubscriptionKind::Disconnected,
139 Self::PeerConnected(_) => SubscriptionKind::PeerConnected,
140 Self::PeerDisconnected(_) => SubscriptionKind::PeerDisconnected,
141 Self::PeerLost(_) => SubscriptionKind::PeerLost,
142 Self::Available(_) => SubscriptionKind::Available,
143 Self::Unavailable(_) => SubscriptionKind::Unavailable,
144 }
145 }
146}
147
148fn event_kind<Msg: UserMsgPayload>(event: &EventType<Msg>) -> SubscriptionKind {
149 match event {
150 EventType::Connected => SubscriptionKind::Connected,
151 EventType::UserMessage(_, _) => SubscriptionKind::UserMessage,
152 EventType::Disconnected => SubscriptionKind::Disconnected,
153 EventType::PeerConnected(_) => SubscriptionKind::PeerConnected,
154 EventType::PeerDisconnected(_) => SubscriptionKind::PeerDisconnected,
155 EventType::PeerDropped(_) => SubscriptionKind::PeerLost,
156 EventType::Available => SubscriptionKind::Available,
157 EventType::Unavailable => SubscriptionKind::Unavailable,
158 }
159}
160
161pub struct RtcCallbacks<Msg: UserMsgPayload> {
166 next_callback_id: u64,
167 subscriptions: HashMap<u64, Event<Msg>>,
168 subscriptions_by_kind: HashMap<SubscriptionKind, HashSet<u64>>,
169}
170
171impl<Msg: UserMsgPayload> Default for RtcCallbacks<Msg> {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177impl<Msg: UserMsgPayload> RtcCallbacks<Msg> {
178 pub fn new() -> Self {
179 Self {
180 next_callback_id: 1,
181 subscriptions: HashMap::new(),
182 subscriptions_by_kind: HashMap::new(),
183 }
184 }
185
186 fn next_id(&mut self) -> u64 {
187 let id = self.next_callback_id;
188 self.next_callback_id += 1;
189 id
190 }
191
192 pub fn subscribe(&mut self, subscription: Event<Msg>) -> u64 {
193 let id = self.next_id();
194 let kind = subscription.kind();
195
196 self.subscriptions.insert(id, subscription);
197 self.subscriptions_by_kind
198 .entry(kind)
199 .or_default()
200 .insert(id);
201
202 id
203 }
204
205 pub fn unsubscribe(&mut self, id: u64) -> bool {
206 let Some(subscription) = self.subscriptions.remove(&id) else {
207 return false;
208 };
209
210 let kind = subscription.kind();
211 if let Some(ids) = self.subscriptions_by_kind.get_mut(&kind) {
212 ids.remove(&id);
213 if ids.is_empty() {
214 self.subscriptions_by_kind.remove(&kind);
215 }
216 }
217 true
218 }
219}
220
221impl<Msg: UserMsgPayload> RtcCallbacks<Msg> {
222 pub fn emit(&self, event: EventType<Msg>) -> Result<()> {
223 let kind = event_kind(&event);
224 let Some(ids) = self.subscriptions_by_kind.get(&kind) else {
225 return Ok(());
226 };
227
228 let ids: Vec<u64> = ids.iter().copied().collect();
229
230 for id in ids {
231 let Some(subscription) = self.subscriptions.get(&id) else {
232 continue;
233 };
234 match (subscription, &event) {
235 (Event::Connected(cb), EventType::Connected) => cb.call()?,
236 (Event::UserMessage(cb), EventType::UserMessage(peer, data)) => {
237 cb.call(peer, data)?
238 }
239 (Event::Disconnected(cb), EventType::Disconnected) => cb.call()?,
240 (Event::PeerConnected(cb), EventType::PeerConnected(peer)) => cb.call(peer)?,
241 (Event::PeerDisconnected(cb), EventType::PeerDisconnected(peer)) => {
242 cb.call(peer)?
243 }
244 (Event::PeerLost(cb), EventType::PeerDropped(peer)) => cb.call(peer)?,
245 (Event::Available(cb), EventType::Available) => cb.call()?,
246 (Event::Unavailable(cb), EventType::Unavailable) => cb.call()?,
247 _ => {}
248 }
249 }
250
251 Ok(())
252 }
253}