use std::{
fmt::Display,
time::{Duration, Instant},
};
use log::*;
use tokio::time;
use tower::{Service, ServiceExt};
use crate::{bounded_executor::BoundedExecutor, pipeline::builder::OutboundPipelineConfig};
const LOG_TARGET: &str = "comms::pipeline::outbound";
/// Driver for the outbound side of the comms pipeline.
///
/// Receives items of type `TItem` from the configured `in_receiver` and runs a clone of the
/// `TPipeline` service against each one on the bounded executor (see [`Outbound::run`]).
pub struct Outbound<TPipeline, TItem> {
/// Executor on which one task is spawned per received item. Its `num_available` /
/// `max_available` counters are also used for the usage log line emitted by `run`.
executor: BoundedExecutor,
/// Pipeline configuration; `run` reads the `in_receiver` channel and clones the `pipeline`
/// service from it.
config: OutboundPipelineConfig<TItem, TPipeline>,
}
impl<TPipeline, TItem> Outbound<TPipeline, TItem>
where
TItem: Send + 'static,
TPipeline: Service<TItem, Response = ()> + Clone + Send + 'static,
TPipeline::Error: Display + Send,
TPipeline::Future: Send,
{
pub fn new(executor: BoundedExecutor, config: OutboundPipelineConfig<TItem, TPipeline>) -> Self {
Self { executor, config }
}
pub async fn run(mut self) {
let mut current_id = 0;
while let Some(msg) = self.config.in_receiver.recv().await {
let num_available = self.executor.num_available();
let max_available = self.executor.max_available();
log!(
target: LOG_TARGET,
if num_available < max_available {
Level::Debug
} else {
Level::Trace
},
"Outbound pipeline usage: {}/{}",
max_available - num_available,
max_available
);
let pipeline = self.config.pipeline.clone();
let id = current_id;
current_id = (current_id + 1) % u64::MAX;
self.executor
.spawn(async move {
let timer = Instant::now();
trace!(target: LOG_TARGET, "Start outbound pipeline {id}");
match time::timeout(Duration::from_secs(10), pipeline.oneshot(msg)).await {
Ok(Ok(_)) => {},
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
"Outbound pipeline {id} returned an error: '{err}'"
);
},
Err(err) => {
debug!(
target: LOG_TARGET,
"Outbound pipeline {id} timed out and was aborted. THIS SHOULD NOT HAPPEN: there was a \
deadlock or excessive delay in processing this pipeline. {err}"
);
},
}
trace!(
target: LOG_TARGET,
"Finished outbound pipeline {} in {:.2?}",
id,
timer.elapsed()
);
})
.await;
}
info!(
target: LOG_TARGET,
"Outbound pipeline is shutting down because the in channel closed"
);
}
}
#[cfg(test)]
mod test {
    use bytes::Bytes;
    use tari_test_utils::collect_recv;
    use tokio::sync::mpsc;

    use super::*;
    use crate::{message::OutboundMessage, pipeline::SinkService, utils};

    /// Feeds a fixed number of messages through the outbound pipeline into a sink service and
    /// checks that every message arrives and that the pipeline task terminates once the inbound
    /// channel has been closed.
    #[tokio::test]
    async fn run() {
        const NUM_ITEMS: usize = 10;

        // Sink side: everything the pipeline emits ends up on `sink_rx`.
        let (sink_tx, mut sink_rx) = mpsc::unbounded_channel();
        let bounded = BoundedExecutor::new(100);

        // Source side: queue all messages up front, then close the receiver so that `run`
        // sees the end of the stream after draining the buffered items.
        let (source_tx, mut source_rx) = mpsc::unbounded_channel();
        let messages = (0..NUM_ITEMS).map(|index| {
            let payload = Bytes::copy_from_slice(&index.to_be_bytes());
            OutboundMessage::new(Default::default(), payload)
        });
        utils::mpsc::send_all_unbounded(&source_tx, messages).unwrap();
        source_rx.close();

        let config = OutboundPipelineConfig {
            in_receiver: source_rx,
            out_receiver: None,
            pipeline: SinkService::new(sink_tx),
        };
        let handle = tokio::spawn(Outbound::new(bounded, config).run());

        let received = collect_recv!(sink_rx, timeout = Duration::from_millis(5));
        assert_eq!(received.len(), NUM_ITEMS);

        // The pipeline task must finish by itself because the inbound channel is closed.
        let join_result = time::timeout(Duration::from_secs(5), handle).await.unwrap();
        join_result.expect("Task should end");
    }
}