// skywalking 0.5.0
//
// Apache SkyWalking Rust Agent
// Documentation
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

//! Grpc implementation of [Report].

#[cfg(feature = "management")]
use crate::skywalking_proto::v3::management_service_client::ManagementServiceClient;
use crate::{
    reporter::{CollectItem, Report},
    skywalking_proto::v3::{
        log_report_service_client::LogReportServiceClient,
        meter_report_service_client::MeterReportServiceClient,
        trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, MeterData,
        SegmentObject,
    },
};
use futures_util::stream;
use std::{
    collections::LinkedList,
    error::Error,
    future::{pending, Future},
    mem::take,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll},
};
use tokio::{
    select,
    sync::{mpsc, Mutex},
    task::JoinHandle,
    try_join,
};
use tonic::{
    async_trait,
    metadata::{Ascii, MetadataValue},
    service::{interceptor::InterceptedService, Interceptor},
    transport::{self, Channel, Endpoint},
    Request, Status,
};

/// Producer half of the reporter's item pipeline.
///
/// Special purpose, used for user-defined production operations. Generally, it
/// does not need to be handled.
pub trait CollectItemProduce: Send + Sync + 'static {
    /// Hand `item` over to the consuming side without blocking.
    ///
    /// # Errors
    ///
    /// Returns an error when the item could not be handed over. The
    /// reporter's `report` implementation passes such an error to the error
    /// handle, when one has been set.
    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>>;
}

impl CollectItemProduce for () {
    fn produce(&self, _item: CollectItem) -> Result<(), Box<dyn Error>> {
        Ok(())
    }
}

/// Producer backed by the sending half of an unbounded tokio channel.
impl CollectItemProduce for mpsc::UnboundedSender<CollectItem> {
    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error>> {
        // `send` on an unbounded channel does not wait; a failure is boxed and
        // handed back to the caller unchanged.
        match self.send(item) {
            Ok(()) => Ok(()),
            Err(send_err) => Err(send_err.into()),
        }
    }
}

/// Consumer half of the reporter's item pipeline.
///
/// Special purpose, used for user-defined consume operations. Generally, it
/// does not need to be handled.
#[async_trait]
pub trait CollectItemConsume: Send + Sync + 'static {
    /// Wait for the next collect item.
    ///
    /// The reporting task leaves its main loop when this returns `Ok(None)`
    /// and fails with the error when this returns `Err`.
    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>>;

    /// Try to consume a collect item without waiting.
    ///
    /// The reporting task calls this repeatedly to flush the remaining items
    /// once its main loop has ended; it stops at the first `Ok(None)` and
    /// fails with the error on `Err`.
    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>>;
}

/// No-op consumer: it never yields an item.
#[async_trait]
impl CollectItemConsume for () {
    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
        // There is no queue behind `()`, so answer "nothing left" right away.
        Ok(Option::None)
    }

    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
        // Same answer as `consume`: there is never anything to take.
        Ok(Option::None)
    }
}

/// Consumer backed by the receiving half of an unbounded tokio channel.
#[async_trait]
impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
        // `recv` already yields `Option<CollectItem>`; it only needs wrapping.
        let next = self.recv().await;
        Ok(next)
    }

    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
        use mpsc::error::TryRecvError;

        match self.try_recv() {
            Ok(item) => Ok(Some(item)),
            // An empty queue is not an error, there is just nothing to take.
            Err(TryRecvError::Empty) => Ok(None),
            // A disconnected channel is reported to the caller as an error.
            Err(disconnected @ TryRecvError::Disconnected) => Err(Box::new(disconnected)),
        }
    }
}

/// Signature of the user-supplied request hook kept in `CustomInterceptor`:
/// it receives the outgoing request and either returns a request or rejects
/// the call with a `Status`.
type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;

/// Interceptor attached to every gRPC client created by the reporter.
///
/// Both fields are optional and reference counted, so cloning the interceptor
/// shares the same token and hook.
#[derive(Default, Clone)]
struct CustomInterceptor {
    /// Value inserted under the `authentication` metadata key, when set.
    authentication: Option<Arc<String>>,
    /// User hook applied to the request after the authentication step.
    custom_intercept: Option<Arc<DynInterceptHandler>>,
}

