use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{interval, sleep, Duration};
use crate::config::RumConfig;
use crate::error::{Result, RumError};
use crate::event::RumEvent;
use crate::exporter::payload::build_payloads;
pub(crate) enum ExporterCommand {
Event(Box<RumEvent>),
Flush(oneshot::Sender<()>),
Shutdown(oneshot::Sender<()>),
}
#[derive(Clone)]
pub(crate) struct ExporterHandle {
sender: mpsc::Sender<ExporterCommand>,
}
impl ExporterHandle {
pub fn start(config: RumConfig) -> Self {
let (sender, receiver) = mpsc::channel(config.exporter.max_queue_size);
tokio::spawn(run_worker(config, receiver));
Self { sender }
}
pub fn try_enqueue(&self, event: RumEvent) -> Result<()> {
self.sender
.try_send(ExporterCommand::Event(Box::new(event)))
.map_err(|error| match error {
TrySendError::Full(_) => RumError::QueueFull,
TrySendError::Closed(_) => RumError::ChannelClosed,
})
}
pub async fn flush(&self) -> Result<()> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(ExporterCommand::Flush(sender))
.await
.map_err(|_| RumError::ChannelClosed)?;
receiver.await.map_err(|_| RumError::ChannelClosed)
}
pub async fn shutdown(&self) -> Result<()> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(ExporterCommand::Shutdown(sender))
.await
.map_err(|_| RumError::ChannelClosed)?;
receiver.await.map_err(|_| RumError::ChannelClosed)
}
}
async fn run_worker(config: RumConfig, mut receiver: mpsc::Receiver<ExporterCommand>) {
let mut ticker = interval(config.exporter.flush_interval);
let mut buffer = Vec::new();
loop {
tokio::select! {
_ = ticker.tick() => {
flush_buffer(&config, &mut buffer).await;
}
command = receiver.recv() => {
match command {
Some(ExporterCommand::Event(event)) => {
buffer.push(*event);
if buffer.len() >= config.exporter.max_batch_size {
flush_buffer(&config, &mut buffer).await;
}
}
Some(ExporterCommand::Flush(done)) => {
flush_buffer(&config, &mut buffer).await;
let _ = done.send(());
}
Some(ExporterCommand::Shutdown(done)) => {
flush_buffer(&config, &mut buffer).await;
let _ = done.send(());
break;
}
None => {
flush_buffer(&config, &mut buffer).await;
break;
}
}
}
}
}
}
async fn flush_buffer(config: &RumConfig, buffer: &mut Vec<RumEvent>) {
if buffer.is_empty() {
return;
}
let events = std::mem::take(buffer);
let payloads = build_payloads(config, events);
for payload in payloads {
post_with_retry(config, payload).await;
}
}
async fn post_with_retry(config: &RumConfig, payload: serde_json::Value) {
let client = match reqwest::Client::builder()
.timeout(config.exporter.request_timeout)
.build()
{
Ok(client) => client,
Err(_) => return,
};
let body = payload.to_string();
let print_payload = should_print_payload();
if print_payload {
println!("raw rum payload sent to collector:");
println!("{body}");
}
let mut delay = Duration::from_millis(200);
for attempt in 0..3 {
let result = client
.post(config.config_address().clone())
.header("content-type", "application/json")
.header(
"user-agent",
format!("alibabacloud_rum_rust/{}", crate::config::SDK_VERSION),
)
.body(body.clone())
.send()
.await;
match result {
Ok(response) => {
let status = response.status();
if print_payload {
println!("rum payload post attempt {} status: {status}", attempt + 1);
match response.text().await {
Ok(text) if !text.is_empty() => {
println!("rum payload response body:");
println!("{text}");
}
Ok(_) => {}
Err(error) => {
println!("failed to read rum payload response body: {error}");
}
}
}
if status.is_success() {
return;
}
}
Err(error) => {
if print_payload {
println!("rum payload post attempt {} error: {error}", attempt + 1);
}
}
}
if attempt < 2 {
sleep(delay).await;
delay *= 2;
}
}
}
fn should_print_payload() -> bool {
std::env::var("ALIBABACLOUD_RUM_PRINT_PAYLOAD")
.map(|value| {
matches!(
value.to_ascii_lowercase().as_str(),
"1" | "true" | "yes" | "on"
)
})
.unwrap_or(false)
}