Skip to main content

emissary_core/
events.rs

1// Permission is hereby granted, free of charge, to any person obtaining a
2// copy of this software and associated documentation files (the "Software"),
3// to deal in the Software without restriction, including without limitation
4// the rights to use, copy, modify, merge, publish, distribute, sublicense,
5// and/or sell copies of the Software, and to permit persons to whom the
6// Software is furnished to do so, subject to the following conditions:
7//
8// The above copyright notice and this permission notice shall be included in
9// all copies or substantial portions of the Software.
10//
11// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
12// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
14// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
15// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
16// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
17// DEALINGS IN THE SOFTWARE.
18
19use crate::runtime::Runtime;
20
21#[cfg(feature = "events")]
22use futures::FutureExt;
23#[cfg(feature = "events")]
24use thingbuf::mpsc::{channel, Receiver, Sender};
25
26use alloc::{string::String, vec::Vec};
27use core::{
28    future::Future,
29    pin::Pin,
30    task::{Context, Poll},
31    time::Duration,
32};
33
34#[cfg(feature = "events")]
35use alloc::sync::Arc;
36#[cfg(feature = "events")]
37use core::{
38    mem,
39    sync::atomic::{AtomicUsize, Ordering},
40};
41
/// Default interval between status updates.
///
/// Used by `EventManager::new()` when the caller does not supply an interval.
#[cfg(feature = "events")]
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
45
/// Internal events sent by subsystems through [`EventHandle`] to [`EventManager`].
#[cfg(feature = "events")]
#[derive(Clone, Debug)]
enum SubsystemEvent {
    /// A client destination was started.
    ClientDestinationStarted {
        /// Name of the started destination.
        name: String,
    },

    /// A server destination was started.
    ServerDestinationStarted {
        /// Name of the started destination.
        name: String,

        /// Address of the started destination.
        address: String,
    },
}
65
// Default value: a client-destination event carrying an empty name.
//
// NOTE(review): presumably needed to satisfy the element bound of the `thingbuf`
// channel that transports `SubsystemEvent` — confirm against the `thingbuf` docs.
#[cfg(feature = "events")]
impl Default for SubsystemEvent {
    fn default() -> Self {
        let name = String::new();

        Self::ClientDestinationStarted { name }
    }
}
74
/// Event handle.
///
/// Given to subsystems so they can report counters and destination events to
/// [`EventManager`]. All counters are shared between every copy of the handle.
#[cfg(feature = "events")]
pub(crate) struct EventHandle<R: Runtime> {
    /// TX channel for sending [`SubsystemEvent`]s to [`EventManager`].
    event_tx: Sender<SubsystemEvent>,

    /// Cumulative inbound bandwidth used by all transports.
    inbound_bandwidth: Arc<AtomicUsize>,

    /// Cumulative outbound bandwidth used by all transports.
    outbound_bandwidth: Arc<AtomicUsize>,

    /// Number of connected routers.
    num_connected_routers: Arc<AtomicUsize>,

    /// Number of transit tunnels.
    num_transit_tunnels: Arc<AtomicUsize>,

    /// Number of tunnel build failures, either timeouts or rejections.
    num_tunnel_build_failures: Arc<AtomicUsize>,

    /// Number of successfully built tunnels.
    num_tunnels_built: Arc<AtomicUsize>,

    /// Cumulative inbound bandwidth used by all transit tunnels.
    transit_inbound_bandwidth: Arc<AtomicUsize>,

    /// Cumulative outbound bandwidth used by all transit tunnels.
    transit_outbound_bandwidth: Arc<AtomicUsize>,

    /// Interval at which the handle's timer fires.
    update_interval: Duration,

