#[cfg(test)]
use super::queue::AnalyticsEvent;
use super::queue::{clear_queue, pending_count, read_pending_events};
use std::sync::OnceLock;
use std::time::Duration;
const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";
#[cfg(debug_assertions)]
const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";
const MAX_BATCH_SIZE: usize = 100;
const FLUSH_INTERVAL_SECS: u64 = 30;
static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
fn get_endpoint() -> String {
if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
return url;
}
if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
return format!(
"{}/api/analytics/ingest",
dashboard_url.trim_end_matches('/')
);
}
if let Ok(config) = crate::commands::config::PersistentConfig::load() {
if let Some(dashboard_url) = config.dashboard_url {
return format!(
"{}/api/analytics/ingest",
dashboard_url.trim_end_matches('/')
);
}
}
#[cfg(debug_assertions)]
if std::env::var("MEMVID_DEV").is_ok() {
return DEV_ANALYTICS_ENDPOINT.to_string();
}
ANALYTICS_ENDPOINT.to_string()
}
pub fn start_background_flush() {
LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
}
pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
do_flush_sync(true)
}
fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
do_flush_sync(false)
}
fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if !force {
if let Some(last) = LAST_FLUSH.get() {
if let Ok(last_time) = last.lock() {
if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
return Ok(()); }
}
}
}
let count = pending_count();
if count == 0 {
return Ok(());
}
let events = read_pending_events();
if events.is_empty() {
return Ok(());
}
let endpoint = get_endpoint();
let client = reqwest::blocking::Client::builder()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(10))
.build()
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
for chunk in events.chunks(MAX_BATCH_SIZE) {
let payload = serde_json::json!({
"events": chunk
});
match client
.post(&endpoint)
.header("Content-Type", "application/json")
.json(&payload)
.send()
{
Ok(response) => {
if response.status().is_success() {
#[cfg(debug_assertions)]
eprintln!("[analytics] Flushed {} events", chunk.len());
} else {
#[cfg(debug_assertions)]
eprintln!("[analytics] Server returned {}", response.status());
}
}
Err(e) => {
#[cfg(debug_assertions)]
eprintln!("[analytics] Flush error: {}", e);
if let Some(last) = LAST_FLUSH.get() {
if let Ok(mut last_time) = last.lock() {
*last_time = std::time::Instant::now();
}
}
return Err(Box::new(e));
}
}
}
clear_queue();
if let Some(last) = LAST_FLUSH.get() {
if let Ok(mut last_time) = last.lock() {
*last_time = std::time::Instant::now();
}
}
Ok(())
}
pub fn flush_analytics() {
std::thread::spawn(|| {
let _ = flush_analytics_sync();
});
}
#[cfg(test)]
#[allow(dead_code)]
pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
let endpoint = get_endpoint();
let payload = serde_json::json!({ "events": events });
reqwest::blocking::Client::new()
.post(&endpoint)
.header("Content-Type", "application/json")
.json(&payload)
.send()?;
Ok(())
}