// skywalking/reporter/grpc.rs

1// Licensed to the Apache Software Foundation (ASF) under one or more
2// contributor license agreements.  See the NOTICE file distributed with
3// this work for additional information regarding copyright ownership.
4// The ASF licenses this file to You under the Apache License, Version 2.0
5// (the "License"); you may not use this file except in compliance with
6// the License.  You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15//
16
17//! Grpc implementation of [Report].
18
19use super::{CollectItemConsume, CollectItemProduce};
20#[cfg(feature = "management")]
21use crate::proto::v3::management_service_client::ManagementServiceClient;
22use crate::{
23    proto::v3::{
24        LogData, MeterData, SegmentObject, log_report_service_client::LogReportServiceClient,
25        meter_report_service_client::MeterReportServiceClient,
26        trace_segment_report_service_client::TraceSegmentReportServiceClient,
27    },
28    reporter::{CollectItem, Report},
29};
30use futures_core::Stream;
31use futures_util::future::{TryJoinAll, try_join_all};
32use std::{
33    error::Error,
34    future::{Future, pending},
35    pin::Pin,
36    sync::{
37        Arc,
38        atomic::{AtomicBool, Ordering},
39    },
40    task::{Context, Poll},
41    time::Duration,
42};
43use tokio::{
44    select,
45    sync::{
46        Mutex,
47        mpsc::{self, Receiver, Sender},
48    },
49    task::JoinHandle,
50    try_join,
51};
52use tokio_stream::StreamExt;
53use tonic::{
54    Request, Status,
55    metadata::{Ascii, MetadataValue},
56    service::{Interceptor, interceptor::InterceptedService},
57    transport::{self, Channel, Endpoint},
58};
59use tracing::error;
60
61type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
62type DynErrHandler = dyn Fn(&str, &dyn Error) + Send + Sync + 'static;
63type DynStatusHandler = dyn Fn(&str, &Status) + Send + Sync + 'static;
64
65fn default_err_handle(message: &str, err: &dyn Error) {
66    error!(?err, "{}", message);
67}
68
69fn default_status_handle(message: &str, status: &Status) {
70    error!(?status, "{}", message);
71}
72
73#[derive(Default, Clone)]
74struct CustomInterceptor {
75    authentication: Option<Arc<String>>,
76    custom_intercept: Option<Arc<DynInterceptHandler>>,
77}
78
79impl Interceptor for CustomInterceptor {
80    fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
81        if let Some(authentication) = &self.authentication {
82            if let Ok(authentication) = authentication.parse::<MetadataValue<Ascii>>() {
83                request
84                    .metadata_mut()
85                    .insert("authentication", authentication);
86            }
87        }
88        if let Some(custom_intercept) = &self.custom_intercept {
89            request = custom_intercept(request)?;
90        }
91        Ok(request)
92    }
93}
94
/// Lifecycle flags shared by a reporter, all of its clones and its reporting task.
struct State {
    /// Flipped by the first `reporting` call; any later call panics.
    is_reporting: AtomicBool,
    /// Raised once shutdown has begun; from then on new items are ignored.
    is_closing: AtomicBool,
}