impl Interceptor for CustomInterceptor {
    /// Decorate an outgoing request.
    ///
    /// The `authentication` metadata entry is inserted when a token is
    /// configured and parses as an ASCII metadata value; afterwards the
    /// request is passed through the custom hook, if any, whose result
    /// (including a rejection) is returned as is.
    fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
        // A token that does not parse as ASCII metadata is silently skipped.
        let token = self
            .authentication
            .as_deref()
            .and_then(|raw| raw.parse::<MetadataValue<Ascii>>().ok());
        if let Some(token) = token {
            request.metadata_mut().insert("authentication", token);
        }

        match &self.custom_intercept {
            Some(hook) => hook(request),
            None => Ok(request),
        }
    }
}

/// State shared between every clone of the reporter and the reporting task.
struct Inner<P, C> {
    /// Receives the items passed to the reporter's `report` method.
    producer: P,
    /// Consumer half; taken out (leaving `None`) when `reporting` is called.
    consumer: Mutex<Option<C>>,
    /// Set by the first `reporting` call so that a second call panics.
    is_reporting: AtomicBool,
    /// Set by the reporting task once its main loop has ended; afterwards
    /// `report` ignores new items.
    is_closed: AtomicBool,
}

/// Alias of dyn [Error] callback.
///
/// The callback receives the boxed error by value and returns nothing.
pub type DynErrHandle = dyn Fn(Box<dyn Error>) + Send + Sync + 'static;

/// Reporter which will report to Skywalking OAP server via grpc protocol.
///
/// `P` is the producer that queues items and `C` the consumer the reporting
/// task drains. Clones share the same inner state and error handle.
pub struct GrpcReporter<P, C> {
    /// Producer, consumer and lifecycle flags shared by all clones.
    inner: Arc<Inner<P, C>>,
    /// Optional callback invoked with errors returned by the producer.
    err_handle: Arc<Option<Box<DynErrHandle>>>,
    /// Channel cloned into every service client created by `reporting`.
    channel: Channel,
    /// Interceptor cloned into every service client created by `reporting`.
    interceptor: CustomInterceptor,
}

impl GrpcReporter<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
    /// Create a reporter on top of an existing [Channel]; clone the
    /// [Channel] beforehand if it should be multiplexed with other clients.
    ///
    /// Items are queued through a freshly created unbounded tokio channel.
    pub fn new(channel: Channel) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();
        Self::new_with_pc(channel, sender, receiver)
    }

    /// Connect to the Skywalking OAP server.
    ///
    /// # Errors
    ///
    /// Fails when `address` cannot be converted into an [Endpoint] or when
    /// connecting to that endpoint fails.
    pub async fn connect(
        address: impl TryInto<Endpoint, Error = transport::Error>,
    ) -> crate::Result<Self> {
        let endpoint: Endpoint = address.try_into()?;
        Ok(Self::new(endpoint.connect().await?))
    }
}

impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
    /// Special purpose, used for user-defined produce and consume operations,
    /// usually you can use [GrpcReporter::connect] and [GrpcReporter::new].
    pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
        Self {
            inner: Arc::new(Inner {
                producer,
                consumer: Mutex::new(Some(consumer)),
                is_reporting: Default::default(),
                is_closed: Default::default(),
            }),
            err_handle: Default::default(),
            channel,
            interceptor: Default::default(),
        }
    }

    /// Set error handle. By default, the error will not be handle.
    pub fn with_err_handle(
        mut self,
        handle: impl Fn(Box<dyn Error>) + Send + Sync + 'static,
    ) -> Self {
        self.err_handle = Arc::new(Some(Box::new(handle)));
        self
    }

    /// Set the authentication header value. By default, the authentication is
    /// not set.
    pub fn with_authentication(mut self, authentication: impl Into<String>) -> Self {
        self.interceptor.authentication = Some(Arc::new(authentication.into()));
        self
    }

    /// Set the custom intercept. By default, the custom intercept is not set.
    pub fn with_custom_intercept(
        mut self,
        custom_intercept: impl Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static,
    ) -> Self {
        self.interceptor.custom_intercept = Some(Arc::new(custom_intercept));
        self
    }

    /// Start to reporting.
    ///
    /// # Panics
    ///
    /// Panic if call more than once.
    pub async fn reporting(&self) -> Reporting<P, C> {
        if self.inner.is_reporting.swap(true, Ordering::Relaxed) {
            panic!("reporting already called");
        }

        Reporting {
            rb: ReporterAndBuffer {
                inner: Arc::clone(&self.inner),
                status_handle: None,

                trace_buffer: Default::default(),
                log_buffer: Default::default(),
                meter_buffer: Default::default(),

                trace_client: TraceSegmentReportServiceClient::with_interceptor(
                    self.channel.clone(),
                    self.interceptor.clone(),
                ),
                log_client: LogReportServiceClient::with_interceptor(
                    self.channel.clone(),
                    self.interceptor.clone(),
                ),
                meter_client: MeterReportServiceClient::with_interceptor(
                    self.channel.clone(),
                    self.interceptor.clone(),
                ),
                #[cfg(feature = "management")]
                management_client: ManagementServiceClient::with_interceptor(
                    self.channel.clone(),
                    self.interceptor.clone(),
                ),
            },
            shutdown_signal: Box::pin(pending()),
            consumer: self.inner.consumer.lock().await.take().unwrap(),
        }
    }
}

