1use crate::runtime::Runtime;
20
21#[cfg(feature = "events")]
22use futures::FutureExt;
23#[cfg(feature = "events")]
24use thingbuf::mpsc::{channel, Receiver, Sender};
25
26use alloc::{string::String, vec::Vec};
27use core::{
28 future::Future,
29 pin::Pin,
30 task::{Context, Poll},
31 time::Duration,
32};
33
34#[cfg(feature = "events")]
35use alloc::sync::Arc;
36#[cfg(feature = "events")]
37use core::{
38 mem,
39 sync::atomic::{AtomicUsize, Ordering},
40};
41
/// Interval between status updates, used when the caller of `EventManager::new()` passes `None`.
#[cfg(feature = "events")]
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
45
/// Event sent from an `EventHandle` to the `EventManager` over the internal channel.
#[cfg(feature = "events")]
#[derive(Clone, Debug)]
enum SubsystemEvent {
    /// A client destination has been started.
    ClientDestinationStarted {
        /// Name of the destination.
        name: String,
    },

    /// A server destination has been started.
    ServerDestinationStarted {
        /// Name of the destination.
        name: String,

        /// Address of the destination.
        address: String,
    },
}
65
#[cfg(feature = "events")]
impl Default for SubsystemEvent {
    /// Build a placeholder event: `ClientDestinationStarted` with an empty name.
    fn default() -> Self {
        let name = String::default();

        Self::ClientDestinationStarted { name }
    }
}
74
/// Handle through which subsystems report events and counters to the `EventManager`.
///
/// The counters are shared (`Arc`) between all clones of the handle and the manager.
#[cfg(feature = "events")]
pub(crate) struct EventHandle<R: Runtime> {
    /// TX half of the channel carrying `SubsystemEvent`s to the manager.
    event_tx: Sender<SubsystemEvent>,

    /// Transport inbound bandwidth counter, incremented by `transport_inbound_bandwidth()`.
    inbound_bandwidth: Arc<AtomicUsize>,

    /// Transport outbound bandwidth counter, incremented by `transport_outbound_bandwidth()`.
    outbound_bandwidth: Arc<AtomicUsize>,

    /// Latest value stored by `num_connected_routers()`.
    num_connected_routers: Arc<AtomicUsize>,

    /// Latest value stored by `num_transit_tunnels()`.
    num_transit_tunnels: Arc<AtomicUsize>,

    /// Tunnel build failure counter, incremented by `tunnel_status()`.
    num_tunnel_build_failures: Arc<AtomicUsize>,

    /// Built tunnel counter, incremented by `tunnel_status()`.
    num_tunnels_built: Arc<AtomicUsize>,

    /// Transit inbound bandwidth counter, incremented by `transit_inbound_bandwidth()`.
    transit_inbound_bandwidth: Arc<AtomicUsize>,

    /// Transit outbound bandwidth counter, incremented by `transit_outbound_bandwidth()`.
    transit_outbound_bandwidth: Arc<AtomicUsize>,

    /// Duration used whenever the handle's timer is (re-)created.
    update_interval: Duration,

