spectracost 0.1.0

AI cost observability SDK - see the full spectrum of your AI spend
Documentation
//! Background transport for UsageEvent batches.

use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use reqwest::Client as HttpClient;
use tokio::sync::mpsc;
use tokio::time::interval;

use crate::UsageEvent;

const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
const MAX_BATCH_SIZE: usize = 100;

/// Background queue + spawner. Owned by [`crate::Spectracost`].
#[derive(Debug)]
pub(crate) struct Transport {
    tx: Mutex<Option<mpsc::UnboundedSender<UsageEvent>>>,
    #[allow(dead_code)]
    worker: Arc<tokio::task::JoinHandle<()>>,
}

impl Transport {
    pub(crate) fn new(http: HttpClient, endpoint: String, api_key: String) -> Self {
        let (tx, mut rx) = mpsc::unbounded_channel::<UsageEvent>();
        let worker = tokio::spawn(async move {
            let url = format!("{}/v1/events", endpoint.trim_end_matches('/'));
            let mut batch: Vec<UsageEvent> = Vec::with_capacity(MAX_BATCH_SIZE);
            let mut ticker = interval(FLUSH_INTERVAL);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    maybe_event = rx.recv() => {
                        match maybe_event {
                            Some(event) => {
                                batch.push(event);
                                if batch.len() >= MAX_BATCH_SIZE {
                                    send_batch(&http, &url, &api_key, &mut batch).await;
                                }
                            }
                            None => {
                                // Sender dropped; flush remainder and exit.
                                send_batch(&http, &url, &api_key, &mut batch).await;
                                return;
                            }
                        }
                    }
                    _ = ticker.tick() => {
                        if !batch.is_empty() {
                            send_batch(&http, &url, &api_key, &mut batch).await;
                        }
                    }
                }
            }
        });
        Self {
            tx: Mutex::new(Some(tx)),
            worker: Arc::new(worker),
        }
    }

    /// Enqueue an event. Never blocks the caller beyond a channel send.
    pub(crate) fn enqueue(&self, event: UsageEvent) {
        if let Ok(guard) = self.tx.lock() {
            if let Some(tx) = guard.as_ref() {
                let _ = tx.send(event);
            }
        }
    }
}

async fn send_batch(
    http: &HttpClient,
    url: &str,
    api_key: &str,
    batch: &mut Vec<UsageEvent>,
) {
    if batch.is_empty() {
        return;
    }
    let payload = std::mem::take(batch);
    let res = http
        .post(url)
        .bearer_auth(api_key)
        .header("User-Agent", concat!("spectracost-sdk-rust/", env!("CARGO_PKG_VERSION")))
        .json(&payload)
        .send()
        .await;
    if let Err(err) = res {
        // Swallow — telemetry must never affect caller. A future
        // enhancement could retry with backoff.
        eprintln!("spectracost: failed to post batch: {err}");
    }
}