use std::{
collections::HashMap,
sync::{Arc, Mutex, OnceLock},
};
use super::AsyncEngine;
use async_trait::async_trait;
use tokio::sync::oneshot;
use super::{Data, Error, PipelineError, PipelineIO};
mod sinks;
mod sources;
pub use sinks::{SegmentSink, ServiceBackend};
pub use sources::{SegmentSource, ServiceFrontend};
pub type Service<In, Out> = Arc<ServiceFrontend<In, Out>>;
mod private {
pub struct Token;
}
#[async_trait]
pub trait Source<T: PipelineIO>: Data {
async fn on_next(&self, data: T, _: private::Token) -> Result<(), Error>;
fn set_edge(&self, edge: Edge<T>, _: private::Token) -> Result<(), PipelineError>;
fn link<S: Sink<T> + 'static>(&self, sink: Arc<S>) -> Result<Arc<S>, PipelineError> {
let edge = Edge::new(sink.clone());
self.set_edge(edge, private::Token)?;
Ok(sink)
}
}
#[async_trait]
pub trait Sink<T: PipelineIO>: Data {
async fn on_data(&self, data: T, _: private::Token) -> Result<(), Error>;
}
pub struct Edge<T: PipelineIO> {
downstream: Arc<dyn Sink<T>>,
}
impl<T: PipelineIO> Edge<T> {
fn new(downstream: Arc<dyn Sink<T>>) -> Self {
Edge { downstream }
}
async fn write(&self, data: T) -> Result<(), Error> {
self.downstream.on_data(data, private::Token).await
}
}
type NodeFn<In, Out> = Box<dyn Fn(In) -> Result<Out, Error> + Send + Sync>;
#[async_trait]
pub trait Operator<UpIn: PipelineIO, UpOut: PipelineIO, DownIn: PipelineIO, DownOut: PipelineIO>:
Data
{
async fn generate(
&self,
req: UpIn,
next: Arc<dyn AsyncEngine<DownIn, DownOut, Error>>,
) -> Result<UpOut, Error>;
fn into_operator(self: &Arc<Self>) -> Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>
where
Self: Sized,
{
PipelineOperator::new(self.clone())
}
}
pub struct PipelineOperatorForwardEdge<
UpIn: PipelineIO,
UpOut: PipelineIO,
DownIn: PipelineIO,
DownOut: PipelineIO,
> {
parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
}
pub struct PipelineOperatorBackwardEdge<
UpIn: PipelineIO,
UpOut: PipelineIO,
DownIn: PipelineIO,
DownOut: PipelineIO,
> {
parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
}
pub struct PipelineOperator<
UpIn: PipelineIO,
UpOut: PipelineIO,
DownIn: PipelineIO,
DownOut: PipelineIO,
> {
operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>,
downstream: Arc<sources::Frontend<DownIn, DownOut>>,
upstream: sinks::SinkEdge<UpOut>,
}
impl<UpIn, UpOut, DownIn, DownOut> PipelineOperator<UpIn, UpOut, DownIn, DownOut>
where
UpIn: PipelineIO,
UpOut: PipelineIO,
DownIn: PipelineIO,
DownOut: PipelineIO,
{
pub fn new(operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>) -> Arc<Self> {
Arc::new(PipelineOperator {
operator,
downstream: Arc::new(sources::Frontend::default()),
upstream: sinks::SinkEdge::default(),
})
}
pub fn forward_edge(
self: &Arc<Self>,
) -> Arc<PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>> {
Arc::new(PipelineOperatorForwardEdge {
parent: self.clone(),
})
}
pub fn backward_edge(
self: &Arc<Self>,
) -> Arc<PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>> {
Arc::new(PipelineOperatorBackwardEdge {
parent: self.clone(),
})
}
}
#[async_trait]
impl<UpIn, UpOut, DownIn, DownOut> AsyncEngine<UpIn, UpOut, Error>
for PipelineOperator<UpIn, UpOut, DownIn, DownOut>
where
UpIn: PipelineIO + Sync,
DownIn: PipelineIO + Sync,
DownOut: PipelineIO,
UpOut: PipelineIO,
{
async fn generate(&self, req: UpIn) -> Result<UpOut, Error> {
self.operator.generate(req, self.downstream.clone()).await
}
}
#[async_trait]
impl<UpIn, UpOut, DownIn, DownOut> Sink<UpIn>
for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
where
UpIn: PipelineIO + Sync,
DownIn: PipelineIO + Sync,
DownOut: PipelineIO,
UpOut: PipelineIO,
{
async fn on_data(&self, data: UpIn, _token: private::Token) -> Result<(), Error> {
let stream = self.parent.generate(data).await?;
self.parent.upstream.on_next(stream, private::Token).await
}
}
#[async_trait]
impl<UpIn, UpOut, DownIn, DownOut> Source<DownIn>
for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
where
UpIn: PipelineIO,
DownIn: PipelineIO,
DownOut: PipelineIO,
UpOut: PipelineIO,
{
async fn on_next(&self, data: DownIn, token: private::Token) -> Result<(), Error> {
self.parent.downstream.on_next(data, token).await
}
fn set_edge(&self, edge: Edge<DownIn>, token: private::Token) -> Result<(), PipelineError> {
self.parent.downstream.set_edge(edge, token)
}
}
#[async_trait]
impl<UpIn, UpOut, DownIn, DownOut> Sink<DownOut>
for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
where
UpIn: PipelineIO,
DownIn: PipelineIO,
DownOut: PipelineIO,
UpOut: PipelineIO,
{
async fn on_data(&self, data: DownOut, token: private::Token) -> Result<(), Error> {
self.parent.downstream.on_data(data, token).await
}
}
#[async_trait]
impl<UpIn, UpOut, DownIn, DownOut> Source<UpOut>
for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
where
UpIn: PipelineIO,
DownIn: PipelineIO,
DownOut: PipelineIO,
UpOut: PipelineIO,
{
async fn on_next(&self, data: UpOut, token: private::Token) -> Result<(), Error> {
self.parent.upstream.on_next(data, token).await
}
fn set_edge(&self, edge: Edge<UpOut>, token: private::Token) -> Result<(), PipelineError> {
self.parent.upstream.set_edge(edge, token)
}
}
pub struct PipelineNode<In: PipelineIO, Out: PipelineIO> {
edge: OnceLock<Edge<Out>>,
map_fn: NodeFn<In, Out>,
}
impl<In: PipelineIO, Out: PipelineIO> PipelineNode<In, Out> {
pub fn new(map_fn: NodeFn<In, Out>) -> Arc<Self> {
Arc::new(PipelineNode::<In, Out> {
edge: OnceLock::new(),
map_fn,
})
}
}
#[async_trait]
impl<In: PipelineIO, Out: PipelineIO> Source<Out> for PipelineNode<In, Out> {
async fn on_next(&self, data: Out, _: private::Token) -> Result<(), Error> {
self.edge
.get()
.ok_or(PipelineError::NoEdge)?
.write(data)
.await
}
fn set_edge(&self, edge: Edge<Out>, _: private::Token) -> Result<(), PipelineError> {
self.edge
.set(edge)
.map_err(|_| PipelineError::EdgeAlreadySet)?;
Ok(())
}
}
#[async_trait]
impl<In: PipelineIO, Out: PipelineIO> Sink<In> for PipelineNode<In, Out> {
async fn on_data(&self, data: In, _: private::Token) -> Result<(), Error> {
self.on_next((self.map_fn)(data)?, private::Token).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pipeline::*;
#[tokio::test]
async fn test_pipeline_source_no_edge() {
let source = ServiceFrontend::<SingleIn<()>, ManyOut<()>>::new();
let stream = source.generate(().into()).await;
assert!(stream.is_err());
}
}