async_profiler_agent/reporter/
multi.rs1use async_trait::async_trait;
4
5use crate::metadata::ReportMetadata;
6
7use super::Reporter;
8
9use std::fmt;
10
/// Aggregate error that carries the failures of several reporters at once.
///
/// Its `Display` output lists every entry as `<label>: <error>` inside braces.
#[derive(Debug, thiserror::Error)]
struct MultiError {
    /// `(label, error)` pairs, one per failed reporter. `MultiReporter::report`
    /// fills the label with the `Debug` rendering of the reporter that failed.
    errors: Vec<(String, Box<dyn std::error::Error + Send>)>,
}
17
18impl fmt::Display for MultiError {
19 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20 write!(f, "{{")?;
21 let mut first = true;
22 for (reporter, err) in self.errors.iter() {
23 if !first {
24 write!(f, ", ")?;
25 }
26 first = false;
27 write!(f, "{reporter}: {err}")?;
28 }
29 write!(f, "}}")
30 }
31}
32
/// A [`Reporter`] that wraps a list of other reporters.
///
/// Its `report` implementation hands the data to every wrapped reporter and
/// gathers the failures of all of them into a single error.
// NOTE(review): the two `cfg_attr` attributes below each emit a doc line that
// opens a rustdoc code fence (`no_run` when the `s3-no-defaults` feature is
// enabled, `compile_fail` otherwise). The example body and the closing fence
// are not visible in this view — confirm they are present in the actual file,
// otherwise the fence is left open.
#[derive(Debug)]
#[cfg_attr(feature = "s3-no-defaults", doc = "```no_run")]
#[cfg_attr(not(feature = "s3-no-defaults"), doc = "```compile_fail")]
pub struct MultiReporter {
    // The wrapped reporters; `report` iterates over them in this order.
    reporters: Vec<Box<dyn Reporter + Send + Sync>>,
}
81
82impl MultiReporter {
83 pub fn new(reporters: Vec<Box<dyn Reporter + Send + Sync>>) -> Self {
85 MultiReporter { reporters }
86 }
87}
88
89#[async_trait]
90impl Reporter for MultiReporter {
91 async fn report(
92 &self,
93 jfr: Vec<u8>,
94 metadata: &ReportMetadata,
95 ) -> Result<(), Box<dyn std::error::Error + Send>> {
96 let jfr_ref = &jfr[..];
97 let errors = futures::future::join_all(self.reporters.iter().map(|reporter| async move {
98 reporter
99 .report(jfr_ref.to_owned(), metadata)
100 .await
101 .map_err(move |e| (format!("{reporter:?}"), e))
102 }))
103 .await;
104 let errors: Vec<_> = errors.into_iter().flat_map(|e| e.err()).collect();
106 if errors.is_empty() {
107 Ok(())
108 } else {
109 Err(Box::new(MultiError { errors }))
110 }
111 }
112}
113
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use async_trait::async_trait;

    use crate::metadata::{DUMMY_METADATA, ReportMetadata};
    use crate::reporter::Reporter;

    use super::MultiReporter;

    /// The boxed trait-object type that `MultiReporter::new` takes.
    type BoxedReporter = Box<dyn Reporter + Send + Sync>;

    /// Boxes a concrete reporter as a `BoxedReporter`.
    fn boxed<R>(reporter: R) -> BoxedReporter
    where
        R: Reporter + Send + Sync + 'static,
    {
        Box::new(reporter)
    }

    /// Reporter that sleeps for one second, then sets its flag and succeeds.
    #[derive(Debug)]
    struct OkReporter(Arc<AtomicBool>);

    #[async_trait]
    impl Reporter for OkReporter {
        async fn report(
            &self,
            _jfr: Vec<u8>,
            _metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            self.0.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    /// Error type returned by `ErrReporter`.
    #[derive(Debug, thiserror::Error)]
    enum Error {
        #[error("failed: {0}")]
        Failed(String),
    }

    /// Reporter that always fails immediately, using its string as the message.
    #[derive(Debug)]
    struct ErrReporter(String);

    #[async_trait]
    impl Reporter for ErrReporter {
        async fn report(
            &self,
            _jfr: Vec<u8>,
            _metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            let failure = Error::Failed(self.0.clone());
            Err(Box::new(failure))
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_multi_reporter_ok() {
        let signals: Vec<Arc<AtomicBool>> =
            std::iter::repeat_with(|| Arc::new(AtomicBool::new(false)))
                .take(10)
                .collect();
        let wrapped: Vec<BoxedReporter> = signals
            .iter()
            .map(|flag| boxed(OkReporter(Arc::clone(flag))))
            .collect();
        let reporter = MultiReporter::new(wrapped);

        // Ten reporters that each sleep for 1s must all finish within 2s, which
        // only holds if they are not awaited one after another.
        let finished = tokio::time::timeout(
            Duration::from_secs(2),
            reporter.report(Vec::new(), &DUMMY_METADATA),
        )
        .await
        .unwrap();
        finished.unwrap();

        // Every reporter must have run to completion.
        let all_ran = signals.iter().all(|flag| flag.load(Ordering::Relaxed));
        assert!(all_ran);
    }

    #[tokio::test(start_paused = true)]
    async fn test_multi_reporter_err() {
        let signal_before = Arc::new(AtomicBool::new(false));
        let signal_after = Arc::new(AtomicBool::new(false));
        let reporter = MultiReporter::new(vec![
            boxed(OkReporter(Arc::clone(&signal_before))),
            boxed(ErrReporter(String::from("foo"))),
            boxed(ErrReporter(String::from("bar"))),
            boxed(OkReporter(Arc::clone(&signal_after))),
        ]);

        // Both failures are reported, in the order of the wrapped reporters.
        let err = reporter
            .report(Vec::new(), &DUMMY_METADATA)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(
            err,
            "{ErrReporter(\"foo\"): failed: foo, ErrReporter(\"bar\"): failed: bar}"
        );

        // The succeeding reporters on either side of the failures still ran.
        assert!(signal_before.load(Ordering::Relaxed));
        assert!(signal_after.load(Ordering::Relaxed));
    }
}