skywalking 0.10.0

Apache SkyWalking Rust Agent
Documentation
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

//! Manager methods.

use super::instance::Properties;
use crate::reporter::{CollectItem, DynReport, Report};
use std::{
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};
use tokio::{
    spawn,
    task::{JoinError, JoinHandle},
    time,
};

/// Manager handles skywalking management operations, integrate with reporter.
pub struct Manager {
    service_name: String,
    instance_name: String,
    reporter: Arc<DynReport>,
}

impl Manager {
    /// New with service info and reporter.
    pub fn new(
        service_name: impl Into<String>,
        instance_name: impl Into<String>,
        reporter: impl Report + Send + Sync + 'static,
    ) -> Self {
        Self {
            service_name: service_name.into(),
            instance_name: instance_name.into(),
            reporter: Arc::new(reporter),
        }
    }

    /// Get service name.
    pub fn service_name(&self) -> &str {
        &self.service_name
    }

    /// Get instance name.
    pub fn instance_name(&self) -> &str {
        &self.instance_name
    }

    /// Report instance properties.
    pub fn report_properties(&self, properties: Properties) {
        Self::reporter_report_properties(
            &self.reporter,
            self.service_name.clone(),
            self.instance_name.clone(),
            properties,
        );
    }

    fn reporter_report_properties(
        reporter: &Arc<DynReport>,
        service_name: String,
        instance_name: String,
        properties: Properties,
    ) {
        let props = properties.convert_to_instance_properties(service_name, instance_name);
        reporter.report(CollectItem::Instance(Box::new(props)));
    }

    /// Do keep alive once.
    pub fn keep_alive(&self) {
        Self::reporter_keep_alive(
            &self.reporter,
            self.service_name.clone(),
            self.instance_name.clone(),
        );
    }

    fn reporter_keep_alive(reporter: &Arc<DynReport>, service_name: String, instance_name: String) {
        reporter.report(CollectItem::Ping(Box::new(
            crate::proto::v3::InstancePingPkg {
                service: service_name,
                service_instance: instance_name,
                layer: Default::default(),
            },
        )));
    }

    /// Continuously report instance properties and keep alive. Run in
    /// background.
    ///
    /// Parameter `heartbeat_period` represents agent heartbeat report period.
    ///
    /// Parameter `properties_report_period_factor` represents agent sends the
    /// instance properties to the backend every `heartbeat_period` *
    /// `properties_report_period_factor` seconds.
    pub fn report_and_keep_alive(
        &self,
        properties: impl Fn() -> Properties + Send + 'static,
        heartbeat_period: Duration,
        properties_report_period_factor: usize,
    ) -> ReportAndKeepAlive {
        let service_name = self.service_name.clone();
        let instance_name = self.instance_name.clone();
        let reporter = self.reporter.clone();

        let handle = spawn(async move {
            let mut counter = 0;

            let mut ticker = time::interval(heartbeat_period);
            loop {
                ticker.tick().await;

                if counter == 0 {
                    Self::reporter_report_properties(
                        &reporter,
                        service_name.clone(),
                        instance_name.clone(),
                        properties(),
                    );
                } else {
                    Self::reporter_keep_alive(
                        &reporter,
                        service_name.clone(),
                        instance_name.clone(),
                    );
                }

                counter += 1;

                if counter >= properties_report_period_factor {
                    counter = 0;
                }
            }
        });
        ReportAndKeepAlive { handle }
    }
}

/// Handle of [Manager::report_and_keep_alive].
pub struct ReportAndKeepAlive {
    handle: JoinHandle<()>,
}

impl ReportAndKeepAlive {
    /// Get the inner tokio join handle.
    pub fn handle(&self) -> &JoinHandle<()> {
        &self.handle
    }
}

impl Future for ReportAndKeepAlive {
    type Output = Result<(), JoinError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.handle).poll(cx)
    }
}