use std::{
collections::HashMap,
io,
ops::Deref,
sync::{Arc, OnceLock},
};
use parking_lot::{Mutex, RwLock};
use quinn::{ClientConfig, Endpoint, ServerConfig, StreamId};
use tokio::task::JoinHandle;
use crate::conn::{Conn, ConnId, Streams};
/// A handle to a node: a newtype around `Arc<InnerNode>`.
///
/// Cloning a `Node` clones the inner `Arc`, so all clones refer to the same
/// [`InnerNode`] state.
#[derive(Clone)]
pub struct Node(Arc<InnerNode>);
impl Deref for Node {
type Target = Arc<InnerNode>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// The state shared by all clones of a [`Node`]; reachable through `Node`'s
/// `Deref` implementation.
#[doc(hidden)]
pub struct InnerNode {
/// The configuration the node was created with.
pub(crate) config: Config,
/// The `quinn` endpoint. Starts out empty; `Node::get_endpoint` returns an error
/// until it has been set.
// NOTE(review): the error message in `get_endpoint` implies `Node::start` sets
// this, but that method is not visible here - confirm.
pub(crate) endpoint: OnceLock<Endpoint>,
/// The node's connections, keyed by their `ConnId`.
pub(crate) conns: RwLock<HashMap<ConnId, Conn>>,
/// Join handles of the tasks registered via `Node::register_task`.
pub(crate) tasks: Mutex<Vec<JoinHandle<()>>>,
}
impl Node {
    /// Creates a new node from the given configuration.
    ///
    /// The endpoint slot, the connection map and the task list all start out
    /// with their `Default` values.
    pub fn new(config: Config) -> Self {
        Self(Arc::new(InnerNode {
            config,
            endpoint: Default::default(),
            conns: Default::default(),
            tasks: Default::default(),
        }))
    }

    /// Returns the node's endpoint.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] of kind [`io::ErrorKind::Other`] if the endpoint
    /// has not been set yet.
    pub(crate) fn get_endpoint(&self) -> io::Result<&Endpoint> {
        self.endpoint.get().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::Other,
                "no existing socket found; you must call `Node::start` first",
            )
        })
    }

    /// Stores the join handle of a node-level task.
    pub(crate) fn register_task(&self, handle: JoinHandle<()>) {
        self.tasks.lock().push(handle);
    }

    /// Stores the join handle of a task belonging to the connection `conn_id`.
    ///
    /// If no such connection is registered, the handle is simply dropped.
    pub(crate) fn register_conn_task(&self, conn_id: ConnId, handle: JoinHandle<()>) {
        // Look the task list up in its own statement: a lock guard created in an
        // `if let` scrutinee would stay alive for the whole body, keeping the
        // `conns` read lock held while the per-connection task list is locked.
        let conn_tasks = self.conns.read().get(&conn_id).map(|c| c.tasks.clone());

        if let Some(conn_tasks) = conn_tasks {
            conn_tasks.lock().push(handle);
        }
    }

    /// Returns the stream collection of the connection `conn_id`, or `None` if no
    /// such connection is registered.
    pub(crate) fn get_streams(&self, conn_id: ConnId) -> Option<Arc<Streams>> {
        self.conns.read().get(&conn_id).map(|c| c.streams.clone())
    }

    /// Records an inbound message of `size` in the relevant stats.
    ///
    /// With `Some(stream_id)` the stats of that stream are updated; with `None`
    /// the connection's datagram stats are updated. Nothing happens if the
    /// connection (or the stream) can't be found.
    pub(crate) fn register_msg_rx(
        &self,
        conn_id: ConnId,
        stream_id: Option<StreamId>,
        size: usize,
    ) {
        match stream_id {
            Some(stream_id) => {
                let stream_stats = self
                    .get_streams(conn_id)
                    .and_then(|streams| streams.read().get(&stream_id).map(|s| s.stats.clone()));

                if let Some(stats) = stream_stats {
                    stats.register_msg_rx(size);
                }
            }
            None => {
                // Release the `conns` read lock before touching the stats; see
                // `register_conn_task` for the reasoning.
                let datagram_stats = self
                    .conns
                    .read()
                    .get(&conn_id)
                    .map(|c| c.datagram_stats.clone());

                if let Some(stats) = datagram_stats {
                    stats.register_msg_rx(size);
                }
            }
        }
    }

    /// Records an outbound message of `size` in the relevant stats.
    ///
    /// With `Some(stream_id)` the stats of that stream are updated; with `None`
    /// the connection's datagram stats are updated. Nothing happens if the
    /// connection (or the stream) can't be found.
    pub(crate) fn register_msg_tx(
        &self,
        conn_id: ConnId,
        stream_id: Option<StreamId>,
        size: usize,
    ) {
        match stream_id {
            Some(stream_id) => {
                let stream_stats = self
                    .get_streams(conn_id)
                    .and_then(|streams| streams.read().get(&stream_id).map(|s| s.stats.clone()));

                if let Some(stats) = stream_stats {
                    stats.register_msg_tx(size);
                }
            }
            None => {
                // Release the `conns` read lock before touching the stats; see
                // `register_conn_task` for the reasoning.
                let datagram_stats = self
                    .conns
                    .read()
                    .get(&conn_id)
                    .map(|c| c.datagram_stats.clone());

                if let Some(stats) = datagram_stats {
                    stats.register_msg_tx(size);
                }
            }
        }
    }
}
/// The configuration of a node: an optional `quinn` client config and an optional
/// `quinn` server config.
///
/// `Config::new` refuses (by panicking) to build a value where both are `None`.
#[derive(Debug, Clone)]
pub struct Config {
/// The client-side configuration, if any.
pub(crate) client: Option<ClientConfig>,
/// The server-side configuration, if any.
pub(crate) server: Option<ServerConfig>,
}
impl Config {
    /// Builds a configuration from an optional client config and an optional
    /// server config.
    ///
    /// # Panics
    ///
    /// Panics if both `client` and `server` are `None`.
    pub fn new(client: Option<ClientConfig>, server: Option<ServerConfig>) -> Self {
        let has_any_config = client.is_some() || server.is_some();
        if !has_any_config {
            panic!("the node can't function without any config");
        }

        Self { client, server }
    }
}