    /// Update timer.
    ///
    /// `None` for the handles constructed in `EventManager::new()`, `Some` for every clone.
    timer: Option<R::Timer>,
}
111
#[cfg(feature = "events")]
impl<R: Runtime> Clone for EventHandle<R> {
    /// Clone the handle.
    ///
    /// The clone shares the channel and all counters with `self` but always gets a freshly
    /// created timer, regardless of whether `self` has one.
    fn clone(&self) -> Self {
        // exhaustive destructuring: a newly added field fails to compile until it's handled here
        let Self {
            event_tx,
            inbound_bandwidth,
            outbound_bandwidth,
            num_connected_routers,
            num_transit_tunnels,
            num_tunnel_build_failures,
            num_tunnels_built,
            transit_inbound_bandwidth,
            transit_outbound_bandwidth,
            update_interval,
            timer: _,
        } = self;

        Self {
            event_tx: event_tx.clone(),
            inbound_bandwidth: Arc::clone(inbound_bandwidth),
            outbound_bandwidth: Arc::clone(outbound_bandwidth),
            num_connected_routers: Arc::clone(num_connected_routers),
            num_transit_tunnels: Arc::clone(num_transit_tunnels),
            num_tunnel_build_failures: Arc::clone(num_tunnel_build_failures),
            num_tunnels_built: Arc::clone(num_tunnels_built),
            transit_inbound_bandwidth: Arc::clone(transit_inbound_bandwidth),
            transit_outbound_bandwidth: Arc::clone(transit_outbound_bandwidth),
            update_interval: *update_interval,
            timer: Some(R::timer(*update_interval)),
        }
    }
}
130
131#[cfg(not(feature = "events"))]
133#[derive(Clone)]
134pub(crate) struct EventHandle<R: Runtime> {
135 _marker: core::marker::PhantomData<R>,
137}
138
139impl<R: Runtime> EventHandle<R> {
140 #[inline(always)]
145 pub(crate) fn num_transit_tunnels(&self, _num_tunnels: usize) {
146 #[cfg(feature = "events")]
147 self.num_transit_tunnels.store(_num_tunnels, Ordering::Release);
148 }
149
150 pub(crate) fn transit_inbound_bandwidth(&self, _bandwidth: usize) {
155 #[cfg(feature = "events")]
156 self.transit_inbound_bandwidth.fetch_add(_bandwidth, Ordering::Release);
157 }
158
159 #[inline(always)]
164 pub(crate) fn transit_outbound_bandwidth(&self, _bandwidth: usize) {
165 #[cfg(feature = "events")]
166 self.transit_outbound_bandwidth.fetch_add(_bandwidth, Ordering::Release);
167 }
168
169 #[inline(always)]
174 pub(crate) fn transport_inbound_bandwidth(&self, _bandwidth: usize) {
175 #[cfg(feature = "events")]
176 self.inbound_bandwidth.fetch_add(_bandwidth, Ordering::Release);
177 }
178
179 #[inline(always)]
184 pub(crate) fn transport_outbound_bandwidth(&self, _bandwidth: usize) {
185 #[cfg(feature = "events")]
186 self.outbound_bandwidth.fetch_add(_bandwidth, Ordering::Release);
187 }
188
189 #[inline(always)]
194 pub(crate) fn num_connected_routers(&self, _num_connected_routers: usize) {
195 #[cfg(feature = "events")]
196 self.num_connected_routers.store(_num_connected_routers, Ordering::Release);
197 }
198
199 #[inline(always)]
204 pub(crate) fn tunnel_status(
205 &self,
206 _num_tunnels_built: usize,
207 _num_tunnel_build_failures: usize,
208 ) {
209 #[cfg(feature = "events")]
210 self.num_tunnels_built.fetch_add(_num_tunnels_built, Ordering::Release);
211 #[cfg(feature = "events")]
212 self.num_tunnel_build_failures
213 .fetch_add(_num_tunnel_build_failures, Ordering::Release);
214 }
215
216 #[inline(always)]
218 pub(crate) fn server_destination_started(&self, _name: String, _address: String) {
219 #[cfg(feature = "events")]
220 let _ = self.event_tx.try_send(SubsystemEvent::ServerDestinationStarted {
221 name: _name,
222 address: _address,
223 });
224 }
225
226 #[inline(always)]
228 pub(crate) fn client_destination_started(&self, _name: String) {
229 #[cfg(feature = "events")]
230 let _ = self.event_tx.try_send(SubsystemEvent::ClientDestinationStarted { name: _name });
231 }
232}
233
234impl<R: Runtime> Future for EventHandle<R> {
235 type Output = ();
236
237 #[cfg(feature = "events")]
238 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
239 match &mut self.timer {
240 None => Poll::Pending,
241 Some(timer) => {
242 futures::ready!(timer.poll_unpin(cx));
243
244 let mut timer = R::timer(self.update_interval);
246 let _ = timer.poll_unpin(cx);
247 self.timer = Some(timer);
248
249 Poll::Ready(())
250 }
251 }
252 }
253
254 #[cfg(not(feature = "events"))]
255 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
256 Poll::Pending
257 }
258}
259
/// Information about a started client destination.
#[derive(Clone, Debug, Default)]
pub struct ClientDestinationStarted {
    /// Name of the destination.
    pub name: String,
}
266
/// Information about a started server destination.
#[derive(Clone, Debug, Default)]
pub struct ServerDestinationStarted {
    /// Name of the destination.
    pub name: String,

    /// Address of the destination.
    pub address: String,
}
276
/// Transit tunnel section of a router status event.
#[derive(Clone, Debug, Default)]
pub struct TransitTunnelStatus {
    /// Number of transit tunnels.
    pub num_tunnels: usize,

    /// Value of the transit inbound bandwidth counter.
    pub inbound_bandwidth: usize,

