sansio_bootstrap/
lib.rs

1//! The helpful bootstrap APIs which enable an easy implementation of typical client side and server side pipeline initialization.
2
3use bytes::BytesMut;
4use log::{trace, warn};
5use std::{
6    cell::RefCell,
7    io::Error,
8    net::SocketAddr,
9    rc::Rc,
10    sync::Arc,
11    time::{Duration, Instant},
12};
13use tokio::{
14    io::{AsyncReadExt, AsyncWriteExt},
15    net::ToSocketAddrs,
16    sync::{Notify, broadcast},
17};
18use wg::AsyncWaitGroup;
19
20use sansio::{InboundPipeline, OutboundPipeline, Pipeline};
21use sansio_executor::spawn_local;
22use sansio_transport::{TaggedBytesMut, TransportContext};
23
24mod bootstrap_tcp;
25mod bootstrap_udp;
26
27pub use bootstrap_tcp::{
28    bootstrap_tcp_client::BootstrapTcpClient, bootstrap_tcp_server::BootstrapTcpServer,
29};
30pub use bootstrap_udp::{
31    bootstrap_udp_client::BootstrapUdpClient, bootstrap_udp_server::BootstrapUdpServer,
32};
33
34/// A pipeline bundled with its write notification mechanism
35pub(crate) struct PipelineWithNotify<R, W> {
36    pub(crate) pipeline: Rc<Pipeline<R, W>>,
37    pub(crate) write_notify: Arc<Notify>,
38}
39
40impl<R, W> PipelineWithNotify<R, W> {
41    /// Create a new pipeline with automatic write notification setup
42    pub(crate) fn new(pipeline: Rc<Pipeline<R, W>>) -> Self
43    where
44        R: 'static,
45        W: 'static,
46    {
47        let write_notify = Arc::new(Notify::new());
48        let notify_clone = write_notify.clone();
49
50        pipeline.set_write_notify(Arc::new(move || {
51            notify_clone.notify_one();
52        }));
53
54        Self {
55            pipeline,
56            write_notify,
57        }
58    }
59
60    /// Get a reference to the pipeline for outbound operations
61    pub(crate) fn pipeline(&self) -> &Rc<Pipeline<R, W>> {
62        &self.pipeline
63    }
64}
65
66/// Creates a new [Pipeline]
67pub type PipelineFactoryFn<R, W> =
68    Box<dyn Fn(SocketAddr, Option<SocketAddr>) -> Rc<Pipeline<R, W>>>;
69
70const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(86400); // 1 day duration
71
72struct Bootstrap<W> {
73    max_payload_size: usize,
74    pipeline_factory_fn: Option<Rc<PipelineFactoryFn<TaggedBytesMut, W>>>,
75    close_tx: Rc<RefCell<Option<broadcast::Sender<()>>>>,
76    wg: Rc<RefCell<Option<AsyncWaitGroup>>>,
77}
78
79impl<W: 'static> Default for Bootstrap<W> {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl<W: 'static> Bootstrap<W> {
86    fn new() -> Self {
87        Self {
88            max_payload_size: 2048, // Typical internet MTU = 1500, rounded up to a power of 2
89            pipeline_factory_fn: None,
90            close_tx: Rc::new(RefCell::new(None)),
91            wg: Rc::new(RefCell::new(None)),
92        }
93    }
94
95    fn max_payload_size(&mut self, max_payload_size: usize) -> &mut Self {
96        self.max_payload_size = max_payload_size;
97        self
98    }
99
100    fn pipeline(&mut self, pipeline_factory_fn: PipelineFactoryFn<TaggedBytesMut, W>) -> &mut Self {
101        self.pipeline_factory_fn = Some(Rc::new(Box::new(pipeline_factory_fn)));
102        self
103    }
104
105    async fn stop(&self) {
106        let mut close_tx = self.close_tx.borrow_mut();
107        if let Some(close_tx) = close_tx.take() {
108            let _ = close_tx.send(());
109        }
110    }
111
112    async fn wait_for_stop(&self) {
113        let wg = {
114            let mut wg = self.wg.borrow_mut();
115            wg.take()
116        };
117        if let Some(wg) = wg {
118            wg.wait().await;
119        }
120    }
121
122    async fn graceful_stop(&self) {
123        self.stop().await;
124        self.wait_for_stop().await;
125    }
126}