1use super::{CollectItemConsume, CollectItemProduce};
20#[cfg(feature = "management")]
21use crate::proto::v3::management_service_client::ManagementServiceClient;
22use crate::{
23 proto::v3::{
24 LogData, MeterData, SegmentObject, log_report_service_client::LogReportServiceClient,
25 meter_report_service_client::MeterReportServiceClient,
26 trace_segment_report_service_client::TraceSegmentReportServiceClient,
27 },
28 reporter::{CollectItem, Report},
29};
30use futures_core::Stream;
31use futures_util::future::{TryJoinAll, try_join_all};
32use std::{
33 error::Error,
34 future::{Future, pending},
35 pin::Pin,
36 sync::{
37 Arc,
38 atomic::{AtomicBool, Ordering},
39 },
40 task::{Context, Poll},
41 time::Duration,
42};
43use tokio::{
44 select,
45 sync::{
46 Mutex,
47 mpsc::{self, Receiver, Sender},
48 },
49 task::JoinHandle,
50 try_join,
51};
52use tokio_stream::StreamExt;
53use tonic::{
54 Request, Status,
55 metadata::{Ascii, MetadataValue},
56 service::{Interceptor, interceptor::InterceptedService},
57 transport::{self, Channel, Endpoint},
58};
59use tracing::error;
60
61type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
62type DynErrHandler = dyn Fn(&str, &dyn Error) + Send + Sync + 'static;
63type DynStatusHandler = dyn Fn(&str, &Status) + Send + Sync + 'static;
64
65fn default_err_handle(message: &str, err: &dyn Error) {
66 error!(?err, "{}", message);
67}
68
69fn default_status_handle(message: &str, status: &Status) {
70 error!(?status, "{}", message);
71}
72
73#[derive(Default, Clone)]
74struct CustomInterceptor {
75 authentication: Option<Arc<String>>,
76 custom_intercept: Option<Arc<DynInterceptHandler>>,
77}
78
79impl Interceptor for CustomInterceptor {
80 fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
81 if let Some(authentication) = &self.authentication {
82 if let Ok(authentication) = authentication.parse::<MetadataValue<Ascii>>() {
83 request
84 .metadata_mut()
85 .insert("authentication", authentication);
86 }
87 }
88 if let Some(custom_intercept) = &self.custom_intercept {
89 request = custom_intercept(request)?;
90 }
91 Ok(request)
92 }
93}
94
95struct State {
96 is_reporting: AtomicBool,
97 is_closing: AtomicBool,
98}
99
100impl State {
101 fn is_closing(&self) -> bool {
102 self.is_closing.load(Ordering::Relaxed)
103 }
104}
105
106pub struct GrpcReporter<P, C> {
108 state: Arc<State>,
109 producer: Arc<P>,
110 consumer: Arc<Mutex<Option<C>>>,
111 err_handle: Arc<DynErrHandler>,
112 channel: Channel,
113 interceptor: CustomInterceptor,
114}
115
116impl GrpcReporter<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
117 pub fn new(channel: Channel) -> Self {
119 let (p, c) = mpsc::unbounded_channel();
120 Self::new_with_pc(channel, p, c)
121 }
122
123 pub async fn connect(
125 address: impl TryInto<Endpoint, Error = transport::Error>,
126 ) -> crate::Result<Self> {
127 let endpoint = address.try_into()?;
128 let channel = endpoint.connect().await?;
129 Ok(Self::new(channel))
130 }
131}
132
133impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
134 pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
137 Self {
138 state: Arc::new(State {
139 is_reporting: Default::default(),
140 is_closing: Default::default(),
141 }),
142 producer: Arc::new(producer),
143 consumer: Arc::new(Mutex::new(Some(consumer))),
144 err_handle: Arc::new(default_err_handle),
145 channel,
146 interceptor: Default::default(),
147 }
148 }
149
150 pub fn with_err_handle(
152 mut self,
153 handle: impl Fn(&str, &dyn Error) + Send + Sync + 'static,
154 ) -> Self {
155 self.err_handle = Arc::new(handle);
156 self
157 }
158
159 pub fn with_authentication(mut self, authentication: impl Into<String>) -> Self {
162 self.interceptor.authentication = Some(Arc::new(authentication.into()));
163 self
164 }
165
166 pub fn with_custom_intercept(
168 mut self,
169 custom_intercept: impl Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static,
170 ) -> Self {
171 self.interceptor.custom_intercept = Some(Arc::new(custom_intercept));
172 self
173 }
174
175 pub async fn reporting(&self) -> Reporting<C> {
181 if self.state.is_reporting.swap(true, Ordering::Relaxed) {
182 panic!("reporting already called");
183 }
184
185 let (trace_sender, trace_receiver) = mpsc::channel(255);
186 let (log_sender, log_receiver) = mpsc::channel(255);
187 let (meter_sender, meter_receiver) = mpsc::channel(255);
188
189 let status_handle = Arc::new(default_status_handle);
190
191 Reporting {
192 report_sender: ReportSender {
193 state: Arc::clone(&self.state),
194 inner_report_sender: InnerReportSender {
195 status_handle: Arc::new(default_status_handle),
196 err_handle: self.err_handle.clone(),
197 trace_sender,
198 log_sender,
199 meter_sender,
200
201 #[cfg(feature = "management")]
202 management_client: ManagementServiceClient::with_interceptor(
203 self.channel.clone(),
204 self.interceptor.clone(),
205 ),
206 },
207 shutdown_signal: Box::pin(pending()),
208 consumer: self.consumer.lock().await.take().unwrap(),
209 },
210
211 trace_receive_reporter: TraceReceiveReporter {
212 trace_client: TraceSegmentReportServiceClient::with_interceptor(
213 self.channel.clone(),
214 self.interceptor.clone(),
215 ),
216 trace_receiver,
217 status_handle: status_handle.clone(),
218 },
219
220 log_receive_reporter: LogReceiveReporter {
221 log_client: LogReportServiceClient::with_interceptor(
222 self.channel.clone(),
223 self.interceptor.clone(),
224 ),
225 log_receiver,
226 status_handle: status_handle.clone(),
227 },
228
229 meter_receive_reporter: MeterReceiveReporter {
230 meter_client: MeterReportServiceClient::with_interceptor(
231 self.channel.clone(),
232 self.interceptor.clone(),
233 ),
234 meter_receiver,
235 status_handle,
236 },
237 }
238 }
239}
240
241impl<P, C> Clone for GrpcReporter<P, C> {
242 fn clone(&self) -> Self {
243 Self {
244 state: self.state.clone(),
245 producer: self.producer.clone(),
246 consumer: self.consumer.clone(),
247 err_handle: self.err_handle.clone(),
248 channel: self.channel.clone(),
249 interceptor: self.interceptor.clone(),
250 }
251 }
252}
253
254impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C> {
255 fn report(&self, item: CollectItem) {
256 if !self.state.is_closing() {
257 if let Err(e) = self.producer.produce(item) {
258 (self.err_handle)("report collect item failed", &*e);
259 }
260 }
261 }
262}
263
264struct InnerReportSender {
265 status_handle: Arc<DynStatusHandler>,
266 err_handle: Arc<DynErrHandler>,
267
268 trace_sender: Sender<SegmentObject>,
269 log_sender: Sender<LogData>,
270 meter_sender: Sender<MeterData>,
271
272 #[cfg(feature = "management")]
273 management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
274}
275
276impl InnerReportSender {
277 async fn report(&mut self, item: CollectItem) {
278 match item {
279 CollectItem::Trace(item) => {
280 if let Err(e) = self.trace_sender.try_send(*item) {
281 (self.err_handle)("report trace segment failed", &e as &dyn Error);
282 }
283 }
284 CollectItem::Log(item) => {
285 if let Err(e) = self.log_sender.try_send(*item) {
286 (self.err_handle)("report log data failed", &e as &dyn Error);
287 }
288 }
289 CollectItem::Meter(item) => {
290 if let Err(e) = self.meter_sender.try_send(*item) {
291 (self.err_handle)("report meter data failed", &e as &dyn Error);
292 }
293 }
294 #[cfg(feature = "management")]
295 CollectItem::Instance(item) => {
296 if let Err(e) = self
297 .management_client
298 .report_instance_properties(*item)
299 .await
300 {
301 (self.status_handle)("Report instance properties failed", &e);
302 }
303 }
304 #[cfg(feature = "management")]
305 CollectItem::Ping(item) => {
306 if let Err(e) = self.management_client.keep_alive(*item).await {
307 (self.status_handle)("Ping failed", &e);
308 }
309 }
310 }
311 }
312}
313
314struct ReportSender<C> {
315 state: Arc<State>,
316 inner_report_sender: InnerReportSender,
317 consumer: C,
318 shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
319}
320
321impl<C: CollectItemConsume> ReportSender<C> {
322 async fn start(self) -> crate::Result<()> {
323 let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
324 let ReportSender {
325 state,
326 mut inner_report_sender,
327 consumer: mut collect_item_consumer,
328 shutdown_signal,
329 ..
330 } = self;
331
332 let work_fut = async move {
333 loop {
334 select! {
335 item = collect_item_consumer.consume() => {
336 match item {
337 Ok(Some(item)) => {
338 inner_report_sender.report(item).await;
339 }
340 Ok(None) => break,
341 Err(err) => return Err(crate::Error::Other(err)),
342 }
343 }
344 _ = shutdown_rx.recv() => break,
345 }
346 }
347
348 state.is_closing.store(true, Ordering::Relaxed);
349
350 loop {
352 match collect_item_consumer.try_consume().await {
353 Ok(Some(item)) => {
354 inner_report_sender.report(item).await;
355 }
356 Ok(None) => break,
357 Err(err) => return Err(err.into()),
358 }
359 }
360
361 Ok::<_, crate::Error>(())
362 };
363
364 let shutdown_fut = async move {
365 shutdown_signal.await;
366 shutdown_tx
367 .send(())
368 .map_err(|e| crate::Error::Other(Box::new(e)))?;
369 Ok(())
370 };
371
372 try_join!(work_fut, shutdown_fut)?;
373
374 Ok(())
375 }
376}
377
378pub struct Reporting<C> {
380 report_sender: ReportSender<C>,
381 trace_receive_reporter: TraceReceiveReporter,
382 log_receive_reporter: LogReceiveReporter,
383 meter_receive_reporter: MeterReceiveReporter,
384}
385
386impl<C: CollectItemConsume> Reporting<C> {
387 pub fn with_graceful_shutdown(
391 mut self,
392 shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
393 ) -> Self {
394 self.report_sender.shutdown_signal = Box::pin(shutdown_signal);
395 self
396 }
397
398 pub fn with_status_handle(
400 mut self,
401 handle: impl Fn(&str, &Status) + Send + Sync + 'static,
402 ) -> Self {
403 let handle = Arc::new(handle);
404 self.report_sender.inner_report_sender.status_handle = handle.clone();
405 self.trace_receive_reporter.status_handle = handle.clone();
406 self.log_receive_reporter.status_handle = handle.clone();
407 self.meter_receive_reporter.status_handle = handle;
408 self
409 }
410
411 pub fn spawn(self) -> ReportingJoinHandle {
413 ReportingJoinHandle {
414 handles: try_join_all(vec![
415 tokio::spawn(self.report_sender.start()),
416 tokio::spawn(self.trace_receive_reporter.start()),
417 tokio::spawn(self.log_receive_reporter.start()),
418 tokio::spawn(self.meter_receive_reporter.start()),
419 ]),
420 }
421 }
422}
423
424pub struct ReportingJoinHandle {
426 handles: TryJoinAll<JoinHandle<crate::Result<()>>>,
427}
428
429impl Future for ReportingJoinHandle {
430 type Output = crate::Result<()>;
431
432 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
433 Pin::new(&mut self.handles).poll(cx).map(|rs| {
434 let rs = rs?;
435 for r in rs {
436 r?;
437 }
438 Ok(())
439 })
440 }
441}
442
443struct TraceReceiveReporter {
444 trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
445 trace_receiver: Receiver<SegmentObject>,
446 status_handle: Arc<DynStatusHandler>,
447}
448
449impl TraceReceiveReporter {
450 async fn start(mut self) -> crate::Result<()> {
451 let rf = ReceiveFrom::new(self.trace_receiver);
452 while let Some(stream) = rf.stream() {
453 if let Err(err) = self.trace_client.collect(stream).await {
454 (self.status_handle)("Collect trace segment by stream failed", &err);
455 }
456 }
457 Ok(())
458 }
459}
460
461struct LogReceiveReporter {
462 log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
463 log_receiver: Receiver<LogData>,
464 status_handle: Arc<DynStatusHandler>,
465}
466
467impl LogReceiveReporter {
468 async fn start(mut self) -> crate::Result<()> {
469 let rf = ReceiveFrom::new(self.log_receiver);
470 while let Some(stream) = rf.stream() {
471 if let Err(err) = self.log_client.collect(stream).await {
472 (self.status_handle)("Collect log data by stream failed", &err);
473 }
474 }
475 Ok(())
476 }
477}
478
479struct MeterReceiveReporter {
480 meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
481 meter_receiver: Receiver<MeterData>,
482 status_handle: Arc<DynStatusHandler>,
483}
484
485impl MeterReceiveReporter {
486 async fn start(mut self) -> crate::Result<()> {
487 let rf = ReceiveFrom::new(self.meter_receiver);
488 while let Some(stream) = rf.stream() {
489 if let Err(err) = self.meter_client.collect(stream).await {
490 (self.status_handle)("Collect meter data by stream failed", &err);
491 }
492 }
493 Ok(())
494 }
495}
496
497struct ReceiveFrom<I> {
498 receiver: Arc<Mutex<Receiver<I>>>,
499 is_closed: Arc<AtomicBool>,
500}
501
502impl<I> ReceiveFrom<I> {
503 fn new(receiver: Receiver<I>) -> Self {
504 Self {
505 receiver: Arc::new(Mutex::new(receiver)),
506 is_closed: Default::default(),
507 }
508 }
509
510 fn stream(&self) -> Option<impl Stream<Item = I> + use<I>> {
511 if self.is_closed.load(Ordering::Relaxed) {
512 return None;
513 }
514
515 let is_closed = self.is_closed.clone();
516 let receiver = self.receiver.clone();
517
518 Some(
519 ReceiveFromStream {
520 receiver,
521 is_closed,
522 }
523 .timeout(Duration::from_secs(30))
524 .map_while(|item| item.ok()),
525 )
526 }
527}
528
529struct ReceiveFromStream<I> {
530 receiver: Arc<Mutex<Receiver<I>>>,
531 is_closed: Arc<AtomicBool>,
532}
533
534impl<I> Stream for ReceiveFromStream<I> {
535 type Item = I;
536
537 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
538 self.receiver.try_lock().unwrap().poll_recv(cx).map(|item| {
539 if item.is_none() {
540 self.is_closed.store(true, Ordering::Relaxed);
541 }
542 item
543 })
544 }
545}