    /// Value of the transit outbound bandwidth counter.
    pub outbound_bandwidth: usize,
}
289
/// Transport section of a router status event.
#[derive(Clone, Debug, Default)]
pub struct TransportStatus {
    /// Number of connected routers.
    pub num_connected_routers: usize,

    /// Value of the transport inbound bandwidth counter.
    pub inbound_bandwidth: usize,

    /// Value of the transport outbound bandwidth counter.
    pub outbound_bandwidth: usize,
}
302
/// Tunnel build section of a router status event.
#[derive(Clone, Debug, Default)]
pub struct TunnelStatus {
    /// Value of the built tunnel counter.
    pub num_tunnels_built: usize,

    /// Value of the tunnel build failure counter.
    pub num_tunnel_build_failures: usize,
}
312
313#[derive(Debug, Clone, Default)]
315pub enum Event {
316 RouterStatus {
317 client_destinations: Vec<String>,
319
320 server_destinations: Vec<(String, String)>,
322
323 transit: TransitTunnelStatus,
325
326 transport: TransportStatus,
328
329 tunnel: TunnelStatus,
331 },
332
333 ShuttingDown,
335
336 #[default]
338 ShutDown,
339}
340
/// Shutdown progress of `EventManager`, advanced one step per `EventManager::shutdown()` call.
#[cfg(feature = "events")]
enum State {
    /// Initial state, no shutdown requested.
    Active,

    /// `shutdown()` has been called once.
    ShuttingDown,

    /// `shutdown()` has been called at least twice; further calls have no effect.
    ShutDown,
}
353
/// Event manager.
///
/// Collects events reported through `EventHandle`s and periodically publishes a status
/// snapshot to the `EventSubscriber`.
#[cfg(feature = "events")]
pub(crate) struct EventManager<R: Runtime> {
    /// RX half of the channel carrying `SubsystemEvent`s from the handles.
    event_rx: Receiver<SubsystemEvent>,

    /// Manager's own handle, used to read the shared counters and the update interval.
    handle: EventHandle<R>,

    /// Client destination names received since the last status event.
    pending_client_updates: Vec<String>,

    /// Server destination `(name, address)` pairs received since the last status event.
    pending_server_updates: Vec<(String, String)>,

    /// Shutdown state.
    state: State,

    /// TX half of the channel carrying `Event`s to the subscriber.
    status_tx: Sender<Event>,