    /// Event timer.
    ///
    /// `None` for handles created directly by `EventManager::new()`, `Some` for clones.
    timer: Option<R::Timer>,
}
111
#[cfg(feature = "events")]
impl<R: Runtime> Clone for EventHandle<R> {
    /// Clone the handle.
    ///
    /// The clone shares the event channel and every counter with `self` but always
    /// receives a freshly created timer, even if `self` has none.
    fn clone(&self) -> Self {
        let update_interval = self.update_interval;

        Self {
            event_tx: self.event_tx.clone(),
            inbound_bandwidth: Arc::clone(&self.inbound_bandwidth),
            outbound_bandwidth: Arc::clone(&self.outbound_bandwidth),
            num_connected_routers: Arc::clone(&self.num_connected_routers),
            num_transit_tunnels: Arc::clone(&self.num_transit_tunnels),
            num_tunnel_build_failures: Arc::clone(&self.num_tunnel_build_failures),
            num_tunnels_built: Arc::clone(&self.num_tunnels_built),
            transit_inbound_bandwidth: Arc::clone(&self.transit_inbound_bandwidth),
            transit_outbound_bandwidth: Arc::clone(&self.transit_outbound_bandwidth),
            update_interval,
            timer: Some(R::timer(update_interval)),
        }
    }
}
130
131/// Event handle.
132#[cfg(not(feature = "events"))]
133#[derive(Clone)]
134pub(crate) struct EventHandle<R: Runtime> {
135    /// Marker for `Runtime`.
136    _marker: core::marker::PhantomData<R>,
137}
138
139impl<R: Runtime> EventHandle<R> {
140    /// Update transit tunnel count.
141    ///
142    /// [`AtomicUsize::store()`] is used because the count is updated only by
143    /// `TransitTunnelManager`.
144    #[inline(always)]
145    pub(crate) fn num_transit_tunnels(&self, _num_tunnels: usize) {
146        #[cfg(feature = "events")]
147        self.num_transit_tunnels.store(_num_tunnels, Ordering::Release);
148    }
149
150    /// Update inbound transit tunnel bandwidth.
151    ///
152    /// [`AtomicUsize::fetch_add()`] is used because each transit tunnel keeps track
153    /// of its own bandwidth.
154    pub(crate) fn transit_inbound_bandwidth(&self, _bandwidth: usize) {
155        #[cfg(feature = "events")]
156        self.transit_inbound_bandwidth.fetch_add(_bandwidth, Ordering::Release);
157    }
158
159    /// Update outbound transit tunnel bandwidth.
160    ///
161    /// [`AtomicUsize::fetch_add()`] is used because each transit tunnel keeps track
162    /// of its own bandwidth.
163    #[inline(always)]
164    pub(crate) fn transit_outbound_bandwidth(&self, _bandwidth: usize) {
165        #[cfg(feature = "events")]
166        self.transit_outbound_bandwidth.fetch_add(_bandwidth, Ordering::Release);
167    }
168
169    /// Update inbound transport bandwidth.
170    ///
171    /// [`AtomicUsize::fetch_add()`] is used because each connection keeps track of its own
172    /// bandwidth.
173    #[inline(always)]
174    pub(crate) fn transport_inbound_bandwidth(&self, _bandwidth: usize) {
175        #[cfg(feature = "events")]
176        self.inbound_bandwidth.fetch_add(_bandwidth, Ordering::Release);
177    }
178
179    /// Update outbound transport bandwidth.
180    ///
181    /// [`AtomicUsize::fetch_add()`] is used because each connection keeps track of its own
182    /// bandwidth.
183    #[inline(always)]
184    pub(crate) fn transport_outbound_bandwidth(&self, _bandwidth: usize) {
185        #[cfg(feature = "events")]
186        self.outbound_bandwidth.fetch_add(_bandwidth, Ordering::Release);
187    }
188
189    /// Update connected router count.
190    ///
191    /// [`AtomicUsize::store()`] is used because the count is updated only by
192    /// `TransportManager`.
193    #[inline(always)]
194    pub(crate) fn num_connected_routers(&self, _num_connected_routers: usize) {
195        #[cfg(feature = "events")]
196        self.num_connected_routers.store(_num_connected_routers, Ordering::Release);
197    }
198
199    /// Update tunnel build success/failure status.
200    ///
201    /// [`AtomicUsize::fetch_add()`] is used because each tunnel pool keeps track of its own
202    /// tunnel build success/failure rate.
203    #[inline(always)]
204    pub(crate) fn tunnel_status(
205        &self,
206        _num_tunnels_built: usize,
207        _num_tunnel_build_failures: usize,
208    ) {
209        #[cfg(feature = "events")]
210        self.num_tunnels_built.fetch_add(_num_tunnels_built, Ordering::Release);
211        #[cfg(feature = "events")]
212        self.num_tunnel_build_failures
213            .fetch_add(_num_tunnel_build_failures, Ordering::Release);
214    }
215
216    /// Notify the [`EventManager`] that a server destination was started.
217    #[inline(always)]
218    pub(crate) fn server_destination_started(&self, _name: String, _address: String) {
219        #[cfg(feature = "events")]
220        let _ = self.event_tx.try_send(SubsystemEvent::ServerDestinationStarted {
221            name: _name,
222            address: _address,
223        });
224    }
225
226    /// Notify the [`EventManager`] that a client destination was started.
227    #[inline(always)]
228    pub(crate) fn client_destination_started(&self, _name: String) {
229        #[cfg(feature = "events")]
230        let _ = self.event_tx.try_send(SubsystemEvent::ClientDestinationStarted { name: _name });
231    }
232}
233
234impl<R: Runtime> Future for EventHandle<R> {
235    type Output = ();
236
237    #[cfg(feature = "events")]
238    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
239        match &mut self.timer {
240            None => Poll::Pending,
241            Some(timer) => {
242                futures::ready!(timer.poll_unpin(cx));
243
244                // create new timer and register it into the executor
245                let mut timer = R::timer(self.update_interval);
246                let _ = timer.poll_unpin(cx);
247                self.timer = Some(timer);
248
249                Poll::Ready(())
250            }
251        }
252    }
253
254    #[cfg(not(feature = "events"))]
255    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
256        Poll::Pending
257    }
258}
259
/// Notification that a client destination has been started.
#[derive(Clone, Debug, Default)]
pub struct ClientDestinationStarted {
    /// Name of the started destination.
    pub name: String,
}
266
/// Notification that a server destination has been started.
#[derive(Clone, Debug, Default)]
pub struct ServerDestinationStarted {
    /// Name of the started destination.
    pub name: String,

