use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use reqwest::Client as HttpClient;
use tokio::sync::mpsc;
use tokio::time::interval;
use crate::UsageEvent;
/// Period of the worker's flush timer: on each tick the worker posts whatever
/// events are currently buffered (nothing is sent when the buffer is empty).
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// Number of buffered events at which the worker posts a batch right away
/// instead of waiting for the next timer tick; also used as the initial
/// capacity of the worker's buffer.
const MAX_BATCH_SIZE: usize = 100;
/// Handle to a background task that buffers [`UsageEvent`]s and posts them in
/// batches over HTTP.
#[derive(Debug)]
pub(crate) struct Transport {
/// Sending half of the unbounded queue that feeds the worker task.
/// `enqueue` does nothing while this holds `None`.
// NOTE(review): nothing in the visible code ever stores `None` here;
// presumably a shutdown path elsewhere takes the sender so that the channel
// closes and the worker performs its final flush — confirm.
tx: Mutex<Option<mpsc::UnboundedSender<UsageEvent>>>,
/// Join handle of the spawned worker task.
// The handle is stored but never read in the visible code, which is why the
// dead-code lint is silenced on this field.
#[allow(dead_code)]
worker: Arc<tokio::task::JoinHandle<()>>,
}
impl Transport {
    /// Creates the transport and spawns its background worker task.
    ///
    /// The worker posts batches to `<endpoint>/v1/events` (trailing `/`
    /// characters are trimmed from `endpoint` first), authenticating with
    /// `api_key`. Buffered events are flushed when:
    ///
    /// * the buffer reaches `MAX_BATCH_SIZE` events,
    /// * the `FLUSH_INTERVAL` timer ticks while the buffer is non-empty, or
    /// * the channel is closed, in which case the remaining events are
    ///   flushed once and the worker exits.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub(crate) fn new(http: HttpClient, endpoint: String, api_key: String) -> Self {
        let (sender, mut receiver) = mpsc::unbounded_channel::<UsageEvent>();

        let handle = tokio::spawn(async move {
            let target = format!("{}/v1/events", endpoint.trim_end_matches('/'));
            let mut pending: Vec<UsageEvent> = Vec::with_capacity(MAX_BATCH_SIZE);

            // Missed ticks are skipped rather than replayed in a burst.
            let mut flush_timer = interval(FLUSH_INTERVAL);
            flush_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    received = receiver.recv() => {
                        // `None` means the channel is closed and fully drained.
                        let channel_closed = received.is_none();
                        if let Some(event) = received {
                            pending.push(event);
                        }
                        if channel_closed || pending.len() >= MAX_BATCH_SIZE {
                            send_batch(&http, &target, &api_key, &mut pending).await;
                        }
                        if channel_closed {
                            break;
                        }
                    }
                    _ = flush_timer.tick() => {
                        let has_pending = !pending.is_empty();
                        if has_pending {
                            send_batch(&http, &target, &api_key, &mut pending).await;
                        }
                    }
                }
            }
        });

        Self {
            tx: Mutex::new(Some(sender)),
            worker: Arc::new(handle),
        }
    }

    /// Hands `event` to the background worker for batched delivery.
    ///
    /// This never blocks on the network and never reports failure: the event
    /// is silently discarded when the lock is poisoned, when no sender is
    /// stored, or when the channel rejects the send.
    pub(crate) fn enqueue(&self, event: UsageEvent) {
        let slot = match self.tx.lock() {
            Ok(slot) => slot,
            Err(_) => return,
        };
        match slot.as_ref() {
            Some(sender) => {
                // A send error is deliberately ignored.
                let _ = sender.send(event);
            }
            None => {}
        }
    }
}
/// Posts the buffered events to `url` as a single JSON array and empties `batch`.
///
/// Does nothing when `batch` is empty. Delivery is best-effort: the buffer is
/// emptied whether or not the request succeeds, and any failure — a transport
/// error or a non-success HTTP status — is only reported on stderr.
///
/// * `http` - client used to issue the request.
/// * `url` - full URL the batch is posted to.
/// * `api_key` - sent as a bearer token.
/// * `batch` - pending events; empty on return, with its allocation retained.
async fn send_batch(
    http: &HttpClient,
    url: &str,
    api_key: &str,
    batch: &mut Vec<UsageEvent>,
) {
    if batch.is_empty() {
        return;
    }
    // `json` serializes the body while building the request, so the buffer can
    // be cleared before sending; `clear` keeps the allocation for the next batch
    // instead of regrowing from zero capacity after every flush.
    let request = http
        .post(url)
        .bearer_auth(api_key)
        .header("User-Agent", concat!("spectracost-sdk-rust/", env!("CARGO_PKG_VERSION")))
        .json(batch.as_slice());
    batch.clear();
    // `send` only fails on transport-level problems; a 4xx/5xx response is
    // still `Ok`, so convert error statuses into `Err` to make rejected
    // batches visible instead of silently dropping them.
    let outcome = request
        .send()
        .await
        .and_then(|response| response.error_for_status());
    if let Err(err) = outcome {
        eprintln!("spectracost: failed to post batch: {err}");
    }
}