    /// Timer deciding when the next status event is published.
    timer: R::Timer,
}
378
379#[cfg(not(feature = "events"))]
381pub(crate) struct EventManager<R: Runtime> {
382 _marker: core::marker::PhantomData<R>,
383}
384
385impl<R: Runtime> EventManager<R> {
386 #[cfg(feature = "events")]
388 pub(crate) fn new(
389 update_interval: Option<Duration>,
390 ) -> (Self, EventSubscriber, EventHandle<R>) {
391 let (event_tx, event_rx) = channel(64);
392 let (status_tx, status_rx) = channel(64);
393 let update_interval = update_interval.unwrap_or(UPDATE_INTERVAL);
394 let handle = EventHandle {
395 event_tx,
396 inbound_bandwidth: Default::default(),
397 outbound_bandwidth: Default::default(),
398 num_connected_routers: Default::default(),
399 num_transit_tunnels: Default::default(),
400 num_tunnel_build_failures: Default::default(),
401 num_tunnels_built: Default::default(),
402 transit_inbound_bandwidth: Default::default(),
403 transit_outbound_bandwidth: Default::default(),
404 update_interval,
405 timer: None,
406 };
407
408 (
409 Self {
410 event_rx,
411 state: State::Active,
412 handle: EventHandle {
413 event_tx: handle.event_tx.clone(),
414 inbound_bandwidth: Arc::clone(&handle.inbound_bandwidth),
415 outbound_bandwidth: Arc::clone(&handle.outbound_bandwidth),
416 num_connected_routers: Arc::clone(&handle.num_connected_routers),
417 num_transit_tunnels: Arc::clone(&handle.num_transit_tunnels),
418 num_tunnel_build_failures: Arc::clone(&handle.num_tunnel_build_failures),
419 num_tunnels_built: Arc::clone(&handle.num_tunnels_built),
420 transit_inbound_bandwidth: Arc::clone(&handle.transit_inbound_bandwidth),
421 transit_outbound_bandwidth: Arc::clone(&handle.transit_outbound_bandwidth),
422 update_interval,
423 timer: None,
424 },
425 pending_client_updates: Vec::new(),
426 pending_server_updates: Vec::new(),
427 status_tx,
428 timer: R::timer(update_interval),
429 },
430 EventSubscriber { status_rx },
431 handle,
432 )
433 }
434
435 #[cfg(not(feature = "events"))]
437 pub(crate) fn new(
438 _update_interval: Option<Duration>,
439 ) -> (Self, EventSubscriber, EventHandle<R>) {
440 (
441 Self {
442 _marker: Default::default(),
443 },
444 EventSubscriber {},
445 EventHandle {
446 _marker: Default::default(),
447 },
448 )
449 }
450
451 pub(crate) fn shutdown(&mut self) {
453 #[cfg(feature = "events")]
454 match self.state {
455 State::Active => {
456 let _ = self.status_tx.try_send(Event::ShuttingDown);
457
458 self.state = State::ShuttingDown;
459 }
460 State::ShuttingDown => {
461 let _ = self.status_tx.try_send(Event::ShutDown);
462
463 self.state = State::ShutDown;
464 }
465 State::ShutDown => {}
466 }
467 }
468}
469
470impl<R: Runtime> Future for EventManager<R> {
471 type Output = ();
472
473 #[cfg(feature = "events")]
474 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
475 loop {
476 match self.event_rx.poll_recv(cx) {
477 Poll::Pending => break,
478 Poll::Ready(None) => return Poll::Ready(()),
479 Poll::Ready(Some(SubsystemEvent::ClientDestinationStarted { name })) => {
480 self.pending_client_updates.push(name);
481 }
482 Poll::Ready(Some(SubsystemEvent::ServerDestinationStarted { name, address })) => {
483 self.pending_server_updates.push((name, address));
484 }
485 }
486 }
487
488 if self.timer.poll_unpin(cx).is_ready() {
489 let server_destinations = mem::take(&mut self.pending_server_updates);
490 let client_destinations = mem::take(&mut self.pending_client_updates);
491
492 let _ = self.status_tx.try_send(Event::RouterStatus {
493 transit: TransitTunnelStatus {
494 num_tunnels: self.handle.num_transit_tunnels.load(Ordering::Acquire),
495 inbound_bandwidth: self
496 .handle
497 .transit_inbound_bandwidth
498 .load(Ordering::Acquire),
499 outbound_bandwidth: self
500 .handle
501 .transit_outbound_bandwidth
502 .load(Ordering::Acquire),
503 },
504 transport: TransportStatus {
505 num_connected_routers: self
506 .handle
507 .num_connected_routers
508 .load(Ordering::Acquire),
509 outbound_bandwidth: self.handle.outbound_bandwidth.load(Ordering::Acquire),
510 inbound_bandwidth: self.handle.inbound_bandwidth.load(Ordering::Acquire),
511 },
512 tunnel: TunnelStatus {
513 num_tunnels_built: self.handle.num_tunnels_built.load(Ordering::Acquire),
514 num_tunnel_build_failures: self
515 .handle
516 .num_tunnel_build_failures
517 .load(Ordering::Acquire),
518 },
519 server_destinations,
520 client_destinations,
521 });
522
523 self.timer = R::timer(self.handle.update_interval);
524 let _ = self.timer.poll_unpin(cx);
525 }
526
527 Poll::Pending
528 }
529
530 #[cfg(not(feature = "events"))]
531 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
532 Poll::Pending
533 }
534}
535
/// Subscriber side of the event subsystem, returned by `EventManager::new()`.
///
/// The struct has no fields when the `events` feature is disabled.
pub struct EventSubscriber {
    /// RX half of the channel carrying `Event`s published by `EventManager`.
    #[cfg(feature = "events")]
    status_rx: Receiver<Event>,
}
542
543impl EventSubscriber {
544 #[cfg(feature = "events")]
546 pub fn router_status(&mut self) -> Option<Event> {
547 self.status_rx.try_recv().ok()
548 }
549
550 #[cfg(not(feature = "events"))]
552 pub fn router_status(&mut self) -> Option<Event> {
553 None
554 }
555}
556
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::mock::MockRuntime;

    /// A cloned handle must resolve several times in a row, each time within the deadline.
    #[tokio::test]
    async fn event_handle_timer_works() {
        let interval = Duration::from_secs(1);
        let (_manager, _subscriber, handle) = EventManager::<MockRuntime>::new(Some(interval));

        // only cloned handles own a timer
        let mut ticker = handle.clone();

        // the timer is expected to be restarted after each expiration
        for _round in 1..=3 {
            let outcome = tokio::time::timeout(Duration::from_secs(5), &mut ticker).await;

            assert!(outcome.is_ok());
        }
    }
}