    /// Address of the started destination.
    pub address: String,
}
276
/// Status of the transit tunnel subsystem.
#[derive(Clone, Debug, Default)]
pub struct TransitTunnelStatus {
    /// Number of transit tunnels.
    pub num_tunnels: usize,

    /// Cumulative inbound bandwidth used by all transit tunnels.
    pub inbound_bandwidth: usize,

    /// Cumulative outbound bandwidth used by all transit tunnels.
    pub outbound_bandwidth: usize,
}
289
/// Status of the transport subsystem.
#[derive(Clone, Debug, Default)]
pub struct TransportStatus {
    /// Number of connected routers.
    pub num_connected_routers: usize,

    /// Cumulative inbound bandwidth consumed by all transports.
    pub inbound_bandwidth: usize,

    /// Cumulative outbound bandwidth consumed by all transports.
    pub outbound_bandwidth: usize,
}
302
/// Status of the tunnel subsystem.
#[derive(Clone, Debug, Default)]
pub struct TunnelStatus {
    /// Number of tunnels built.
    pub num_tunnels_built: usize,

    /// Number of tunnel build failures.
    pub num_tunnel_build_failures: usize,
}
312
313/// Events emitted by [`EventManager`].
314#[derive(Debug, Clone, Default)]
315pub enum Event {
316    RouterStatus {
317        /// Client destination status updates.
318        client_destinations: Vec<String>,
319
320        /// Server destination status updates.
321        server_destinations: Vec<(String, String)>,
322
323        /// Transit tunnel subsystem status.
324        transit: TransitTunnelStatus,
325
326        /// Transport subsystem status.
327        transport: TransportStatus,
328
329        /// Tunnel subsystem status.
330        tunnel: TunnelStatus,
331    },
332
333    /// Router is shutting down.
334    ShuttingDown,
335
336    /// Router has shut down.
337    #[default]
338    ShutDown,
339}
340
/// [`EventManager`] state.
#[cfg(feature = "events")]
enum State {
    /// [`EventManager`] and the router are active.
    Active,

