pub(crate) mod client;
pub(crate) mod server;
use crate::error::{StreamError, TaskError};
use crate::util::watch;
use tokio::sync::{oneshot, mpsc};
use tokio::task::JoinHandle;
pub(crate) enum SendBack<P> {
None,
Packet(P),
Close,
CloseWithPacket
}
pub(crate) struct TaskHandle {
pub close: oneshot::Sender<()>,
pub task: JoinHandle<Result<(), TaskError>>
}
impl TaskHandle {
pub async fn wait(self) -> Result<(), TaskError> {
self.task.await
.map_err(TaskError::Join)?
}
pub async fn close(self) -> Result<(), TaskError> {
let _ = self.close.send(());
self.task.await
.map_err(TaskError::Join)?
}
#[cfg(test)]
pub fn abort(self) {
self.task.abort();
}
}
#[derive(Debug, Clone)]
pub struct StreamSender<P> {
pub(crate) inner: mpsc::Sender<P>
}
impl<P> StreamSender<P> {
pub(crate) fn new(inner: mpsc::Sender<P>) -> Self {
Self { inner }
}
pub async fn send(&self, packet: P) -> Result<(), StreamError> {
self.inner.send(packet).await
.map_err(|_| StreamError::StreamAlreadyClosed)
}
}
#[derive(Debug)]
pub struct StreamReceiver<P> {
pub(crate) inner: mpsc::Receiver<P>
}
impl<P> StreamReceiver<P> {
pub(crate) fn new(inner: mpsc::Receiver<P>) -> Self {
Self { inner }
}
pub async fn receive(&mut self) -> Option<P> {
self.inner.recv().await
}
pub fn close(&mut self) {
self.inner.close();
}
}
#[derive(Debug, Clone)]
pub struct Configurator<C> {
inner: watch::Sender<C>
}
impl<C> Configurator<C> {
pub(crate) fn new(inner: watch::Sender<C>) -> Self {
Self { inner }
}
pub fn update(&self, cfg: C) {
self.inner.send(cfg);
}
pub fn read(&self) -> C
where C: Clone {
self.inner.newest()
}
}