use crate::pillars::Pillar;
use chrono::Utc;
use once_cell::sync::Lazy;
use serde_json::Value;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
/// Process-wide slot holding the installed usage recorder.
///
/// Starts out as `None`; `init` stores a recorder here and
/// `record_pillar_usage` reads it for every event.
// The nested `Lazy<Arc<RwLock<Option<Arc<dyn _>>>>>` type trips clippy's
// `type_complexity` lint, so the lint is silenced for this one item.
#[allow(clippy::type_complexity)]
static ANALYTICS_DB: Lazy<Arc<RwLock<Option<Arc<dyn PillarUsageRecorder>>>>> =
Lazy::new(|| Arc::new(RwLock::new(None)));
/// Number of events that were dropped (in-flight limit reached) or whose
/// recorder call returned an error since the last warning was flushed.
/// Reset to zero by `maybe_flush_dropped_warning`.
static FAILED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
/// Instant of the last warning flush. Initialised to "now" on first access,
/// so the first warning cannot fire before one full interval has passed.
static LAST_FAILURE_WARN_AT: Lazy<RwLock<Instant>> = Lazy::new(|| RwLock::new(Instant::now()));
/// Minimum spacing between two aggregated failure warnings.
const FAILURE_WARN_INTERVAL: Duration = Duration::from_secs(60);
/// Number of spawned recorder tasks whose `record` call has not returned yet.
static IN_FLIGHT_RECORDS: AtomicU64 = AtomicU64::new(0);
/// Events arriving while `IN_FLIGHT_RECORDS` is at or above this value are
/// dropped instead of being handed to the recorder.
const IN_FLIGHT_LIMIT: u64 = 20;
/// Destination for pillar usage events.
///
/// Implementations must be `Send + Sync`: this module keeps the recorder in a
/// global `Arc<dyn PillarUsageRecorder>` and invokes it from spawned tokio
/// tasks.
#[async_trait::async_trait]
pub trait PillarUsageRecorder: Send + Sync {
/// Handles a single usage event.
///
/// # Errors
///
/// Implementations may return any boxed error. Within this module an `Err`
/// is logged at debug level and counted as a failed event; it is never
/// propagated to the code that emitted the metric.
async fn record(
&self,
event: PillarUsageEvent,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
/// One usage data point handed to a [`PillarUsageRecorder`].
#[derive(Debug, Clone)]
pub struct PillarUsageEvent {
/// Workspace identifier supplied by the caller, if any.
pub workspace_id: Option<String>,
/// Organisation identifier supplied by the caller, if any.
pub org_id: Option<String>,
/// Pillar the metric is attributed to.
pub pillar: Pillar,
/// Caller-chosen metric name.
pub metric_name: String,
/// Metric payload as an arbitrary JSON value.
pub metric_value: Value,
/// UTC time at which the event was constructed.
pub timestamp: chrono::DateTime<Utc>,
}
/// Installs `recorder` as the process-wide usage recorder.
///
/// Any previously installed recorder is replaced. Events recorded before the
/// first call are discarded because no recorder is present yet.
pub async fn init(recorder: Arc<dyn PillarUsageRecorder>) {
    // The write guard is a temporary and is released at the end of the statement.
    *ANALYTICS_DB.write().await = Some(recorder);
}
/// Records a usage metric attributed to [`Pillar::Reality`].
///
/// Thin wrapper that forwards every argument to `record_pillar_usage`.
pub async fn record_reality_usage(
    workspace_id: Option<String>,
    org_id: Option<String>,
    metric_name: &str,
    metric_value: Value,
) {
    let pillar = Pillar::Reality;
    record_pillar_usage(workspace_id, org_id, pillar, metric_name, metric_value).await
}
/// Records a usage metric attributed to [`Pillar::Contracts`].
///
/// Thin wrapper that forwards every argument to `record_pillar_usage`.
pub async fn record_contracts_usage(
    workspace_id: Option<String>,
    org_id: Option<String>,
    metric_name: &str,
    metric_value: Value,
) {
    let pillar = Pillar::Contracts;
    record_pillar_usage(workspace_id, org_id, pillar, metric_name, metric_value).await
}
/// Records a usage metric attributed to [`Pillar::DevX`].
///
/// Thin wrapper that forwards every argument to `record_pillar_usage`.
pub async fn record_devx_usage(
    workspace_id: Option<String>,
    org_id: Option<String>,
    metric_name: &str,
    metric_value: Value,
) {
    let pillar = Pillar::DevX;
    record_pillar_usage(workspace_id, org_id, pillar, metric_name, metric_value).await
}
/// Records a usage metric attributed to [`Pillar::Cloud`].
///
/// Thin wrapper that forwards every argument to `record_pillar_usage`.
pub async fn record_cloud_usage(
    workspace_id: Option<String>,
    org_id: Option<String>,
    metric_name: &str,
    metric_value: Value,
) {
    let pillar = Pillar::Cloud;
    record_pillar_usage(workspace_id, org_id, pillar, metric_name, metric_value).await
}
/// Records a usage metric attributed to [`Pillar::Ai`].
///
/// Thin wrapper that forwards every argument to `record_pillar_usage`.
pub async fn record_ai_usage(
    workspace_id: Option<String>,
    org_id: Option<String>,
    metric_name: &str,
    metric_value: Value,
) {
    let pillar = Pillar::Ai;
    record_pillar_usage(workspace_id, org_id, pillar, metric_name, metric_value).await
}
async fn record_pillar_usage(
workspace_id: Option<String>,
org_id: Option<String>,
pillar: Pillar,
metric_name: &str,
metric_value: Value,
) {
let db = ANALYTICS_DB.read().await;
if let Some(recorder) = db.as_ref() {
let event = PillarUsageEvent {
workspace_id,
org_id,
pillar,
metric_name: metric_name.to_string(),
metric_value,
timestamp: Utc::now(),
};
let current = IN_FLIGHT_RECORDS.load(Ordering::Relaxed);
if current >= IN_FLIGHT_LIMIT {
FAILED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
maybe_flush_dropped_warning().await;
return;
}
IN_FLIGHT_RECORDS.fetch_add(1, Ordering::Relaxed);
let recorder = recorder.clone();
tokio::spawn(async move {
let result = recorder.record(event).await;
IN_FLIGHT_RECORDS.fetch_sub(1, Ordering::Relaxed);
if let Err(e) = result {
tracing::debug!("Failed to record pillar usage event: {}", e);
FAILED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
maybe_flush_dropped_warning().await;
}
});
}
}
/// Emits one aggregated warning about failed or dropped events, at most once
/// per `FAILURE_WARN_INTERVAL`.
///
/// When the interval has elapsed, `FAILED_EVENT_COUNT` is reset to zero and,
/// if it was non-zero, its previous value is reported via `tracing::warn!`.
/// The last-flush timestamp is refreshed whether or not a warning was logged.
async fn maybe_flush_dropped_warning() {
    let interval = FAILURE_WARN_INTERVAL;

    // Fast path: copy the timestamp out under the shared lock. The read guard
    // is a temporary and is gone before the write lock is requested below.
    let previous = *LAST_FAILURE_WARN_AT.read().await;
    if previous.elapsed() < interval {
        return;
    }

    // Re-check under the exclusive lock: another task may have flushed
    // between the read above and acquiring the write guard.
    let mut stamp = LAST_FAILURE_WARN_AT.write().await;
    if stamp.elapsed() < interval {
        return;
    }

    let secs = interval.as_secs();
    match FAILED_EVENT_COUNT.swap(0, Ordering::Relaxed) {
        0 => {}
        lost => {
            tracing::warn!(
                dropped_events = lost,
                interval_secs = secs,
                "pillar_tracking: dropped events in the last {}s due to analytics-DB pressure \
                 (analytics is best-effort; bench / serve behaviour is unaffected)",
                secs,
            );
        }
    }

    *stamp = Instant::now();
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Recorder double that appends every received event to a shared vector.
    struct TestRecorder {
        events: Arc<RwLock<Vec<PillarUsageEvent>>>,
    }

    #[async_trait::async_trait]
    impl PillarUsageRecorder for TestRecorder {
        async fn record(
            &self,
            event: PillarUsageEvent,
        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            self.events.write().await.push(event);
            Ok(())
        }
    }

    /// A reality-pillar metric reaches the installed recorder with the
    /// expected pillar and metric name.
    #[tokio::test]
    async fn test_record_reality_usage() {
        let captured = Arc::new(RwLock::new(Vec::new()));
        init(Arc::new(TestRecorder {
            events: Arc::clone(&captured),
        }))
        .await;

        let workspace = Some(String::from("workspace-1"));
        let payload = json!({"ratio": 0.5});
        record_reality_usage(workspace, None, "blended_reality_ratio", payload).await;

        // Recording happens on a detached task; give it time to run.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let seen = captured.read().await;
        assert_eq!(seen.len(), 1);
        let first = &seen[0];
        assert_eq!(first.pillar, Pillar::Reality);
        assert_eq!(first.metric_name, "blended_reality_ratio");
    }
}