    /// [`EventManager`] and the router are shutting down.
    ShuttingDown,

    /// [`EventManager`] and the router have shut down.
    ShutDown,
}
353
/// Event manager.
///
/// Collects events from subsystems and periodically publishes router status
/// updates to [`EventSubscriber`].
#[cfg(feature = "events")]
pub(crate) struct EventManager<R: Runtime> {
    /// RX channel for receiving events from other subsystems.
    event_rx: Receiver<SubsystemEvent>,

    /// Event handle, used for reading the shared counters.
    handle: EventHandle<R>,

    /// Pending client destination updates.
    pending_client_updates: Vec<String>,

    /// Pending server destination updates, as `(name, address)` pairs.
    pending_server_updates: Vec<(String, String)>,

    /// Event manager and router state.
    state: State,

    /// TX channel for sending router status updates to [`EventSubscriber`].
    status_tx: Sender<Event>,

    /// Update timer.
    timer: R::Timer,
}
378
379/// Event manager.
380#[cfg(not(feature = "events"))]
381pub(crate) struct EventManager<R: Runtime> {
382    _marker: core::marker::PhantomData<R>,
383}
384
385impl<R: Runtime> EventManager<R> {
386    /// Create new [`EventManager`].
387    #[cfg(feature = "events")]
388    pub(crate) fn new(
389        update_interval: Option<Duration>,
390    ) -> (Self, EventSubscriber, EventHandle<R>) {
391        let (event_tx, event_rx) = channel(64);
392        let (status_tx, status_rx) = channel(64);
393        let update_interval = update_interval.unwrap_or(UPDATE_INTERVAL);
394        let handle = EventHandle {
395            event_tx,
396            inbound_bandwidth: Default::default(),
397            outbound_bandwidth: Default::default(),
398            num_connected_routers: Default::default(),
399            num_transit_tunnels: Default::default(),
400            num_tunnel_build_failures: Default::default(),
401            num_tunnels_built: Default::default(),
402            transit_inbound_bandwidth: Default::default(),
403            transit_outbound_bandwidth: Default::default(),
404            update_interval,
405            timer: None,
406        };
407
408        (
409            Self {
410                event_rx,
411                state: State::Active,
412                handle: EventHandle {
413                    event_tx: handle.event_tx.clone(),
414                    inbound_bandwidth: Arc::clone(&handle.inbound_bandwidth),
415                    outbound_bandwidth: Arc::clone(&handle.outbound_bandwidth),
416                    num_connected_routers: Arc::clone(&handle.num_connected_routers),
417                    num_transit_tunnels: Arc::clone(&handle.num_transit_tunnels),
418                    num_tunnel_build_failures: Arc::clone(&handle.num_tunnel_build_failures),
419                    num_tunnels_built: Arc::clone(&handle.num_tunnels_built),
420                    transit_inbound_bandwidth: Arc::clone(&handle.transit_inbound_bandwidth),
421                    transit_outbound_bandwidth: Arc::clone(&handle.transit_outbound_bandwidth),
422                    update_interval,
423                    timer: None,
424                },
425                pending_client_updates: Vec::new(),
426                pending_server_updates: Vec::new(),
427                status_tx,
428                timer: R::timer(update_interval),
429            },
430            EventSubscriber { status_rx },
431            handle,
432        )
433    }
434
435    /// Create new [`EventManager`].
436    #[cfg(not(feature = "events"))]
437    pub(crate) fn new(
438        _update_interval: Option<Duration>,
439    ) -> (Self, EventSubscriber, EventHandle<R>) {
440        (
441            Self {
442                _marker: Default::default(),
443            },
444            EventSubscriber {},
445            EventHandle {
446                _marker: Default::default(),
447            },
448        )
449    }
450
451    /// Send shutdown signal to [`EventSubscriber`].
452    pub(crate) fn shutdown(&mut self) {
453        #[cfg(feature = "events")]
454        match self.state {
455            State::Active => {
456                let _ = self.status_tx.try_send(Event::ShuttingDown);
457
458                self.state = State::ShuttingDown;
459            }
460            State::ShuttingDown => {
461                let _ = self.status_tx.try_send(Event::ShutDown);
462
463                self.state = State::ShutDown;
464            }
465            State::ShutDown => {}
466        }
467    }
468}
469
470impl<R: Runtime> Future for EventManager<R> {
471    type Output = ();
472
473    #[cfg(feature = "events")]
474    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
475        loop {
476            match self.event_rx.poll_recv(cx) {
477                Poll::Pending => break,
478                Poll::Ready(None) => return Poll::Ready(()),
479                Poll::Ready(Some(SubsystemEvent::ClientDestinationStarted { name })) => {
480                    self.pending_client_updates.push(name);
481                }
482                Poll::Ready(Some(SubsystemEvent::ServerDestinationStarted { name, address })) => {
483                    self.pending_server_updates.push((name, address));
484                }
485            }
486        }
487
488        if self.timer.poll_unpin(cx).is_ready() {
489            let server_destinations = mem::take(&mut self.pending_server_updates);
490            let client_destinations = mem::take(&mut self.pending_client_updates);
491
492            let _ = self.status_tx.try_send(Event::RouterStatus {
493                transit: TransitTunnelStatus {
494                    num_tunnels: self.handle.num_transit_tunnels.load(Ordering::Acquire),
495                    inbound_bandwidth: self
496                        .handle
497                        .transit_inbound_bandwidth
498                        .load(Ordering::Acquire),
499                    outbound_bandwidth: self
500                        .handle
501                        .transit_outbound_bandwidth
502                        .load(Ordering::Acquire),
503                },
504                transport: TransportStatus {
505                    num_connected_routers: self
506                        .handle
507                        .num_connected_routers
508                        .load(Ordering::Acquire),
509                    outbound_bandwidth: self.handle.outbound_bandwidth.load(Ordering::Acquire),
510                    inbound_bandwidth: self.handle.inbound_bandwidth.load(Ordering::Acquire),
511                },
512                tunnel: TunnelStatus {
513                    num_tunnels_built: self.handle.num_tunnels_built.load(Ordering::Acquire),
514                    num_tunnel_build_failures: self
515                        .handle
516                        .num_tunnel_build_failures
517                        .load(Ordering::Acquire),
518                },
519                server_destinations,
520                client_destinations,
521            });
522
523            self.timer = R::timer(self.handle.update_interval);
524            let _ = self.timer.poll_unpin(cx);
525        }
526
527        Poll::Pending
528    }
529
530    #[cfg(not(feature = "events"))]
531    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
532        Poll::Pending
533    }
534}
535
/// Event subscriber.
///
/// Receiving end for the [`Event`]s published by `EventManager`. Holds no state
/// when the `events` feature is disabled.
pub struct EventSubscriber {
    /// RX channel for receiving events.
    #[cfg(feature = "events")]
    status_rx: Receiver<Event>,
}
542
543impl EventSubscriber {
544    /// Attempt to get next [`Event`].
545    #[cfg(feature = "events")]
546    pub fn router_status(&mut self) -> Option<Event> {
547        self.status_rx.try_recv().ok()
548    }
549
550    /// Attempt to get next [`Event`].
551    #[cfg(not(feature = "events"))]
552    pub fn router_status(&mut self) -> Option<Event> {
553        None
554    }
555}
556
// The `EventHandle` future only resolves when the `events` feature is enabled
// (otherwise its `poll()` always returns `Poll::Pending`), so these tests are
// compiled only for builds with that feature.
#[cfg(all(test, feature = "events"))]
mod tests {
    use super::*;
    use crate::runtime::mock::MockRuntime;

    #[tokio::test]
    async fn event_handle_timer_works() {
        let (_manager, _subscriber, handle) =
            EventManager::<MockRuntime>::new(Some(Duration::from_secs(1)));

        // the handle returned by `new()` has no timer, cloning it creates one
        let mut new_handle = handle.clone();

        // ensure that the timer keeps firing
        for _ in 0..3 {
            let fired = tokio::time::timeout(Duration::from_secs(5), &mut new_handle).await;

            assert!(fired.is_ok(), "event timer did not fire within 5 seconds");
        }
    }
}