Skip to main content

async_profiler_agent/reporter/
multi.rs

1//! A reporter that reports profiling results to several destinations.
2
3use async_trait::async_trait;
4
5use crate::metadata::ReportMetadata;
6
7use super::Reporter;
8
9use std::fmt;
10use std::path::Path;
11
12/// An aggregated error that contains an error per reporter. A reporter is identified
13/// by the result of its Debug impl.
14#[derive(Debug, thiserror::Error)]
15struct MultiError {
16    errors: Vec<(String, Box<dyn std::error::Error + Send>)>,
17}
18
19impl fmt::Display for MultiError {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        write!(f, "{{")?;
22        let mut first = true;
23        for (reporter, err) in self.errors.iter() {
24            if !first {
25                write!(f, ", ")?;
26            }
27            first = false;
28            write!(f, "{reporter}: {err}")?;
29        }
30        write!(f, "}}")
31    }
32}
33
34#[derive(Debug)]
35/// A reporter that reports profiling results to several destinations.
36///
37/// If one of the destinations errors, it will continue reporting to the other ones.
38///
39/// ## Example
40///
41/// Output to both S3 and a local directory:
42///
43#[cfg_attr(feature = "s3-no-defaults", doc = "```no_run")]
44#[cfg_attr(not(feature = "s3-no-defaults"), doc = "```compile_fail")]
45/// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
46/// # use async_profiler_agent::reporter::Reporter;
47/// # use async_profiler_agent::reporter::local::LocalReporter;
48/// # use async_profiler_agent::reporter::multi::MultiReporter;
49/// # use async_profiler_agent::reporter::s3::{S3Reporter, S3ReporterConfig};
50/// # use aws_config::BehaviorVersion;
51/// # use std::path::PathBuf;
52/// #
53/// # #[tokio::main]
54/// # async fn main() -> Result<(), SpawnError> {
55/// let bucket_owner = "<your account id>";
56/// let bucket_name = "<your bucket name>";
57/// let profiling_group = "a-name-to-give-the-uploaded-data";
58/// let path = PathBuf::from("path/to/write/jfrs");
59///
60/// let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
61///
62/// let reporter = MultiReporter::new(vec![
63///     Box::new(LocalReporter::new(path)),
64///     Box::new(S3Reporter::new(S3ReporterConfig {
65///        sdk_config: &sdk_config,
66///        bucket_owner: bucket_owner.into(),
67///        bucket_name: bucket_name.into(),
68///        profiling_group_name: profiling_group.into(),
69///     })),
70/// ]);
71/// let profiler = ProfilerBuilder::default()
72///    .with_reporter(reporter)
73///    .build();
74///
75/// profiler.spawn()?;
76/// # Ok(())
77/// # }
78/// ```
79pub struct MultiReporter {
80    reporters: Vec<Box<dyn Reporter + Send + Sync>>,
81}
82
83impl MultiReporter {
84    /// Create a new MultiReporter from a set of reporters
85    pub fn new(reporters: Vec<Box<dyn Reporter + Send + Sync>>) -> Self {
86        MultiReporter { reporters }
87    }
88}
89
90#[async_trait]
91impl Reporter for MultiReporter {
92    async fn report(
93        &self,
94        jfr: Vec<u8>,
95        metadata: &ReportMetadata,
96    ) -> Result<(), Box<dyn std::error::Error + Send>> {
97        let jfr_ref = &jfr[..];
98        let errors = futures::future::join_all(self.reporters.iter().map(|reporter| async move {
99            reporter
100                .report(jfr_ref.to_owned(), metadata)
101                .await
102                .map_err(move |e| (format!("{reporter:?}"), e))
103        }))
104        .await;
105        // return all errors
106        let errors: Vec<_> = errors.into_iter().flat_map(|e| e.err()).collect();
107        if errors.is_empty() {
108            Ok(())
109        } else {
110            Err(Box::new(MultiError { errors }))
111        }
112    }
113
114    fn report_blocking(
115        &self,
116        jfr_path: &Path,
117        metadata: &ReportMetadata,
118    ) -> Result<(), Box<dyn std::error::Error + Send>> {
119        let errors: Vec<_> = self
120            .reporters
121            .iter()
122            .filter_map(|reporter| {
123                reporter
124                    .report_blocking(jfr_path, metadata)
125                    .err()
126                    .map(|e| (format!("{reporter:?}"), e))
127            })
128            .collect();
129        if errors.is_empty() {
130            Ok(())
131        } else {
132            Err(Box::new(MultiError { errors }))
133        }
134    }
135}
136
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use async_trait::async_trait;

    use crate::metadata::{DUMMY_METADATA, ReportMetadata};
    use crate::reporter::Reporter;

    use super::MultiReporter;

    /// Shorthand for the boxed reporter type accepted by `MultiReporter::new`.
    type BoxedReporter = Box<dyn Reporter + Send + Sync>;

    /// Reporter that sleeps for one second, then raises its flag and succeeds.
    #[derive(Debug)]
    struct OkReporter(Arc<AtomicBool>);

    #[async_trait]
    impl Reporter for OkReporter {
        async fn report(
            &self,
            _jfr: Vec<u8>,
            _metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            self.0.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    /// Error type returned by `ErrReporter`.
    #[derive(Debug, thiserror::Error)]
    enum Error {
        #[error("failed: {0}")]
        Failed(String),
    }

    /// Reporter that always fails with `Error::Failed` carrying its own label.
    #[derive(Debug)]
    struct ErrReporter(String);

    #[async_trait]
    impl Reporter for ErrReporter {
        async fn report(
            &self,
            _jfr: Vec<u8>,
            _metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            let failure = Error::Failed(self.0.clone());
            Err(Box::new(failure))
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_multi_reporter_ok() {
        let signals: Vec<Arc<AtomicBool>> = std::iter::repeat_with(|| Arc::new(AtomicBool::new(false)))
            .take(10)
            .collect();
        let children: Vec<BoxedReporter> = signals
            .iter()
            .map(|flag| Box::new(OkReporter(Arc::clone(flag))) as BoxedReporter)
            .collect();
        let reporter = MultiReporter::new(children);

        // Ten one-second reporters must finish within two seconds, which only
        // holds when the reports are done in parallel.
        let outcome = tokio::time::timeout(
            Duration::from_secs(2),
            reporter.report(Vec::new(), &DUMMY_METADATA),
        )
        .await;
        outcome.unwrap().unwrap();

        // Every reporter must have run to completion.
        assert!(signals.iter().all(|flag| flag.load(Ordering::Relaxed)));
    }

    #[tokio::test(start_paused = true)]
    async fn test_multi_reporter_err() {
        let signal_before = Arc::new(AtomicBool::new(false));
        let signal_after = Arc::new(AtomicBool::new(false));
        let children: Vec<BoxedReporter> = vec![
            Box::new(OkReporter(Arc::clone(&signal_before))) as BoxedReporter,
            Box::new(ErrReporter("foo".to_owned())) as BoxedReporter,
            Box::new(ErrReporter("bar".to_owned())) as BoxedReporter,
            Box::new(OkReporter(Arc::clone(&signal_after))) as BoxedReporter,
        ];
        let reporter = MultiReporter::new(children);

        // The aggregated error lists every failing reporter with its message.
        let failure = reporter
            .report(Vec::new(), &DUMMY_METADATA)
            .await
            .unwrap_err();
        let err = format!("{}", failure);
        assert_eq!(
            err,
            "{ErrReporter(\"foo\"): failed: foo, ErrReporter(\"bar\"): failed: bar}"
        );

        // The successful reporters on either side of the failures still ran.
        assert!(signal_before.load(Ordering::Relaxed));
        assert!(signal_after.load(Ordering::Relaxed));
    }
}