use async_net::AsyncToSocketAddrs;
use bytes::BytesMut;
use futures_lite::{AsyncReadExt, AsyncWriteExt};
use log::{trace, warn};
use smol::Timer;
use std::{
cell::RefCell,
io::Error,
net::SocketAddr,
rc::Rc,
time::{Duration, Instant},
};
use waitgroup::{WaitGroup, Worker};
use crate::channel::{InboundPipeline, OutboundPipeline, Pipeline};
use crate::executor::spawn_local;
use crate::transport::{TaggedBytesMut, TransportContext};
mod bootstrap_tcp;
mod bootstrap_udp;
pub use bootstrap_tcp::{
bootstrap_tcp_client::BootstrapTcpClient, bootstrap_tcp_server::BootstrapTcpServer,
};
pub use bootstrap_udp::{
bootstrap_udp_client::BootstrapUdpClient, bootstrap_udp_server::BootstrapUdpServer,
};
pub type PipelineFactoryFn<R, W> = Box<dyn (Fn() -> Rc<Pipeline<R, W>>)>;
const MAX_DURATION_IN_SECS: u64 = 86400;
struct Bootstrap<W> {
max_payload_size: usize,
pipeline_factory_fn: Option<Rc<PipelineFactoryFn<TaggedBytesMut, W>>>,
close_tx: Rc<RefCell<Option<async_broadcast::Sender<()>>>>,
wg: Rc<RefCell<Option<WaitGroup>>>,
}
impl<W: 'static> Default for Bootstrap<W> {
fn default() -> Self {
Self::new()
}
}
impl<W: 'static> Bootstrap<W> {
fn new() -> Self {
Self {
max_payload_size: 2048, pipeline_factory_fn: None,
close_tx: Rc::new(RefCell::new(None)),
wg: Rc::new(RefCell::new(None)),
}
}
fn max_payload_size(&mut self, max_payload_size: usize) -> &mut Self {
self.max_payload_size = max_payload_size;
self
}
fn pipeline(&mut self, pipeline_factory_fn: PipelineFactoryFn<TaggedBytesMut, W>) -> &mut Self {
self.pipeline_factory_fn = Some(Rc::new(Box::new(pipeline_factory_fn)));
self
}
async fn stop(&self) {
let mut close_tx = self.close_tx.borrow_mut();
if let Some(close_tx) = close_tx.take() {
let _ = close_tx.try_broadcast(());
}
}
async fn wait_for_stop(&self) {
let wg = {
let mut wg = self.wg.borrow_mut();
wg.take()
};
if let Some(wg) = wg {
wg.wait().await;
}
}
async fn graceful_stop(&self) {
self.stop().await;
self.wait_for_stop().await;
}
}