async_profiler_agent/reporter/
multi.rs1use async_trait::async_trait;
4
5use crate::metadata::ReportMetadata;
6
7use super::Reporter;
8
9use std::fmt;
10use std::path::Path;
11
12#[derive(Debug, thiserror::Error)]
15struct MultiError {
16 errors: Vec<(String, Box<dyn std::error::Error + Send>)>,
17}
18
19impl fmt::Display for MultiError {
20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21 write!(f, "{{")?;
22 let mut first = true;
23 for (reporter, err) in self.errors.iter() {
24 if !first {
25 write!(f, ", ")?;
26 }
27 first = false;
28 write!(f, "{reporter}: {err}")?;
29 }
30 write!(f, "}}")
31 }
32}
33
34#[derive(Debug)]
35#[cfg_attr(feature = "s3-no-defaults", doc = "```no_run")]
44#[cfg_attr(not(feature = "s3-no-defaults"), doc = "```compile_fail")]
45pub struct MultiReporter {
80 reporters: Vec<Box<dyn Reporter + Send + Sync>>,
81}
82
83impl MultiReporter {
84 pub fn new(reporters: Vec<Box<dyn Reporter + Send + Sync>>) -> Self {
86 MultiReporter { reporters }
87 }
88}
89
90#[async_trait]
91impl Reporter for MultiReporter {
92 async fn report(
93 &self,
94 jfr: Vec<u8>,
95 metadata: &ReportMetadata,
96 ) -> Result<(), Box<dyn std::error::Error + Send>> {
97 let jfr_ref = &jfr[..];
98 let errors = futures::future::join_all(self.reporters.iter().map(|reporter| async move {
99 reporter
100 .report(jfr_ref.to_owned(), metadata)
101 .await
102 .map_err(move |e| (format!("{reporter:?}"), e))
103 }))
104 .await;
105 let errors: Vec<_> = errors.into_iter().flat_map(|e| e.err()).collect();
107 if errors.is_empty() {
108 Ok(())
109 } else {
110 Err(Box::new(MultiError { errors }))
111 }
112 }
113
114 fn report_blocking(
115 &self,
116 jfr_path: &Path,
117 metadata: &ReportMetadata,
118 ) -> Result<(), Box<dyn std::error::Error + Send>> {
119 let errors: Vec<_> = self
120 .reporters
121 .iter()
122 .filter_map(|reporter| {
123 reporter
124 .report_blocking(jfr_path, metadata)
125 .err()
126 .map(|e| (format!("{reporter:?}"), e))
127 })
128 .collect();
129 if errors.is_empty() {
130 Ok(())
131 } else {
132 Err(Box::new(MultiError { errors }))
133 }
134 }
135}
136
137#[cfg(test)]
138mod test {
139 use std::{
140 sync::{
141 Arc,
142 atomic::{self, AtomicBool},
143 },
144 time::Duration,
145 };
146
147 use async_trait::async_trait;
148
149 use crate::{
150 metadata::{DUMMY_METADATA, ReportMetadata},
151 reporter::Reporter,
152 };
153
154 use super::MultiReporter;
155
156 #[derive(Debug)]
157 struct OkReporter(Arc<AtomicBool>);
158 #[async_trait]
159 impl Reporter for OkReporter {
160 async fn report(
161 &self,
162 _jfr: Vec<u8>,
163 _metadata: &ReportMetadata,
164 ) -> Result<(), Box<dyn std::error::Error + Send>> {
165 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
166 self.0.store(true, atomic::Ordering::Relaxed);
167 Ok(())
168 }
169 }
170
171 #[derive(Debug, thiserror::Error)]
172 enum Error {
173 #[error("failed: {0}")]
174 Failed(String),
175 }
176
177 #[derive(Debug)]
178 struct ErrReporter(String);
179 #[async_trait]
180 impl Reporter for ErrReporter {
181 async fn report(
182 &self,
183 _jfr: Vec<u8>,
184 _metadata: &ReportMetadata,
185 ) -> Result<(), Box<dyn std::error::Error + Send>> {
186 Err(Box::new(Error::Failed(self.0.clone())))
187 }
188 }
189
190 #[tokio::test(start_paused = true)]
191 async fn test_multi_reporter_ok() {
192 let signals: Vec<_> = (0..10).map(|_| Arc::new(AtomicBool::new(false))).collect();
193 let reporter = MultiReporter::new(
194 signals
195 .iter()
196 .map(|signal| {
197 Box::new(OkReporter(signal.clone())) as Box<dyn Reporter + Send + Sync>
198 })
199 .collect(),
200 );
201 tokio::time::timeout(
203 Duration::from_secs(2),
204 reporter.report(vec![], &DUMMY_METADATA),
205 )
206 .await
207 .unwrap()
208 .unwrap();
209 assert!(signals.iter().all(|s| s.load(atomic::Ordering::Relaxed)));
211 }
212
213 #[tokio::test(start_paused = true)]
214 async fn test_multi_reporter_err() {
215 let signal_before = Arc::new(AtomicBool::new(false));
216 let signal_after = Arc::new(AtomicBool::new(false));
217 let reporter = MultiReporter::new(vec![
218 Box::new(OkReporter(signal_before.clone())) as Box<dyn Reporter + Send + Sync>,
219 Box::new(ErrReporter("foo".to_owned())) as Box<dyn Reporter + Send + Sync>,
220 Box::new(ErrReporter("bar".to_owned())) as Box<dyn Reporter + Send + Sync>,
221 Box::new(OkReporter(signal_after.clone())) as Box<dyn Reporter + Send + Sync>,
222 ]);
223 let err = format!(
225 "{}",
226 reporter.report(vec![], &DUMMY_METADATA).await.unwrap_err()
227 );
228 assert_eq!(
229 err,
230 "{ErrReporter(\"foo\"): failed: foo, ErrReporter(\"bar\"): failed: bar}"
231 );
232 assert!(signal_before.load(atomic::Ordering::Relaxed));
234 assert!(signal_after.load(atomic::Ordering::Relaxed));
235 }
236}