sansio_bootstrap/
lib.rs

1//! The helpful bootstrap APIs which enable an easy implementation of typical client side and server side pipeline initialization.
2
3use bytes::BytesMut;
4use log::{trace, warn};
5use std::{
6    cell::RefCell,
7    io::Error,
8    net::SocketAddr,
9    rc::Rc,
10    time::{Duration, Instant},
11};
12use tokio::{
13    io::{AsyncReadExt, AsyncWriteExt},
14    net::ToSocketAddrs,
15    sync::broadcast,
16};
17use wg::AsyncWaitGroup;
18
19use sansio::{InboundPipeline, OutboundPipeline, Pipeline};
20use sansio_executor::spawn_local;
21use sansio_transport::{TaggedBytesMut, TransportContext};
22
23mod bootstrap_tcp;
24mod bootstrap_udp;
25
26pub use bootstrap_tcp::{
27    bootstrap_tcp_client::BootstrapTcpClient, bootstrap_tcp_server::BootstrapTcpServer,
28};
29pub use bootstrap_udp::{
30    bootstrap_udp_client::BootstrapUdpClient, bootstrap_udp_server::BootstrapUdpServer,
31};
32
33/// Creates a new [Pipeline]
34pub type PipelineFactoryFn<R, W> = Box<dyn Fn() -> Rc<Pipeline<R, W>>>;
35
36const MIN_DURATION_IN_MS: u64 = 100; // 100 ms
37
38struct Bootstrap<W> {
39    max_payload_size: usize,
40    pipeline_factory_fn: Option<Rc<PipelineFactoryFn<TaggedBytesMut, W>>>,
41    close_tx: Rc<RefCell<Option<broadcast::Sender<()>>>>,
42    wg: Rc<RefCell<Option<AsyncWaitGroup>>>,
43}
44
45impl<W: 'static> Default for Bootstrap<W> {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl<W: 'static> Bootstrap<W> {
52    fn new() -> Self {
53        Self {
54            max_payload_size: 2048, // Typical internet MTU = 1500, rounded up to a power of 2
55            pipeline_factory_fn: None,
56            close_tx: Rc::new(RefCell::new(None)),
57            wg: Rc::new(RefCell::new(None)),
58        }
59    }
60
61    fn max_payload_size(&mut self, max_payload_size: usize) -> &mut Self {
62        self.max_payload_size = max_payload_size;
63        self
64    }
65
66    fn pipeline(&mut self, pipeline_factory_fn: PipelineFactoryFn<TaggedBytesMut, W>) -> &mut Self {
67        self.pipeline_factory_fn = Some(Rc::new(Box::new(pipeline_factory_fn)));
68        self
69    }
70
71    async fn stop(&self) {
72        let mut close_tx = self.close_tx.borrow_mut();
73        if let Some(close_tx) = close_tx.take() {
74            let _ = close_tx.send(());
75        }
76    }
77
78    async fn wait_for_stop(&self) {
79        let wg = {
80            let mut wg = self.wg.borrow_mut();
81            wg.take()
82        };
83        if let Some(wg) = wg {
84            wg.wait().await;
85        }
86    }
87
88    async fn graceful_stop(&self) {
89        self.stop().await;
90        self.wait_for_stop().await;
91    }
92}