async_profiler_agent/reporter/
multi.rs

1//! A reporter that reports profiling results to several destinations.
2
3use async_trait::async_trait;
4
5use crate::metadata::ReportMetadata;
6
7use super::Reporter;
8
9use std::fmt;
10
11/// An aggregated error that contains an error per reporter. A reporter is identified
12/// by the result of its Debug impl.
13#[derive(Debug, thiserror::Error)]
14struct MultiError {
15    errors: Vec<(String, Box<dyn std::error::Error + Send>)>,
16}
17
18impl fmt::Display for MultiError {
19    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20        write!(f, "{{")?;
21        let mut first = true;
22        for (reporter, err) in self.errors.iter() {
23            if !first {
24                write!(f, ", ")?;
25            }
26            first = false;
27            write!(f, "{reporter}: {err}")?;
28        }
29        write!(f, "}}")
30    }
31}
32
33#[derive(Debug)]
34/// A reporter that reports profiling results to several destinations.
35///
36/// If one of the destinations errors, it will continue reporting to the other ones.
37///
38/// ## Example
39///
40/// Output to both S3 and a local directory:
41///
42#[cfg_attr(feature = "s3-no-defaults", doc = "```no_run")]
43#[cfg_attr(not(feature = "s3-no-defaults"), doc = "```compile_fail")]
44/// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
45/// # use async_profiler_agent::reporter::Reporter;
46/// # use async_profiler_agent::reporter::local::LocalReporter;
47/// # use async_profiler_agent::reporter::multi::MultiReporter;
48/// # use async_profiler_agent::reporter::s3::{S3Reporter, S3ReporterConfig};
49/// # use aws_config::BehaviorVersion;
50/// # use std::path::PathBuf;
51/// #
52/// # #[tokio::main]
53/// # async fn main() -> Result<(), SpawnError> {
54/// let bucket_owner = "<your account id>";
55/// let bucket_name = "<your bucket name>";
56/// let profiling_group = "a-name-to-give-the-uploaded-data";
57/// let path = PathBuf::from("path/to/write/jfrs");
58///
59/// let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
60///
61/// let reporter = MultiReporter::new(vec![
62///     Box::new(LocalReporter::new(path)),
63///     Box::new(S3Reporter::new(S3ReporterConfig {
64///        sdk_config: &sdk_config,
65///        bucket_owner: bucket_owner.into(),
66///        bucket_name: bucket_name.into(),
67///        profiling_group_name: profiling_group.into(),
68///     })),
69/// ]);
70/// let profiler = ProfilerBuilder::default()
71///    .with_reporter(reporter)
72///    .build();
73///
74/// profiler.spawn()?;
75/// # Ok(())
76/// # }
77/// ```
78pub struct MultiReporter {
79    reporters: Vec<Box<dyn Reporter + Send + Sync>>,
80}
81
82impl MultiReporter {
83    /// Create a new MultiReporter from a set of reporters
84    pub fn new(reporters: Vec<Box<dyn Reporter + Send + Sync>>) -> Self {
85        MultiReporter { reporters }
86    }
87}
88
89#[async_trait]
90impl Reporter for MultiReporter {
91    async fn report(
92        &self,
93        jfr: Vec<u8>,
94        metadata: &ReportMetadata,
95    ) -> Result<(), Box<dyn std::error::Error + Send>> {
96        let jfr_ref = &jfr[..];
97        let errors = futures::future::join_all(self.reporters.iter().map(|reporter| async move {
98            reporter
99                .report(jfr_ref.to_owned(), metadata)
100                .await
101                .map_err(move |e| (format!("{reporter:?}"), e))
102        }))
103        .await;
104        // return all errors
105        let errors: Vec<_> = errors.into_iter().flat_map(|e| e.err()).collect();
106        if errors.is_empty() {
107            Ok(())
108        } else {
109            Err(Box::new(MultiError { errors }))
110        }
111    }
112}
113
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use async_trait::async_trait;

    use super::MultiReporter;
    use crate::metadata::{DUMMY_METADATA, ReportMetadata};
    use crate::reporter::Reporter;

    /// Shorthand for the boxed trait object accepted by `MultiReporter::new`.
    type BoxedReporter = Box<dyn Reporter + Send + Sync>;

    /// Reporter that sleeps for one second, then raises its flag and succeeds.
    #[derive(Debug)]
    struct OkReporter(Arc<AtomicBool>);

    #[async_trait]
    impl Reporter for OkReporter {
        async fn report(
            &self,
            _jfr: Vec<u8>,
            _metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            self.0.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    /// Error type returned by `ErrReporter`.
    #[derive(Debug, thiserror::Error)]
    enum Error {
        #[error("failed: {0}")]
        Failed(String),
    }

    /// Reporter that always fails, echoing its message in the error.
    #[derive(Debug)]
    struct ErrReporter(String);

    #[async_trait]
    impl Reporter for ErrReporter {
        async fn report(
            &self,
            _jfr: Vec<u8>,
            _metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            Err(Box::new(Error::Failed(self.0.clone())))
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_multi_reporter_ok() {
        let flags: Vec<Arc<AtomicBool>> = (0..10)
            .map(|_| Arc::new(AtomicBool::new(false)))
            .collect();
        let reporters: Vec<BoxedReporter> = flags
            .iter()
            .map(|flag| Box::new(OkReporter(Arc::clone(flag))) as BoxedReporter)
            .collect();
        let multi = MultiReporter::new(reporters);

        // Ten one-second reporters must finish within two seconds, which only
        // holds if they run in parallel.
        let outcome = tokio::time::timeout(
            Duration::from_secs(2),
            multi.report(vec![], &DUMMY_METADATA),
        )
        .await;
        outcome.unwrap().unwrap();

        // Every reporter must have run to completion.
        assert!(flags.iter().all(|flag| flag.load(Ordering::Relaxed)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_multi_reporter_err() {
        let ran_before = Arc::new(AtomicBool::new(false));
        let ran_after = Arc::new(AtomicBool::new(false));
        let reporters: Vec<BoxedReporter> = vec![
            Box::new(OkReporter(Arc::clone(&ran_before))),
            Box::new(ErrReporter("foo".to_owned())),
            Box::new(ErrReporter("bar".to_owned())),
            Box::new(OkReporter(Arc::clone(&ran_after))),
        ];
        let multi = MultiReporter::new(reporters);

        // The report must fail, listing both failing reporters in order.
        let failure = multi.report(vec![], &DUMMY_METADATA).await.unwrap_err();
        assert_eq!(
            failure.to_string(),
            "{ErrReporter(\"foo\"): failed: foo, ErrReporter(\"bar\"): failed: bar}"
        );

        // The succeeding reporters on either side of the failures still ran.
        assert!(ran_before.load(Ordering::Relaxed));
        assert!(ran_after.load(Ordering::Relaxed));
    }
}
213}