use async_trait::async_trait;
use crate::metadata::ReportMetadata;
use super::Reporter;
use std::fmt;
use std::path::Path;
#[derive(Debug, thiserror::Error)]
struct MultiError {
errors: Vec<(String, Box<dyn std::error::Error + Send>)>,
}
impl fmt::Display for MultiError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{{")?;
let mut first = true;
for (reporter, err) in self.errors.iter() {
if !first {
write!(f, ", ")?;
}
first = false;
write!(f, "{reporter}: {err}")?;
}
write!(f, "}}")
}
}
#[derive(Debug)]
#[cfg_attr(feature = "s3-no-defaults", doc = "```no_run")]
#[cfg_attr(not(feature = "s3-no-defaults"), doc = "```compile_fail")]
pub struct MultiReporter {
reporters: Vec<Box<dyn Reporter + Send + Sync>>,
}
impl MultiReporter {
pub fn new(reporters: Vec<Box<dyn Reporter + Send + Sync>>) -> Self {
MultiReporter { reporters }
}
}
#[async_trait]
impl Reporter for MultiReporter {
async fn report(
&self,
jfr: Vec<u8>,
metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
let jfr_ref = &jfr[..];
let errors = futures::future::join_all(self.reporters.iter().map(|reporter| async move {
reporter
.report(jfr_ref.to_owned(), metadata)
.await
.map_err(move |e| (format!("{reporter:?}"), e))
}))
.await;
let errors: Vec<_> = errors.into_iter().flat_map(|e| e.err()).collect();
if errors.is_empty() {
Ok(())
} else {
Err(Box::new(MultiError { errors }))
}
}
fn report_blocking(
&self,
jfr_path: &Path,
metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
let errors: Vec<_> = self
.reporters
.iter()
.filter_map(|reporter| {
reporter
.report_blocking(jfr_path, metadata)
.err()
.map(|e| (format!("{reporter:?}"), e))
})
.collect();
if errors.is_empty() {
Ok(())
} else {
Err(Box::new(MultiError { errors }))
}
}
}
#[cfg(test)]
mod test {
use std::{
sync::{
Arc,
atomic::{self, AtomicBool},
},
time::Duration,
};
use async_trait::async_trait;
use crate::{
metadata::{DUMMY_METADATA, ReportMetadata},
reporter::Reporter,
};
use super::MultiReporter;
#[derive(Debug)]
struct OkReporter(Arc<AtomicBool>);
#[async_trait]
impl Reporter for OkReporter {
async fn report(
&self,
_jfr: Vec<u8>,
_metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
self.0.store(true, atomic::Ordering::Relaxed);
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
enum Error {
#[error("failed: {0}")]
Failed(String),
}
#[derive(Debug)]
struct ErrReporter(String);
#[async_trait]
impl Reporter for ErrReporter {
async fn report(
&self,
_jfr: Vec<u8>,
_metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
Err(Box::new(Error::Failed(self.0.clone())))
}
}
#[tokio::test(start_paused = true)]
async fn test_multi_reporter_ok() {
let signals: Vec<_> = (0..10).map(|_| Arc::new(AtomicBool::new(false))).collect();
let reporter = MultiReporter::new(
signals
.iter()
.map(|signal| {
Box::new(OkReporter(signal.clone())) as Box<dyn Reporter + Send + Sync>
})
.collect(),
);
tokio::time::timeout(
Duration::from_secs(2),
reporter.report(vec![], &DUMMY_METADATA),
)
.await
.unwrap()
.unwrap();
assert!(signals.iter().all(|s| s.load(atomic::Ordering::Relaxed)));
}
#[tokio::test(start_paused = true)]
async fn test_multi_reporter_err() {
let signal_before = Arc::new(AtomicBool::new(false));
let signal_after = Arc::new(AtomicBool::new(false));
let reporter = MultiReporter::new(vec![
Box::new(OkReporter(signal_before.clone())) as Box<dyn Reporter + Send + Sync>,
Box::new(ErrReporter("foo".to_owned())) as Box<dyn Reporter + Send + Sync>,
Box::new(ErrReporter("bar".to_owned())) as Box<dyn Reporter + Send + Sync>,
Box::new(OkReporter(signal_after.clone())) as Box<dyn Reporter + Send + Sync>,
]);
let err = format!(
"{}",
reporter.report(vec![], &DUMMY_METADATA).await.unwrap_err()
);
assert_eq!(
err,
"{ErrReporter(\"foo\"): failed: foo, ErrReporter(\"bar\"): failed: bar}"
);
assert!(signal_before.load(atomic::Ordering::Relaxed));
assert!(signal_after.load(atomic::Ordering::Relaxed));
}
}