impl State {
    /// Returns `true` once the reporter has started shutting down.
    fn is_closing(&self) -> bool {
        let flag = &self.is_closing;
        flag.load(Ordering::Relaxed)
    }
}
105
106/// Reporter which will report to Skywalking OAP server via grpc protocol.
107pub struct GrpcReporter<P, C> {
108    state: Arc<State>,
109    producer: Arc<P>,
110    consumer: Arc<Mutex<Option<C>>>,
111    err_handle: Arc<DynErrHandler>,
112    channel: Channel,
113    interceptor: CustomInterceptor,
114}
115
116impl GrpcReporter<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
117    /// New with exists [Channel], so you can clone the [Channel] for multiplex.
118    pub fn new(channel: Channel) -> Self {
119        let (p, c) = mpsc::unbounded_channel();
120        Self::new_with_pc(channel, p, c)
121    }
122
123    /// Connect to the Skywalking OAP server.
124    pub async fn connect(
125        address: impl TryInto<Endpoint, Error = transport::Error>,
126    ) -> crate::Result<Self> {
127        let endpoint = address.try_into()?;
128        let channel = endpoint.connect().await?;
129        Ok(Self::new(channel))
130    }
131}
132
133impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
134    /// Special purpose, used for user-defined produce and consume operations,
135    /// usually you can use [GrpcReporter::connect] and [GrpcReporter::new].
136    pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
137        Self {
138            state: Arc::new(State {
139                is_reporting: Default::default(),
140                is_closing: Default::default(),
141            }),
142            producer: Arc::new(producer),
143            consumer: Arc::new(Mutex::new(Some(consumer))),
144            err_handle: Arc::new(default_err_handle),
145            channel,
146            interceptor: Default::default(),
147        }
148    }
149
150    /// Set error handle. By default, the error will not be handle.
151    pub fn with_err_handle(
152        mut self,
153        handle: impl Fn(&str, &dyn Error) + Send + Sync + 'static,
154    ) -> Self {
155        self.err_handle = Arc::new(handle);
156        self
157    }
158
159    /// Set the authentication header value. By default, the authentication is
160    /// not set.
161    pub fn with_authentication(mut self, authentication: impl Into<String>) -> Self {
162        self.interceptor.authentication = Some(Arc::new(authentication.into()));
163        self
164    }
165
166    /// Set the custom intercept. By default, the custom intercept is not set.
167    pub fn with_custom_intercept(
168        mut self,
169        custom_intercept: impl Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static,
170    ) -> Self {
171        self.interceptor.custom_intercept = Some(Arc::new(custom_intercept));
172        self
173    }
174
175    /// Start to reporting.
176    ///
177    /// # Panics
178    ///
179    /// Panic if call more than once.
180    pub async fn reporting(&self) -> Reporting<C> {
181        if self.state.is_reporting.swap(true, Ordering::Relaxed) {
182            panic!("reporting already called");
183        }
184
185        let (trace_sender, trace_receiver) = mpsc::channel(255);
186        let (log_sender, log_receiver) = mpsc::channel(255);
187        let (meter_sender, meter_receiver) = mpsc::channel(255);
188
189        let status_handle = Arc::new(default_status_handle);
190
191        Reporting {
192            report_sender: ReportSender {
193                state: Arc::clone(&self.state),
194                inner_report_sender: InnerReportSender {
195                    status_handle: Arc::new(default_status_handle),
196                    err_handle: self.err_handle.clone(),
197                    trace_sender,
198                    log_sender,
199                    meter_sender,
200
201                    #[cfg(feature = "management")]
202                    management_client: ManagementServiceClient::with_interceptor(
203                        self.channel.clone(),
204                        self.interceptor.clone(),
205                    ),
206                },
207                shutdown_signal: Box::pin(pending()),
208                consumer: self.consumer.lock().await.take().unwrap(),
209            },
210
211            trace_receive_reporter: TraceReceiveReporter {
212                trace_client: TraceSegmentReportServiceClient::with_interceptor(
213                    self.channel.clone(),
214                    self.interceptor.clone(),
215                ),
216                trace_receiver,
217                status_handle: status_handle.clone(),
218            },
219
220            log_receive_reporter: LogReceiveReporter {
221                log_client: LogReportServiceClient::with_interceptor(
222                    self.channel.clone(),
223                    self.interceptor.clone(),
224                ),
225                log_receiver,
226                status_handle: status_handle.clone(),
227            },
228
229            meter_receive_reporter: MeterReceiveReporter {
230                meter_client: MeterReportServiceClient::with_interceptor(
231                    self.channel.clone(),
232                    self.interceptor.clone(),
233                ),
234                meter_receiver,
235                status_handle,
236            },
237        }
238    }
239}
240
241impl<P, C> Clone for GrpcReporter<P, C> {
242    fn clone(&self) -> Self {
243        Self {
244            state: self.state.clone(),
245            producer: self.producer.clone(),
246            consumer: self.consumer.clone(),
247            err_handle: self.err_handle.clone(),
248            channel: self.channel.clone(),
249            interceptor: self.interceptor.clone(),
250        }
251    }
252}
253
254impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C> {
255    fn report(&self, item: CollectItem) {
256        if !self.state.is_closing() {
257            if let Err(e) = self.producer.produce(item) {
258                (self.err_handle)("report collect item failed", &*e);
259            }
260        }
261    }
262}
263
264struct InnerReportSender {
265    status_handle: Arc<DynStatusHandler>,
266    err_handle: Arc<DynErrHandler>,
267
268    trace_sender: Sender<SegmentObject>,
269    log_sender: Sender<LogData>,
270    meter_sender: Sender<MeterData>,
271
272    #[cfg(feature = "management")]
273    management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
274}
275
276impl InnerReportSender {
277    async fn report(&mut self, item: CollectItem) {
278        match item {
279            CollectItem::Trace(item) => {
280                if let Err(e) = self.trace_sender.try_send(*item) {
281                    (self.err_handle)("report trace segment failed", &e as &dyn Error);
282                }
283            }
284            CollectItem::Log(item) => {
285                if let Err(e) = self.log_sender.try_send(*item) {
286                    (self.err_handle)("report log data failed", &e as &dyn Error);
287                }
288            }
289            CollectItem::Meter(item) => {
290                if let Err(e) = self.meter_sender.try_send(*item) {
291                    (self.err_handle)("report meter data failed", &e as &dyn Error);
292                }
293            }
294            #[cfg(feature = "management")]
295            CollectItem::Instance(item) => {
296                if let Err(e) = self
297                    .management_client
298                    .report_instance_properties(*item)
299                    .await
300                {
301                    (self.status_handle)("Report instance properties failed", &e);
302                }
303            }
304            #[cfg(feature = "management")]
305            CollectItem::Ping(item) => {
306                if let Err(e) = self.management_client.keep_alive(*item).await {
307                    (self.status_handle)("Ping failed", &e);
308                }
309            }
310        }
311    }
312}
313
314struct ReportSender<C> {
315    state: Arc<State>,
316    inner_report_sender: InnerReportSender,
317    consumer: C,
318    shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
319}
320
321impl<C: CollectItemConsume> ReportSender<C> {
322    async fn start(self) -> crate::Result<()> {
323        let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
324        let ReportSender {
325            state,
326            mut inner_report_sender,
327            consumer: mut collect_item_consumer,
328            shutdown_signal,
329            ..
330        } = self;
331
332        let work_fut = async move {
333            loop {
334                select! {
335                    item = collect_item_consumer.consume() => {
336                        match item {
337                            Ok(Some(item)) => {
338                                inner_report_sender.report(item).await;
339                            }
340                            Ok(None) => break,
341                            Err(err) => return Err(crate::Error::Other(err)),
342                        }
343                    }
344                    _ =  shutdown_rx.recv() => break,
345                }
346            }
347
348            state.is_closing.store(true, Ordering::Relaxed);
349
350            // Flush.
351            loop {
352                match collect_item_consumer.try_consume().await {
353                    Ok(Some(item)) => {
354                        inner_report_sender.report(item).await;
355                    }
356                    Ok(None) => break,
357                    Err(err) => return Err(err.into()),
358                }
359            }
360
361            Ok::<_, crate::Error>(())
362        };
363
364        let shutdown_fut = async move {
365            shutdown_signal.await;
366            shutdown_tx
367                .send(())
368                .map_err(|e| crate::Error::Other(Box::new(e)))?;
369            Ok(())
370        };
371
372        try_join!(work_fut, shutdown_fut)?;
373
374        Ok(())
375    }
376}
377
378/// Handle of [GrpcReporter::reporting].
379pub struct Reporting<C> {
380    report_sender: ReportSender<C>,
381    trace_receive_reporter: TraceReceiveReporter,
382    log_receive_reporter: LogReceiveReporter,
383    meter_receive_reporter: MeterReceiveReporter,
384}
385
386impl<C: CollectItemConsume> Reporting<C> {
387    /// Quit when shutdown_signal received.
388    ///
389    /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
390    pub fn with_graceful_shutdown(
391        mut self,
392        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
393    ) -> Self {
394        self.report_sender.shutdown_signal = Box::pin(shutdown_signal);
395        self
396    }
397
398    /// Set the failed status handle. By default, the status will not be handle.
399    pub fn with_status_handle(
400        mut self,
401        handle: impl Fn(&str, &Status) + Send + Sync + 'static,
402    ) -> Self {
403        let handle = Arc::new(handle);
404        self.report_sender.inner_report_sender.status_handle = handle.clone();
405        self.trace_receive_reporter.status_handle = handle.clone();
406        self.log_receive_reporter.status_handle = handle.clone();
407        self.meter_receive_reporter.status_handle = handle;
408        self
409    }
410
411    /// Spawn the reporting in background.
412    pub fn spawn(self) -> ReportingJoinHandle {
413        ReportingJoinHandle {
414            handles: try_join_all(vec![
415                tokio::spawn(self.report_sender.start()),
416                tokio::spawn(self.trace_receive_reporter.start()),
417                tokio::spawn(self.log_receive_reporter.start()),
418                tokio::spawn(self.meter_receive_reporter.start()),
419            ]),
420        }
421    }
422}
423
424/// Handle of [Reporting::spawn].
425pub struct ReportingJoinHandle {
426    handles: TryJoinAll<JoinHandle<crate::Result<()>>>,
427}
428
429impl Future for ReportingJoinHandle {
430    type Output = crate::Result<()>;
431
432    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
433        Pin::new(&mut self.handles).poll(cx).map(|rs| {
434            let rs = rs?;
435            for r in rs {
436                r?;
437            }
438            Ok(())
439        })
440    }
441}
442
443struct TraceReceiveReporter {
444    trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
445    trace_receiver: Receiver<SegmentObject>,
446    status_handle: Arc<DynStatusHandler>,
447}
448
449impl TraceReceiveReporter {
450    async fn start(mut self) -> crate::Result<()> {
451        let rf = ReceiveFrom::new(self.trace_receiver);
452        while let Some(stream) = rf.stream() {
453            if let Err(err) = self.trace_client.collect(stream).await {
454                (self.status_handle)("Collect trace segment by stream failed", &err);
455            }
456        }
457        Ok(())
458    }
459}
460
461struct LogReceiveReporter {
462    log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
463    log_receiver: Receiver<LogData>,
464    status_handle: Arc<DynStatusHandler>,
465}
466
467impl LogReceiveReporter {
468    async fn start(mut self) -> crate::Result<()> {
469        let rf = ReceiveFrom::new(self.log_receiver);
470        while let Some(stream) = rf.stream() {
471            if let Err(err) = self.log_client.collect(stream).await {
472                (self.status_handle)("Collect log data by stream failed", &err);
473            }
474        }
475        Ok(())
476    }
477}
478
479struct MeterReceiveReporter {
480    meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
481    meter_receiver: Receiver<MeterData>,
482    status_handle: Arc<DynStatusHandler>,
483}
484
485impl MeterReceiveReporter {
486    async fn start(mut self) -> crate::Result<()> {
487        let rf = ReceiveFrom::new(self.meter_receiver);
488        while let Some(stream) = rf.stream() {
489            if let Err(err) = self.meter_client.collect(stream).await {
490                (self.status_handle)("Collect meter data by stream failed", &err);
491            }
492        }
493        Ok(())
494    }
495}
496
497struct ReceiveFrom<I> {
498    receiver: Arc<Mutex<Receiver<I>>>,
499    is_closed: Arc<AtomicBool>,
500}
501
502impl<I> ReceiveFrom<I> {
503    fn new(receiver: Receiver<I>) -> Self {
504        Self {
505            receiver: Arc::new(Mutex::new(receiver)),
506            is_closed: Default::default(),
507        }
508    }
509
510    fn stream(&self) -> Option<impl Stream<Item = I> + use<I>> {
511        if self.is_closed.load(Ordering::Relaxed) {
512            return None;
513        }
514
515        let is_closed = self.is_closed.clone();
516        let receiver = self.receiver.clone();
517
518        Some(
519            ReceiveFromStream {
520                receiver,
521                is_closed,
522            }
523            .timeout(Duration::from_secs(30))
524            .map_while(|item| item.ok()),
525        )
526    }
527}
528
529struct ReceiveFromStream<I> {
530    receiver: Arc<Mutex<Receiver<I>>>,
531    is_closed: Arc<AtomicBool>,
532}
533
534impl<I> Stream for ReceiveFromStream<I> {
535    type Item = I;
536
537    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
538        self.receiver.try_lock().unwrap().poll_recv(cx).map(|item| {
539            if item.is_none() {
540                self.is_closed.store(true, Ordering::Relaxed);
541            }
542            item
543        })
544    }
545}