impl<P, C> Clone for GrpcReporter<P, C> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            err_handle: self.err_handle.clone(),
            channel: self.channel.clone(),
            interceptor: self.interceptor.clone(),
        }
    }
}

impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C> {
    /// Queue `item` through the producer.
    ///
    /// Items are ignored once the closed flag is set. A producer error is
    /// passed to the error handle when one is set and dropped otherwise.
    fn report(&self, item: CollectItem) {
        if self.inner.is_closed.load(Ordering::Relaxed) {
            return;
        }

        let outcome = self.inner.producer.produce(item);
        if let Err(err) = outcome {
            if let Some(handle) = self.err_handle.as_deref() {
                handle(err);
            }
        }
    }
}

/// Reporting-side state: the shared inner state, the optional status handle,
/// one buffer per streamed item kind and one gRPC client per service.
struct ReporterAndBuffer<P, C> {
    /// State shared with the reporter handles.
    inner: Arc<Inner<P, C>>,
    /// Callback invoked with the status of a failed gRPC call, when set.
    status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,

    /// Trace segments waiting to be sent.
    trace_buffer: LinkedList<SegmentObject>,
    /// Log records waiting to be sent.
    log_buffer: LinkedList<LogData>,
    /// Meter records waiting to be sent.
    meter_buffer: LinkedList<MeterData>,

    /// Client used to send the trace buffer.
    trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    /// Client used to send the log buffer.
    log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    /// Client used to send the meter buffer.
    meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    /// Client used for instance properties and keep-alive pings.
    #[cfg(feature = "management")]
    #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
    management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
}

impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
    async fn report(&mut self, item: CollectItem) {
        // TODO Implement batch collect in future.
        match item {
            CollectItem::Trace(item) => {
                self.trace_buffer.push_back(*item);
            }
            CollectItem::Log(item) => {
                self.log_buffer.push_back(*item);
            }
            CollectItem::Meter(item) => {
                self.meter_buffer.push_back(*item);
            }
            #[cfg(feature = "management")]
            CollectItem::Instance(item) => {
                if let Err(e) = self
                    .management_client
                    .report_instance_properties(*item)
                    .await
                {
                    if let Some(status_handle) = &self.status_handle {
                        status_handle(e);
                    }
                }
            }
            #[cfg(feature = "management")]
            CollectItem::Ping(item) => {
                if let Err(e) = self.management_client.keep_alive(*item).await {
                    if let Some(status_handle) = &self.status_handle {
                        status_handle(e);
                    }
                }
            }
        }

        if !self.trace_buffer.is_empty() {
            let buffer = take(&mut self.trace_buffer);
            if let Err(e) = self.trace_client.collect(stream::iter(buffer)).await {
                if let Some(status_handle) = &self.status_handle {
                    status_handle(e);
                }
            }
        }
        if !self.log_buffer.is_empty() {
            let buffer = take(&mut self.log_buffer);
            if let Err(e) = self.log_client.collect(stream::iter(buffer)).await {
                if let Some(status_handle) = &self.status_handle {
                    status_handle(e);
                }
            }
        }

        if !self.meter_buffer.is_empty() {
            let buffer = take(&mut self.meter_buffer);
            if let Err(e) = self.meter_client.collect(stream::iter(buffer)).await {
                if let Some(status_handle) = &self.status_handle {
                    status_handle(e);
                }
            }
        }
    }
}

