alibabacloud-rum 0.1.0

Alibaba Cloud RUM SDK for native Rust applications.
Documentation
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{interval, sleep, Duration};

use crate::config::RumConfig;
use crate::error::{Result, RumError};
use crate::event::RumEvent;
use crate::exporter::payload::build_payloads;

pub(crate) enum ExporterCommand {
    Event(Box<RumEvent>),
    Flush(oneshot::Sender<()>),
    Shutdown(oneshot::Sender<()>),
}

#[derive(Clone)]
pub(crate) struct ExporterHandle {
    sender: mpsc::Sender<ExporterCommand>,
}

impl ExporterHandle {
    pub fn start(config: RumConfig) -> Self {
        let (sender, receiver) = mpsc::channel(config.exporter.max_queue_size);
        tokio::spawn(run_worker(config, receiver));
        Self { sender }
    }

    pub fn try_enqueue(&self, event: RumEvent) -> Result<()> {
        self.sender
            .try_send(ExporterCommand::Event(Box::new(event)))
            .map_err(|error| match error {
                TrySendError::Full(_) => RumError::QueueFull,
                TrySendError::Closed(_) => RumError::ChannelClosed,
            })
    }

    pub async fn flush(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();
        self.sender
            .send(ExporterCommand::Flush(sender))
            .await
            .map_err(|_| RumError::ChannelClosed)?;
        receiver.await.map_err(|_| RumError::ChannelClosed)
    }

    pub async fn shutdown(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();
        self.sender
            .send(ExporterCommand::Shutdown(sender))
            .await
            .map_err(|_| RumError::ChannelClosed)?;
        receiver.await.map_err(|_| RumError::ChannelClosed)
    }
}

async fn run_worker(config: RumConfig, mut receiver: mpsc::Receiver<ExporterCommand>) {
    let mut ticker = interval(config.exporter.flush_interval);
    let mut buffer = Vec::new();
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                flush_buffer(&config, &mut buffer).await;
            }
            command = receiver.recv() => {
                match command {
                    Some(ExporterCommand::Event(event)) => {
                        buffer.push(*event);
                        if buffer.len() >= config.exporter.max_batch_size {
                            flush_buffer(&config, &mut buffer).await;
                        }
                    }
                    Some(ExporterCommand::Flush(done)) => {
                        flush_buffer(&config, &mut buffer).await;
                        let _ = done.send(());
                    }
                    Some(ExporterCommand::Shutdown(done)) => {
                        flush_buffer(&config, &mut buffer).await;
                        let _ = done.send(());
                        break;
                    }
                    None => {
                        flush_buffer(&config, &mut buffer).await;
                        break;
                    }
                }
            }
        }
    }
}

async fn flush_buffer(config: &RumConfig, buffer: &mut Vec<RumEvent>) {
    if buffer.is_empty() {
        return;
    }
    let events = std::mem::take(buffer);
    let payloads = build_payloads(config, events);
    for payload in payloads {
        post_with_retry(config, payload).await;
    }
}

async fn post_with_retry(config: &RumConfig, payload: serde_json::Value) {
    let client = match reqwest::Client::builder()
        .timeout(config.exporter.request_timeout)
        .build()
    {
        Ok(client) => client,
        Err(_) => return,
    };
    let body = payload.to_string();
    let print_payload = should_print_payload();
    if print_payload {
        println!("raw rum payload sent to collector:");
        println!("{body}");
    }
    let mut delay = Duration::from_millis(200);
    for attempt in 0..3 {
        let result = client
            .post(config.config_address().clone())
            .header("content-type", "application/json")
            .header(
                "user-agent",
                format!("alibabacloud_rum_rust/{}", crate::config::SDK_VERSION),
            )
            .body(body.clone())
            .send()
            .await;
        match result {
            Ok(response) => {
                let status = response.status();
                if print_payload {
                    println!("rum payload post attempt {} status: {status}", attempt + 1);
                    match response.text().await {
                        Ok(text) if !text.is_empty() => {
                            println!("rum payload response body:");
                            println!("{text}");
                        }
                        Ok(_) => {}
                        Err(error) => {
                            println!("failed to read rum payload response body: {error}");
                        }
                    }
                }
                if status.is_success() {
                    return;
                }
            }
            Err(error) => {
                if print_payload {
                    println!("rum payload post attempt {} error: {error}", attempt + 1);
                }
            }
        }
        if attempt < 2 {
            sleep(delay).await;
            delay *= 2;
        }
    }
}

fn should_print_payload() -> bool {
    std::env::var("ALIBABACLOUD_RUM_PRINT_PAYLOAD")
        .map(|value| {
            matches!(
                value.to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}