1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![cfg_attr(not(ci_arti_stable), allow(renamed_and_removed_lints))]
5#![cfg_attr(not(ci_arti_nightly), allow(unknown_lints))]
6#![deny(missing_docs)]
7#![warn(noop_method_call)]
8#![deny(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![deny(clippy::missing_panics_doc)]
25#![warn(clippy::needless_borrow)]
26#![warn(clippy::needless_pass_by_value)]
27#![warn(clippy::option_option)]
28#![deny(clippy::print_stderr)]
29#![deny(clippy::print_stdout)]
30#![warn(clippy::rc_buffer)]
31#![deny(clippy::ref_option_ref)]
32#![warn(clippy::semicolon_if_nothing_returned)]
33#![warn(clippy::trait_duplication_in_bounds)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
39#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] pub mod events;
44
45use crate::events::{TorEvent, TorEventKind};
46use async_broadcast::{InactiveReceiver, Receiver, Sender, TrySendError};
47use futures::channel::mpsc;
48use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
49use futures::future::Either;
50use futures::StreamExt;
51use once_cell::sync::OnceCell;
52use std::pin::Pin;
53use std::sync::atomic::{AtomicUsize, Ordering};
54use std::task::{Context, Poll};
55use thiserror::Error;
56use tracing::{error, warn};
57
58static EVENT_SENDER: OnceCell<UnboundedSender<TorEvent>> = OnceCell::new();
60static CURRENT_RECEIVER: OnceCell<InactiveReceiver<TorEvent>> = OnceCell::new();
62const EVENT_KIND_COUNT: usize = 1;
64static EVENT_SUBSCRIBERS: [AtomicUsize; EVENT_KIND_COUNT] = [AtomicUsize::new(0); EVENT_KIND_COUNT];
71
72pub static BROADCAST_CAPACITY: usize = 512;
74
75pub struct EventReactor {
82 receiver: UnboundedReceiver<TorEvent>,
86 broadcast: Sender<TorEvent>,
90}
91
92impl EventReactor {
93 pub fn new() -> Option<Self> {
102 let (tx, rx) = mpsc::unbounded();
103 if EVENT_SENDER.set(tx).is_ok() {
104 let (btx, brx) = async_broadcast::broadcast(BROADCAST_CAPACITY);
105 CURRENT_RECEIVER
106 .set(brx.deactivate())
107 .expect("CURRENT_RECEIVER can't be set if EVENT_SENDER is unset!");
108 Some(Self {
109 receiver: rx,
110 broadcast: btx,
111 })
112 } else {
113 None
114 }
115 }
116 pub fn receiver() -> Option<TorEventReceiver> {
121 CURRENT_RECEIVER
122 .get()
123 .map(|rx| TorEventReceiver::wrap(rx.clone()))
124 }
125 pub async fn run(mut self) {
129 while let Some(event) = self.receiver.next().await {
130 match self.broadcast.try_broadcast(event) {
131 Ok(_) => {}
132 Err(TrySendError::Closed(_)) => break,
133 Err(TrySendError::Full(event)) => {
134 warn!("TorEventReceivers aren't receiving events fast enough!");
137 if self.broadcast.broadcast(event).await.is_err() {
138 break;
139 }
140 }
141 Err(TrySendError::Inactive(_)) => {
142 }
144 }
145 }
146 error!("event reactor shutting down; this shouldn't ever happen");
149 }
150}
151
152#[derive(Clone, Debug, Error)]
154#[non_exhaustive]
155pub enum ReceiverError {
156 #[error("No event subscriptions")]
158 NoSubscriptions,
159 #[error("Internal event broadcast channel closed")]
161 ChannelClosed,
162}
163
164#[derive(Clone, Debug)]
175pub struct TorEventReceiver {
176 inner: Either<Receiver<TorEvent>, InactiveReceiver<TorEvent>>,
179 subscribed: [bool; EVENT_KIND_COUNT],
182}
183
184impl futures::stream::Stream for TorEventReceiver {
185 type Item = TorEvent;
186
187 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188 let this = self.get_mut();
189 match this.inner {
190 Either::Left(ref mut active) => loop {
191 match Pin::new(&mut *active).poll_next(cx) {
192 Poll::Ready(Some(e)) => {
193 if this.subscribed[e.kind() as usize] {
194 return Poll::Ready(Some(e));
195 }
196 }
198 x => return x,
199 }
200 },
201 Either::Right(_) => {
202 warn!("TorEventReceiver::poll_next() called without subscriptions!");
203 Poll::Ready(None)
204 }
205 }
206 }
207}
208
209impl TorEventReceiver {
210 pub(crate) fn wrap(rx: InactiveReceiver<TorEvent>) -> Self {
212 Self {
213 inner: Either::Right(rx),
214 subscribed: [false; EVENT_KIND_COUNT],
215 }
216 }
217 pub fn subscribe(&mut self, kind: TorEventKind) {
222 if !self.subscribed[kind as usize] {
223 EVENT_SUBSCRIBERS[kind as usize].fetch_add(1, Ordering::SeqCst);
224 self.subscribed[kind as usize] = true;
225 }
226 if let Either::Right(inactive) = self.inner.clone() {
228 self.inner = Either::Left(inactive.activate());
229 }
230 }
231 pub fn unsubscribe(&mut self, kind: TorEventKind) {
237 if self.subscribed[kind as usize] {
238 EVENT_SUBSCRIBERS[kind as usize].fetch_sub(1, Ordering::SeqCst);
239 self.subscribed[kind as usize] = false;
240 }
241 if self.subscribed.iter().all(|x| !*x) {
243 if let Either::Left(active) = self.inner.clone() {
245 self.inner = Either::Right(active.deactivate());
246 }
247 }
248 }
249}
250
251impl Drop for TorEventReceiver {
252 fn drop(&mut self) {
253 for (i, subscribed) in self.subscribed.iter().enumerate() {
254 if *subscribed {
257 EVENT_SUBSCRIBERS[i].fetch_sub(1, Ordering::SeqCst);
258 }
259 }
260 }
261}
262
263pub fn event_has_subscribers(kind: TorEventKind) -> bool {
269 EVENT_SUBSCRIBERS[kind as usize].load(Ordering::SeqCst) > 0
270}
271
272pub fn broadcast(event: TorEvent) {
280 if !event_has_subscribers(event.kind()) {
281 return;
282 }
283 if let Some(sender) = EVENT_SENDER.get() {
284 let _ = sender.unbounded_send(event);
286 }
287}
288
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    use crate::{
        broadcast, event_has_subscribers, EventReactor, StreamExt, TorEvent, TorEventKind,
    };
    use once_cell::sync::OnceCell;
    use std::sync::{Mutex, MutexGuard};
    use std::time::Duration;
    use tokio::runtime::Runtime;

    /// The tokio runtime shared by every test, behind a mutex.
    ///
    /// Each test holds the guard for its whole duration, so the tests run one at a time.
    /// They have to, because they all observe the same global subscriber counters.
    static TEST_MUTEX: OnceCell<Mutex<Runtime>> = OnceCell::new();

    /// Lock the shared runtime, starting the event reactor on it if nobody has yet.
    fn test_setup() -> MutexGuard<'static, Runtime> {
        let runtime = TEST_MUTEX
            .get_or_init(|| Mutex::new(Runtime::new().unwrap()))
            .lock()
            .expect("mutex poisoned, probably by other failing tests");
        // Only the first caller gets a reactor; later calls find the system initialized.
        if let Some(reactor) = EventReactor::new() {
            runtime.handle().spawn(reactor.run());
        }
        runtime
    }

    /// Subscribing, unsubscribing and dropping keep the global subscriber count accurate.
    #[test]
    fn subscriptions() {
        let runtime = test_setup();

        runtime.block_on(async move {
            // Nobody is subscribed at the start.
            assert!(!event_has_subscribers(TorEventKind::Empty));

            // Merely creating a receiver doesn't count as a subscription.
            let mut events = EventReactor::receiver().unwrap();
            assert!(!event_has_subscribers(TorEventKind::Empty));

            events.subscribe(TorEventKind::Empty);
            assert!(event_has_subscribers(TorEventKind::Empty));

            events.unsubscribe(TorEventKind::Empty);
            assert!(!event_has_subscribers(TorEventKind::Empty));

            // Repeated subscriptions are counted once...
            events.subscribe(TorEventKind::Empty);
            events.subscribe(TorEventKind::Empty);
            events.subscribe(TorEventKind::Empty);
            assert!(event_has_subscribers(TorEventKind::Empty));

            // ...so a single unsubscribe undoes all of them.
            events.unsubscribe(TorEventKind::Empty);
            assert!(!event_has_subscribers(TorEventKind::Empty));

            events.subscribe(TorEventKind::Empty);
            assert!(event_has_subscribers(TorEventKind::Empty));

            // Dropping a subscribed receiver releases its subscription.
            drop(events);
            assert!(!event_has_subscribers(TorEventKind::Empty));
        });
    }

    /// A receiver with no subscriptions ends its stream immediately.
    #[test]
    fn empty_recv() {
        let runtime = test_setup();

        runtime.block_on(async move {
            let mut events = EventReactor::receiver().unwrap();
            let first = events.next().await;
            assert!(first.is_none());
        });
    }

    /// A subscribed receiver gets an event that is broadcast after it subscribed.
    #[test]
    fn receives_events() {
        let runtime = test_setup();

        runtime.block_on(async move {
            let mut events = EventReactor::receiver().unwrap();
            events.subscribe(TorEventKind::Empty);
            // NOTE(review): presumably this gives the spawned reactor time to start;
            // the reason isn't visible from the code here -- confirm.
            tokio::time::sleep(Duration::from_millis(100)).await;
            broadcast(TorEvent::Empty);

            let received = events.next().await;
            assert_eq!(received, Some(TorEvent::Empty));
        });
    }

    /// An event broadcast while nobody is subscribed is never delivered to a later subscriber.
    #[test]
    fn does_not_send_to_no_subscribers() {
        let runtime = test_setup();

        runtime.block_on(async move {
            // No subscribers yet, so this event should be discarded.
            broadcast(TorEvent::Empty);

            let mut events = EventReactor::receiver().unwrap();
            events.subscribe(TorEventKind::Empty);

            // Nothing should arrive, so waiting for an event has to time out.
            let outcome = tokio::time::timeout(Duration::from_millis(100), events.next()).await;
            assert!(outcome.is_err());
        });
    }
}