use thiserror::Error;
use tokio::sync::mpsc;
use tower::Service;
use crate::{
message::{InboundMessage, OutboundMessage},
pipeline::SinkService,
};
const DEFAULT_MAX_CONCURRENT_TASKS: usize = 50;
type OutboundMessageSinkService = SinkService<mpsc::UnboundedSender<OutboundMessage>>;
#[derive(Default)]
pub struct Builder<TInSvc, TOutSvc, TOutReq> {
max_concurrent_inbound_tasks: usize,
max_concurrent_outbound_tasks: Option<usize>,
inbound: Option<TInSvc>,
outbound_rx: Option<mpsc::UnboundedReceiver<TOutReq>>,
outbound_pipeline_factory: Option<Box<dyn FnOnce(OutboundMessageSinkService) -> TOutSvc>>,
}
impl Builder<(), (), ()> {
pub fn new() -> Self {
Self {
max_concurrent_inbound_tasks: DEFAULT_MAX_CONCURRENT_TASKS,
max_concurrent_outbound_tasks: None,
inbound: None,
outbound_rx: None,
outbound_pipeline_factory: None,
}
}
}
impl<TInSvc, TOutSvc, TOutReq> Builder<TInSvc, TOutSvc, TOutReq> {
pub fn max_concurrent_inbound_tasks(mut self, max_tasks: usize) -> Self {
self.max_concurrent_inbound_tasks = max_tasks;
self
}
pub fn max_concurrent_outbound_tasks(mut self, max_tasks: usize) -> Self {
self.max_concurrent_outbound_tasks = Some(max_tasks);
self
}
pub fn with_outbound_pipeline<F, S, R>(
self,
receiver: mpsc::UnboundedReceiver<R>,
factory: F,
) -> Builder<TInSvc, S, R>
where
F: FnOnce(OutboundMessageSinkService) -> S + 'static,
S: Service<R> + Clone + Send + 'static,
{
Builder {
outbound_rx: Some(receiver),
outbound_pipeline_factory: Some(Box::new(factory)),
max_concurrent_inbound_tasks: self.max_concurrent_inbound_tasks,
max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
inbound: self.inbound,
}
}
pub fn with_inbound_pipeline<S>(self, inbound: S) -> Builder<S, TOutSvc, TOutReq>
where S: Service<InboundMessage> + Clone + Send + 'static {
Builder {
inbound: Some(inbound),
max_concurrent_inbound_tasks: self.max_concurrent_inbound_tasks,
max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
outbound_rx: self.outbound_rx,
outbound_pipeline_factory: self.outbound_pipeline_factory,
}
}
}
impl<TInSvc, TOutSvc, TOutReq> Builder<TInSvc, TOutSvc, TOutReq>
where
TOutSvc: Service<TOutReq> + Clone + Send + 'static,
TInSvc: Service<InboundMessage> + Clone + Send + 'static,
{
fn build_outbound(&mut self) -> Result<OutboundPipelineConfig<TOutReq, TOutSvc>, PipelineBuilderError> {
let (out_sender, out_receiver) = mpsc::unbounded_channel();
let in_receiver = self
.outbound_rx
.take()
.ok_or(PipelineBuilderError::OutboundPipelineNotProvided)?;
let factory = self
.outbound_pipeline_factory
.take()
.ok_or(PipelineBuilderError::OutboundPipelineNotProvided)?;
let sink_service = SinkService::new(out_sender);
let pipeline = (factory)(sink_service);
Ok(OutboundPipelineConfig {
in_receiver,
out_receiver: Some(out_receiver),
pipeline,
})
}
pub fn try_finish(mut self) -> Result<Config<TInSvc, TOutSvc, TOutReq>, PipelineBuilderError> {
let inbound = self.inbound.take().ok_or(PipelineBuilderError::InboundNotProvided)?;
let outbound = self.build_outbound()?;
Ok(Config {
max_concurrent_inbound_tasks: self.max_concurrent_inbound_tasks,
max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
inbound,
outbound,
})
}
pub fn build(self) -> Config<TInSvc, TOutSvc, TOutReq> {
self.try_finish().unwrap()
}
}
pub struct OutboundPipelineConfig<TInItem, TPipeline> {
pub in_receiver: mpsc::UnboundedReceiver<TInItem>,
pub out_receiver: Option<mpsc::UnboundedReceiver<OutboundMessage>>,
pub pipeline: TPipeline,
}
pub struct Config<TInSvc, TOutSvc, TOutReq> {
pub max_concurrent_inbound_tasks: usize,
pub max_concurrent_outbound_tasks: Option<usize>,
pub inbound: TInSvc,
pub outbound: OutboundPipelineConfig<TOutReq, TOutSvc>,
}
#[derive(Debug, Error)]
pub enum PipelineBuilderError {
#[error("Inbound pipeline was not provided")]
InboundNotProvided,
#[error("Outbound pipeline was not provided")]
OutboundPipelineNotProvided,
}