tor_events/
lib.rs

1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![cfg_attr(not(ci_arti_stable), allow(renamed_and_removed_lints))]
5#![cfg_attr(not(ci_arti_nightly), allow(unknown_lints))]
6#![deny(missing_docs)]
7#![warn(noop_method_call)]
8#![deny(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![deny(clippy::missing_panics_doc)]
25#![warn(clippy::needless_borrow)]
26#![warn(clippy::needless_pass_by_value)]
27#![warn(clippy::option_option)]
28#![deny(clippy::print_stderr)]
29#![deny(clippy::print_stdout)]
30#![warn(clippy::rc_buffer)]
31#![deny(clippy::ref_option_ref)]
32#![warn(clippy::semicolon_if_nothing_returned)]
33#![warn(clippy::trait_duplication_in_bounds)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
38#![allow(clippy::uninlined_format_args)]
39#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
40#![allow(clippy::result_large_err)] // temporary workaround for arti#587
41//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
42
43pub mod events;
44
45use crate::events::{TorEvent, TorEventKind};
46use async_broadcast::{InactiveReceiver, Receiver, Sender, TrySendError};
47use futures::channel::mpsc;
48use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
49use futures::future::Either;
50use futures::StreamExt;
51use once_cell::sync::OnceCell;
52use std::pin::Pin;
53use std::sync::atomic::{AtomicUsize, Ordering};
54use std::task::{Context, Poll};
55use thiserror::Error;
56use tracing::{error, warn};
57
58/// Pointer to an `UnboundedSender`, used to send events into the `EventReactor`.
59static EVENT_SENDER: OnceCell<UnboundedSender<TorEvent>> = OnceCell::new();
60/// An inactive receiver for the currently active broadcast channel, if there is one.
61static CURRENT_RECEIVER: OnceCell<InactiveReceiver<TorEvent>> = OnceCell::new();
/// The number of `TorEventKind`s there are.
const EVENT_KIND_COUNT: usize = 1;
/// An array containing one `AtomicUsize` for each `TorEventKind`, used to track subscriptions.
///
/// When a `TorEventReceiver` subscribes to a `TorEventKind`, it uses its `usize` value to index
/// into this array and increment the associated `AtomicUsize` (and decrements it to unsubscribe).
/// This lets event emitters check whether there are any subscribers, and avoid emitting events
/// if there aren't.
static EVENT_SUBSCRIBERS: [AtomicUsize; EVENT_KIND_COUNT] = {
    /// Initial value of every subscriber counter.
    ///
    /// `AtomicUsize` is not `Copy`, so an array-repeat expression over it is only accepted for
    /// lengths above one when the operand is a path to a `const` item. Repeating this constant
    /// keeps the initializer valid however large `EVENT_KIND_COUNT` grows.
    // Each array slot gets its own fresh copy of the constant, which is exactly what we want.
    #[allow(clippy::declare_interior_mutable_const)]
    const NO_SUBSCRIBERS: AtomicUsize = AtomicUsize::new(0);
    [NO_SUBSCRIBERS; EVENT_KIND_COUNT]
};
71
/// Capacity of the internal broadcast channel that implements event subscription.
///
/// `EventReactor::new` passes this value to `async_broadcast::broadcast`.
pub static BROADCAST_CAPACITY: usize = 512;
74
75/// A reactor used to forward events to make the event reporting system work.
76///
77/// # Note
78///
79/// Currently, this type is a singleton; there is one event reporting system used for the entire
80/// program. This is not stable, and may change in future.
81pub struct EventReactor {
82    /// A receiver that the reactor uses to learn about incoming events.
83    ///
84    /// This is unbounded so that event publication doesn't have to be async.
85    receiver: UnboundedReceiver<TorEvent>,
86    /// A sender that the reactor uses to publish events.
87    ///
88    /// Events are only sent here if at least one subscriber currently wants them.
89    broadcast: Sender<TorEvent>,
90}
91
92impl EventReactor {
93    /// Initialize the event reporting system, returning a reactor that must be run for it to work,
94    /// and a `TorEventReceiver` that can be used to extract events from the system. If the system
95    /// has already been initialized, returns `None` instead of a reactor.
96    ///
97    /// # Warnings
98    ///
99    /// The returned reactor *must* be run with `EventReactor::run`, in a background async task.
100    /// If it is not, the event system might consume unbounded amounts of memory.
101    pub fn new() -> Option<Self> {
102        let (tx, rx) = mpsc::unbounded();
103        if EVENT_SENDER.set(tx).is_ok() {
104            let (btx, brx) = async_broadcast::broadcast(BROADCAST_CAPACITY);
105            CURRENT_RECEIVER
106                .set(brx.deactivate())
107                .expect("CURRENT_RECEIVER can't be set if EVENT_SENDER is unset!");
108            Some(Self {
109                receiver: rx,
110                broadcast: btx,
111            })
112        } else {
113            None
114        }
115    }
116    /// Get a `TorEventReceiver` to receive events from, assuming an `EventReactor` is already
117    /// running somewhere. (If it isn't, returns `None`.)
118    ///
119    /// As noted in the type-level documentation, this function might not always work this way.
120    pub fn receiver() -> Option<TorEventReceiver> {
121        CURRENT_RECEIVER
122            .get()
123            .map(|rx| TorEventReceiver::wrap(rx.clone()))
124    }
125    /// Run the event forwarding reactor.
126    ///
127    /// You *must* call this function once a reactor is created.
128    pub async fn run(mut self) {
129        while let Some(event) = self.receiver.next().await {
130            match self.broadcast.try_broadcast(event) {
131                Ok(_) => {}
132                Err(TrySendError::Closed(_)) => break,
133                Err(TrySendError::Full(event)) => {
134                    // If the channel is full, do a blocking broadcast to wait for it to be
135                    // not full, and log a warning about receivers lagging behind.
136                    warn!("TorEventReceivers aren't receiving events fast enough!");
137                    if self.broadcast.broadcast(event).await.is_err() {
138                        break;
139                    }
140                }
141                Err(TrySendError::Inactive(_)) => {
142                    // no active receivers, so just drop the event on the floor.
143                }
144            }
145        }
146        // It shouldn't be possible to get here, since we have globals keeping the channels
147        // open. Still, if we somehow do, log an error about it.
148        error!("event reactor shutting down; this shouldn't ever happen");
149    }
150}
151
152/// An error encountered when trying to receive a `TorEvent`.
153#[derive(Clone, Debug, Error)]
154#[non_exhaustive]
155pub enum ReceiverError {
156    /// The receiver isn't subscribed to anything, so wouldn't ever return any events.
157    #[error("No event subscriptions")]
158    NoSubscriptions,
159    /// The internal broadcast channel was closed, which shouldn't ever happen.
160    #[error("Internal event broadcast channel closed")]
161    ChannelClosed,
162}
163
164/// A receiver for `TorEvent`s emitted by other users of this crate.
165///
166/// To use this type, first subscribe to some kinds of event by calling
167/// `TorEventReceiver::subscribe`. Then, consume events using the implementation of
168/// `futures::stream::Stream`.
169///
170/// # Warning
171///
172/// Once interest in events has been signalled with `subscribe`, events must be continuously
173/// read from the receiver in order to avoid excessive memory consumption.
174#[derive(Clone, Debug)]
175pub struct TorEventReceiver {
176    /// If no events have been subscribed to yet, this is an `InactiveReceiver`; otherwise,
177    /// it's a `Receiver`.
178    inner: Either<Receiver<TorEvent>, InactiveReceiver<TorEvent>>,
179    /// Whether we're subscribed to each event kind (if `subscribed[kind]` is true, we're
180    /// subscribed to `kind`).
181    subscribed: [bool; EVENT_KIND_COUNT],
182}
183
184impl futures::stream::Stream for TorEventReceiver {
185    type Item = TorEvent;
186
187    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188        let this = self.get_mut();
189        match this.inner {
190            Either::Left(ref mut active) => loop {
191                match Pin::new(&mut *active).poll_next(cx) {
192                    Poll::Ready(Some(e)) => {
193                        if this.subscribed[e.kind() as usize] {
194                            return Poll::Ready(Some(e));
195                        }
196                        // loop, since we weren't subscribed to that event
197                    }
198                    x => return x,
199                }
200            },
201            Either::Right(_) => {
202                warn!("TorEventReceiver::poll_next() called without subscriptions!");
203                Poll::Ready(None)
204            }
205        }
206    }
207}
208
209impl TorEventReceiver {
210    /// Create a `TorEventReceiver` from an `InactiveReceiver` handle.
211    pub(crate) fn wrap(rx: InactiveReceiver<TorEvent>) -> Self {
212        Self {
213            inner: Either::Right(rx),
214            subscribed: [false; EVENT_KIND_COUNT],
215        }
216    }
217    /// Subscribe to a given kind of `TorEvent`.
218    ///
219    /// After calling this function, `TorEventReceiver::recv` will emit events of that kind.
220    /// This function is idempotent (subscribing twice has the same effect as doing so once).
221    pub fn subscribe(&mut self, kind: TorEventKind) {
222        if !self.subscribed[kind as usize] {
223            EVENT_SUBSCRIBERS[kind as usize].fetch_add(1, Ordering::SeqCst);
224            self.subscribed[kind as usize] = true;
225        }
226        // FIXME(eta): cloning is ungood, but hard to avoid
227        if let Either::Right(inactive) = self.inner.clone() {
228            self.inner = Either::Left(inactive.activate());
229        }
230    }
231    /// Unsubscribe from a given kind of `TorEvent`.
232    ///
233    /// After calling this function, `TorEventReceiver::recv` will no longer emit events of that
234    /// kind.
235    /// This function is idempotent (unsubscribing twice has the same effect as doing so once).
236    pub fn unsubscribe(&mut self, kind: TorEventKind) {
237        if self.subscribed[kind as usize] {
238            EVENT_SUBSCRIBERS[kind as usize].fetch_sub(1, Ordering::SeqCst);
239            self.subscribed[kind as usize] = false;
240        }
241        // If we're now not subscribed to anything, deactivate our channel.
242        if self.subscribed.iter().all(|x| !*x) {
243            // FIXME(eta): cloning is ungood, but hard to avoid
244            if let Either::Left(active) = self.inner.clone() {
245                self.inner = Either::Right(active.deactivate());
246            }
247        }
248    }
249}
250
251impl Drop for TorEventReceiver {
252    fn drop(&mut self) {
253        for (i, subscribed) in self.subscribed.iter().enumerate() {
254            // FIXME(eta): duplicates logic from Self::unsubscribe, because it's not possible
255            //             to go from a `usize` to a `TorEventKind`
256            if *subscribed {
257                EVENT_SUBSCRIBERS[i].fetch_sub(1, Ordering::SeqCst);
258            }
259        }
260    }
261}
262
263/// Returns a boolean indicating whether the event `kind` has any subscribers (as in,
264/// whether `TorEventReceiver::subscribe` has been called with that event kind).
265///
266/// This is useful to avoid doing work to generate events that might be computationally expensive
267/// to generate.
268pub fn event_has_subscribers(kind: TorEventKind) -> bool {
269    EVENT_SUBSCRIBERS[kind as usize].load(Ordering::SeqCst) > 0
270}
271
272/// Broadcast the given `TorEvent` to any interested subscribers.
273///
274/// As an optimization, does nothing if the event has no subscribers (`event_has_subscribers`
275/// returns false). (also does nothing if the event subsystem hasn't been initialized yet)
276///
277/// This function isn't intended for use outside Arti crates (as in, library consumers of Arti
278/// shouldn't broadcast events!).
279pub fn broadcast(event: TorEvent) {
280    if !event_has_subscribers(event.kind()) {
281        return;
282    }
283    if let Some(sender) = EVENT_SENDER.get() {
284        // If this fails, there isn't much we can really do about it!
285        let _ = sender.unbounded_send(event);
286    }
287}
288
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use crate::{
        broadcast, event_has_subscribers, EventReactor, StreamExt, TorEvent, TorEventKind,
    };
    use once_cell::sync::OnceCell;
    use std::sync::{Mutex, MutexGuard};
    use std::time::Duration;
    use tokio::runtime::Runtime;

    // HACK(eta): these tests need to run effectively singlethreaded, since they mutate global
    //            state. They *also* need to share the same tokio runtime, which the
    //            #[tokio::test] thing doesn't do (it makes a new runtime per test), because of
    //            the need to have a background singleton EventReactor.
    //
    //            To hack around this, we just have a global runtime protected by a mutex!
    static TEST_MUTEX: OnceCell<Mutex<Runtime>> = OnceCell::new();

    /// Take the global test lock, starting the singleton event reactor on the shared runtime
    /// if no earlier test has done so yet.
    fn test_setup() -> MutexGuard<'static, Runtime> {
        let runtime = TEST_MUTEX
            .get_or_init(|| Mutex::new(Runtime::new().unwrap()))
            .lock()
            .expect("mutex poisoned, probably by other failing tests");
        if let Some(reactor) = EventReactor::new() {
            runtime.handle().spawn(reactor.run());
        }
        runtime
    }

    #[test]
    fn subscriptions() {
        let rt = test_setup();

        rt.block_on(async move {
            let kind = TorEventKind::Empty;

            // nobody should be subscribed before we start
            assert!(!event_has_subscribers(kind));

            // merely obtaining a receiver must not count as a subscription
            let mut rx = EventReactor::receiver().unwrap();
            assert!(!event_has_subscribers(kind));

            // subscribing registers us...
            rx.subscribe(kind);
            assert!(event_has_subscribers(kind));

            // ...and unsubscribing removes us again
            rx.unsubscribe(kind);
            assert!(!event_has_subscribers(kind));

            // repeated subscriptions are idempotent: one unsubscribe undoes them all
            for _ in 0..3 {
                rx.subscribe(kind);
            }
            assert!(event_has_subscribers(kind));

            rx.unsubscribe(kind);
            assert!(!event_has_subscribers(kind));

            rx.subscribe(kind);
            assert!(event_has_subscribers(kind));

            // dropping a subscribed receiver should auto-unsubscribe
            drop(rx);
            assert!(!event_has_subscribers(kind));
        });
    }

    #[test]
    fn empty_recv() {
        let rt = test_setup();

        rt.block_on(async move {
            let mut rx = EventReactor::receiver().unwrap();
            // a receiver with no subscriptions should report the end of the stream
            assert!(rx.next().await.is_none());
        });
    }

    #[test]
    fn receives_events() {
        let rt = test_setup();

        rt.block_on(async move {
            let mut rx = EventReactor::receiver().unwrap();
            rx.subscribe(TorEventKind::Empty);
            // HACK(eta): give the event reactor time to run
            tokio::time::sleep(Duration::from_millis(100)).await;
            broadcast(TorEvent::Empty);

            let received = rx.next().await;
            assert_eq!(received, Some(TorEvent::Empty));
        });
    }

    #[test]
    fn does_not_send_to_no_subscribers() {
        let rt = test_setup();

        rt.block_on(async move {
            // with no subscribers around, this event should just get dropped on the floor
            broadcast(TorEvent::Empty);

            let mut rx = EventReactor::receiver().unwrap();
            rx.subscribe(TorEventKind::Empty);

            // subscribing afterwards must not surface the earlier event, so we time out
            let outcome = tokio::time::timeout(Duration::from_millis(100), rx.next()).await;
            assert!(outcome.is_err());
        });
    }
}
409        });
410    }
411}