use std::{
error::Error,
future::{pending, Future},
pin::Pin,
};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::{mpsc, oneshot, watch},
};
use crate::{context::SupervisorMessage, Actor, AsyncAsk, Message};
pub mod fs;
pub mod read;
pub mod tcp;
pub trait DataFrameReceiver: Sized + Default + Message + Send + Sync + 'static {
type Request: Message;
type Frame: DataFrame;
fn recv(&mut self, frame: &Self::Frame) -> Option<Self::Request>;
}
pub trait DataFrame: Sized {
fn frame(&self) -> &[u8];
}
pub trait ComponentReader<DFR: DataFrameReceiver>: Send + Sync + 'static {
type Frame: Default + Message + std::fmt::Debug;
type Future: Future<Output = std::io::Result<Option<DFR::Request>>> + Send;
fn read(&self) -> Self::Future;
}
pub trait ComponentFuture {
type Payload: DataFrameReceiver;
type Reader: ComponentReader<Self::Payload>;
type Actor: Actor + AsyncAsk<<Self::Payload as DataFrameReceiver>::Request>;
type Error: Error + Send + Sync;
type Future<'a>: Future<Output = Result<(Self::Reader, Self::Actor), Self::Error>>
+ Send
+ Sync
+ 'a;
}
pub trait Component: ComponentFuture + Send + Sync {
type Shutdown: Future<Output = ()> + Send + Sync;
#[allow(clippy::needless_lifetimes)]
fn accept<'a>(&'a mut self) -> Self::Future<'a>;
fn shutdown(self) -> Self::Shutdown;
}
pub trait IoReadFuture {
type Future<'a>: Future<Output = Result<Option<()>, std::io::Error>> + Send + 'a;
}
pub trait IoRead<Stream: AsyncRead>:
IoReadFuture + Default + std::fmt::Debug + Send + Sync + 'static
{
fn read<'a>(&'a mut self, stream: &'a mut Stream) -> Self::Future<'a>;
}
#[derive(Debug)]
pub struct Reader<R: Default, O> {
inner: mpsc::Sender<(R, oneshot::Sender<std::io::Result<O>>)>,
}
impl<R: Default, O> Clone for Reader<R, O> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<R: Default, O> Reader<R, O> {
fn new(inner: mpsc::Sender<(R, oneshot::Sender<std::io::Result<O>>)>) -> Self {
Self { inner }
}
pub async fn read(&self) -> std::io::Result<Option<O>> {
let (tx, rx) = oneshot::channel();
if (self.inner.send((R::default(), tx)).await).is_err() {
Ok(None)
} else {
match rx.await {
Ok(response) => response.map(Some),
Err(_) => Ok(None),
}
}
}
}
impl<Pay, R> ComponentReader<Pay> for Reader<R, Pay::Request>
where
Pay: DataFrameReceiver<Frame = R>,
R: Default + Message + std::fmt::Debug + Send + Sync + 'static,
{
type Frame = R;
type Future = Pin<Box<dyn Future<Output = std::io::Result<Option<Pay::Request>>> + Send>>;
fn read(&self) -> Self::Future {
let this = (*self).clone();
Box::pin(async move { this.read().await })
}
}
#[derive(Clone, Debug)]
pub struct Writer {
inner: mpsc::Sender<(Vec<u8>, oneshot::Sender<std::io::Result<()>>)>,
}
impl Writer {
fn new(inner: mpsc::Sender<(Vec<u8>, oneshot::Sender<std::io::Result<()>>)>) -> Self {
Self { inner }
}
pub async fn write(&self, data: Vec<u8>) -> std::io::Result<()> {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.inner.send((data, tx)).await {
Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))
} else {
match rx.await {
Ok(result) => result,
Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
}
}
}
}
async fn supervisor<F: Future>(
rx: &mut watch::Receiver<Option<SupervisorMessage>>,
shutdown: impl Future,
fut: F,
) -> Supervise<F::Output> {
let output = tokio::select! {
result = rx.changed() => {
if result.is_err() {
return Supervise::Dead
}
match *rx.borrow() {
Some(SupervisorMessage::Shutdown) => Supervise::Shutdown,
None => Supervise::Continue
}
},
_ = shutdown => Supervise::Shutdown,
option = fut => Supervise::Success(option)
};
output
}
enum Supervise<Output> {
Success(Output),
Continue,
Shutdown,
Dead,
}
async fn create_reader_actor<
Pipe: AsyncRead + Unpin,
R: IoRead<Pipe> + Send,
Payload: DataFrameReceiver<Frame = R>,
>(
mut source: Pipe,
mut reader_rx: mpsc::Receiver<(R, oneshot::Sender<std::io::Result<Payload::Request>>)>,
mut parent_rx: watch::Receiver<Option<SupervisorMessage>>,
shutdown_tx: oneshot::Sender<()>,
) {
loop {
let (mut frame, sender) =
match supervisor(&mut parent_rx, pending::<()>(), reader_rx.recv()).await {
Supervise::Success(Some(output)) => output,
Supervise::Continue => continue,
Supervise::Success(None) | Supervise::Dead | Supervise::Shutdown => break,
};
let mut payload = Payload::default();
loop {
match supervisor(&mut parent_rx, pending::<()>(), frame.read(&mut source)).await {
Supervise::Success(Ok(Some(_))) => {
if let Some(object) = payload.recv(&frame) {
let _ = sender.send(Ok(object));
break;
}
}
Supervise::Continue => continue, Supervise::Success(Err(_))
| Supervise::Success(Ok(None))
| Supervise::Dead
| Supervise::Shutdown => break,
};
}
}
let _ = shutdown_tx.send(());
}
async fn create_writer_actor(
mut source: impl AsyncWrite + Unpin,
mut writer_rx: mpsc::Receiver<(Vec<u8>, oneshot::Sender<std::io::Result<()>>)>,
mut parent_rx: watch::Receiver<Option<SupervisorMessage>>,
mut shutdown_rx: oneshot::Receiver<()>,
) {
loop {
let (data, sender) =
match supervisor(&mut parent_rx, &mut shutdown_rx, writer_rx.recv()).await {
Supervise::Success(Some(output)) => output,
Supervise::Continue => continue,
Supervise::Success(None) | Supervise::Dead | Supervise::Shutdown => break,
};
match supervisor(&mut parent_rx, &mut shutdown_rx, source.write_all(&data)).await {
Supervise::Success(result) => {
let _ = sender.send(result);
}
Supervise::Continue => continue, Supervise::Dead | Supervise::Shutdown => break,
};
}
}