/// Handle of [GrpcReporter::reporting].
///
/// Configure it with the `with_*` methods, then run it with
/// [Reporting::start] or [Reporting::spawn].
pub struct Reporting<P, C> {
    /// Clients, buffers and status handle used to send items.
    rb: ReporterAndBuffer<P, C>,
    /// Source of the items to report.
    consumer: C,
    /// Future whose completion asks the reporting task to stop.
    shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
}

impl<P: CollectItemProduce, C: CollectItemConsume> Reporting<P, C> {
    /// Quit when shutdown_signal received.
    ///
    /// Accept a `shutdown_signal` argument as a graceful shutdown signal. It
    /// replaces the default signal, which never fires.
    pub fn with_graceful_shutdown(
        mut self,
        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
    ) -> Self {
        self.shutdown_signal = Box::pin(shutdown_signal);
        self
    }

    /// Set the failed status handle. By default, failed statuses are not
    /// handled.
    pub fn with_status_handle(mut self, handle: impl Fn(tonic::Status) + Send + 'static) -> Self {
        self.rb.status_handle = Some(Box::new(handle));
        self
    }

    /// Spawn the reporting in background.
    ///
    /// The returned handle is a future that resolves to the result of
    /// [Reporting::start].
    pub fn spawn(self) -> ReportingJoinHandle {
        ReportingJoinHandle {
            handle: tokio::spawn(self.start()),
        }
    }

    /// Start the consume and report task.
    ///
    /// Items are consumed and reported until the consumer yields `Ok(None)`
    /// or the shutdown signal fires. The reporter is then marked closed and
    /// the items still queued are flushed with `try_consume`.
    ///
    /// The method returns as soon as the worker is done, even when the
    /// shutdown signal never fires.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the consumer, either while consuming or
    /// while flushing.
    pub async fn start(self) -> crate::Result<()> {
        let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel::<()>();
        let Reporting {
            mut rb,
            mut consumer,
            shutdown_signal,
        } = self;

        let work_fut = async move {
            loop {
                select! {
                    item = consumer.consume() => {
                        match item {
                            Ok(Some(item)) => {
                                rb.report(item).await;
                            }
                            Ok(None) => break,
                            Err(err) => return Err(crate::Error::Other(err)),
                        }
                    }
                    _ = shutdown_rx.recv() => break,
                }
            }

            // From here on `GrpcReporter::report` ignores new items.
            rb.inner.is_closed.store(true, Ordering::Relaxed);

            // Flush what is still queued.
            loop {
                match consumer.try_consume().await {
                    Ok(Some(item)) => {
                        rb.report(item).await;
                    }
                    Ok(None) => break,
                    Err(err) => return Err(err.into()),
                }
            }

            // Closing the receiver wakes the shutdown future below, so `start`
            // does not keep waiting for a signal that may never fire.
            shutdown_rx.close();

            Ok::<_, crate::Error>(())
        };

        let shutdown_fut = async move {
            select! {
                _ = shutdown_signal => {
                    // A failed send only means the worker has already
                    // finished, so there is nothing left to stop.
                    let _ = shutdown_tx.send(());
                }
                // The worker finished on its own: stop waiting for the signal.
                _ = shutdown_tx.closed() => {}
            }
            Ok::<_, crate::Error>(())
        };

        try_join!(work_fut, shutdown_fut)?;

        Ok(())
    }
}

/// Handle of [Reporting::spawn].
///
/// Awaiting it yields the result of the spawned reporting task.
pub struct ReportingJoinHandle {
    /// Join handle of the spawned task running [Reporting::start].
    handle: JoinHandle<crate::Result<()>>,
}

impl Future for ReportingJoinHandle {
    type Output = crate::Result<()>;

    /// Poll the underlying join handle.
    ///
    /// A join failure is converted into the crate error type; otherwise the
    /// task's own result is returned unchanged.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.handle).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(task_result)) => Poll::Ready(task_result),
            Poll::Ready(Err(join_err)) => Poll::Ready(Err(join_err.into())),
        }
    }
}