use std::sync::atomic::{AtomicBool, Ordering};
use super::Collect;
use crate::common::{
filter_senders_by_indices, replicate, send_pipe_error, senders_as_map, spawn_send,
wait_join_handles, ConfigInto, Context, HasContext, Pipe, PipeChannels, PipeError, Result,
State, SubscribeError,
};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::{
sync::{
mpsc::{error::SendError, Sender},
Mutex,
},
task::JoinHandle,
};
use tracing::{error, info};
/// Pipe that feeds upstream items into a collector and periodically flushes the
/// collected output to its downstream channels.
pub struct Collector<'a> {
    /// Name of this pipe; attached to log records and reported pipe errors.
    name: &'a str,
    /// Shared context whose state and run counters are updated while the pipe runs.
    context: Arc<Context>,
    /// Optional channel for reporting pipe errors; `None` until `subscribe_error` is called.
    etx: Option<Sender<PipeError>>,
}
/// Runs the collector as two cooperating tasks:
///
/// * `collect` receives items from the upstream channel and hands them to the collector;
/// * `flush` wakes up on the collector's flush interval, flushes the collector and sends a
///   replica of the flushed value to every downstream channel.
#[async_trait]
impl<'a, T, U, V, C> Pipe<T, U, V, C> for Collector<'a>
where
    T: Clone + Send + Sync + 'static,
    U: Clone + Send + 'static,
    V: Collect<T, U, C> + 'static,
    C: ConfigInto<V> + Send + Sync + 'static,
{
    /// Builds the collector from `config`, then drives the collect and flush tasks until
    /// either the upstream channel is closed or every downstream receiver is gone.
    ///
    /// # Errors
    ///
    /// Returns the error produced while turning `config` into the collector. Failures of
    /// individual `collect` / `flush` calls are logged and forwarded to the subscribed
    /// error channel (if any) instead of aborting the run.
    ///
    /// # Panics
    ///
    /// Panics if `channels` has no upstream receiver or no downstream sender.
    async fn run(self, config: C, channels: PipeChannels<T, U>) -> Result<()> {
        let name = self.name;
        let context = self.context;
        let etx = self.etx;
        let (mut rx, txs) = channels.into_channels();
        assert!(rx.is_some(), "collector '{}' has no upstreams", name);
        assert!(!txs.is_empty(), "collector '{}' has no downstreams", name);
        let collector: Arc<Mutex<V>> = Arc::new(Mutex::new(config.config_into().await?));
        let collector_clone = Arc::clone(&collector);
        // exit_c: set by the collect task once the upstream channel is closed
        let exit_c = Arc::new(AtomicBool::new(false));
        let exit_c_clone = Arc::clone(&exit_c);
        // exit_f: set by the flush task once it has stopped
        let exit_f = Arc::new(AtomicBool::new(false));
        let exit_f_clone = Arc::clone(&exit_f);

        let pipe_name = name.to_owned();
        let pipe_etx = etx.clone();
        let join_collect = tokio::spawn(async move {
            let rx = rx.as_mut().expect("upstream receiver presence asserted above");
            info!(
                name = pipe_name.as_str(),
                ty = "collector",
                thread = "collect",
                "run ..."
            );
            loop {
                // the flush task is gone, nobody will ever drain the collector again
                if exit_f_clone.load(Ordering::Acquire) {
                    break;
                }
                let t = match rx.recv().await {
                    Some(t) => t,
                    None => {
                        // upstream closed: every received item has already been collected
                        exit_c.store(true, Ordering::Release);
                        break;
                    }
                };
                // release the collector lock before reporting, so a slow error channel
                // cannot stall the flush task
                let collected = {
                    let mut c = collector_clone.lock().await;
                    c.collect(t).await
                };
                if let Err(err) = collected {
                    error!(
                        name = pipe_name.as_str(),
                        ty = "collector",
                        thread = "collect",
                        "error '{}' ...",
                        err
                    );
                    send_pipe_error(pipe_etx.as_ref(), PipeError::new(pipe_name.clone(), err))
                        .await;
                }
            }
            info!(
                name = pipe_name.as_str(),
                ty = "collector",
                thread = "collect",
                "exit ..."
            );
        });

        let mut txs = senders_as_map(txs);
        let pipe_name = name.to_owned();
        let join_flush = tokio::spawn(async move {
            info!(
                name = pipe_name.as_str(),
                ty = "collector",
                thread = "flush",
                "run ..."
            );
            let mut interval = {
                let c = collector.lock().await;
                c.get_flush_interval()
            };
            loop {
                context.set_state(State::Receive);
                // every downstream receiver has been dropped
                if txs.is_empty() {
                    break;
                }
                interval.tick().await;
                // Sample the upstream-closed flag BEFORE flushing. The collect task raises it
                // only after its last `collect` call returned, so when it is observed here the
                // flush below sees every collected item and this iteration is the final one.
                // Checking it only after a successful send (as before) would spin forever once
                // flush yields nothing, and could drop items collected during the send.
                let upstream_closed = exit_c_clone.load(Ordering::Acquire);
                let flushed = {
                    let mut c = collector.lock().await;
                    c.flush().await
                };
                let u = match flushed {
                    Ok(Some(u)) => u,
                    Ok(None) => {
                        if upstream_closed {
                            break;
                        }
                        continue;
                    }
                    Err(err) => {
                        error!(
                            name = pipe_name.as_str(),
                            ty = "collector",
                            thread = "flush",
                            "error '{}' ...",
                            err
                        );
                        context.inc_failure_run();
                        context.inc_total_run();
                        send_pipe_error(etx.as_ref(), PipeError::new(pipe_name.clone(), err))
                            .await;
                        if upstream_closed {
                            break;
                        }
                        continue;
                    }
                };
                context.set_state(State::Send);
                // one replica per downstream sender
                let mut u_replicas = replicate(u, txs.len());
                assert!(!u_replicas.is_empty(), "empty replicas");
                let jhs: HashMap<usize, JoinHandle<core::result::Result<(), SendError<U>>>> = txs
                    .iter()
                    .map(|(idx, tx)| {
                        (
                            idx.to_owned(),
                            spawn_send(tx.to_owned(), u_replicas.pop().expect("no replica left")),
                        )
                    })
                    .collect();
                assert!(u_replicas.is_empty(), "replica leftover");
                // forget the senders reported back by the send tasks
                let drop_sender_indices = wait_join_handles(jhs).await;
                filter_senders_by_indices(&mut txs, drop_sender_indices);
                context.inc_total_run();
                if upstream_closed {
                    break;
                }
            }
            info!(
                name = pipe_name.as_str(),
                ty = "collector",
                thread = "flush",
                "exit ..."
            );
            exit_f.store(true, Ordering::Release);
            context.set_state(State::Done);
        });

        // Await both tasks and surface their join errors (e.g. a panic inside a task)
        // instead of silently discarding them.
        let pipe_name = name.to_owned();
        let (collect_res, flush_res) = tokio::join!(join_collect, join_flush);
        if let Err(err) = collect_res {
            error!(
                name = pipe_name.as_str(),
                ty = "collector",
                thread = "join",
                task = "collect",
                "join error '{:#?}'",
                err
            )
        }
        if let Err(err) = flush_res {
            error!(
                name = pipe_name.as_str(),
                ty = "collector",
                thread = "join",
                task = "flush",
                "join error '{:#?}'",
                err
            )
        }
        Ok(())
    }
}
impl<'a> HasContext for Collector<'a> {
    /// Returns an owned copy of the pipe name.
    fn get_name(&self) -> String {
        String::from(self.name)
    }

    /// Returns another handle to the shared context.
    fn get_context(&self) -> Arc<Context> {
        Arc::clone(&self.context)
    }
}
impl<'a> Collector<'a> {
pub fn new(name: &'a str) -> Self {
Collector {
name,
context: Default::default(),
etx: None,
}
}
}
impl<'a> SubscribeError for Collector<'a> {
fn subscribe_error(&mut self, tx: Sender<crate::common::PipeError>) {
self.etx = Some(tx)
}
}
/// Builds a `Collector` whose name is the given expression.
///
/// Expands to a plain `Collector::new(..)` call, so `Collector` has to be in scope
/// wherever the macro is invoked.
#[macro_export]
macro_rules! collector {
    ($pipe_name:expr) => {
        Collector::new($pipe_name)
    };
}