memvid_cli/analytics/
flush.rs

1//! Background flush for analytics events
2//!
3//! Sends queued events to the server in batches.
4//! Runs asynchronously to not block CLI commands.
5
6use super::queue::{clear_queue, pending_count, read_pending_events, AnalyticsEvent};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::OnceLock;
9use std::time::Duration;
10
11/// Analytics API endpoint
12const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";
13
14/// Development endpoint (for testing)
15#[cfg(debug_assertions)]
16const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";
17
18/// Maximum events per batch
19const MAX_BATCH_SIZE: usize = 100;
20
21/// Minimum interval between flush attempts (seconds)
22const FLUSH_INTERVAL_SECS: u64 = 30;
23
24/// Flag to track if background thread is started
25static FLUSH_STARTED: AtomicBool = AtomicBool::new(false);
26
27/// Last flush timestamp
28static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
29
30/// Get the analytics endpoint URL
31fn get_endpoint() -> String {
32    // Check for explicit analytics endpoint override
33    if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
34        return url;
35    }
36
37    // Derive from dashboard URL if set (for local dev)
38    if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
39        return format!("{}/api/analytics/ingest", dashboard_url.trim_end_matches('/'));
40    }
41
42    // Use dev endpoint in debug builds if MEMVID_DEV is set
43    #[cfg(debug_assertions)]
44    if std::env::var("MEMVID_DEV").is_ok() {
45        return DEV_ANALYTICS_ENDPOINT.to_string();
46    }
47
48    ANALYTICS_ENDPOINT.to_string()
49}
50
51/// Initialize analytics (no-op, kept for API compatibility)
52pub fn start_background_flush() {
53    // Initialize last flush time
54    LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
55}
56
57/// Force flush analytics synchronously (ignores interval check)
58/// Call this at the end of main() to ensure events are sent
59pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
60    do_flush_sync(true)
61}
62
63/// Flush analytics synchronously (for background thread)
64fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
65    do_flush_sync(false)
66}
67
68/// Internal flush implementation
69fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
70    // Check if enough time has passed (skip if force=true)
71    if !force {
72        if let Some(last) = LAST_FLUSH.get() {
73            if let Ok(last_time) = last.lock() {
74                if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
75                    return Ok(()); // Too soon
76                }
77            }
78        }
79    }
80
81    // Check if there are events to flush
82    let count = pending_count();
83    if count == 0 {
84        return Ok(());
85    }
86
87    // Read events
88    let events = read_pending_events();
89    if events.is_empty() {
90        return Ok(());
91    }
92
93    // Send in batches using reqwest blocking client
94    let endpoint = get_endpoint();
95    let client = reqwest::blocking::Client::builder()
96        .connect_timeout(Duration::from_secs(5))
97        .timeout(Duration::from_secs(10))
98        .build()
99        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
100
101    for chunk in events.chunks(MAX_BATCH_SIZE) {
102        let payload = serde_json::json!({
103            "events": chunk
104        });
105
106        match client
107            .post(&endpoint)
108            .header("Content-Type", "application/json")
109            .json(&payload)
110            .send()
111        {
112            Ok(response) => {
113                if response.status().is_success() {
114                    #[cfg(debug_assertions)]
115                    eprintln!("[analytics] Flushed {} events", chunk.len());
116                } else {
117                    #[cfg(debug_assertions)]
118                    eprintln!("[analytics] Server returned {}", response.status());
119                }
120            }
121            Err(e) => {
122                // Log but don't fail - we'll retry next time
123                #[cfg(debug_assertions)]
124                eprintln!("[analytics] Flush error: {}", e);
125
126                // Update last flush time even on error to prevent hammering
127                if let Some(last) = LAST_FLUSH.get() {
128                    if let Ok(mut last_time) = last.lock() {
129                        *last_time = std::time::Instant::now();
130                    }
131                }
132
133                return Err(Box::new(e));
134            }
135        }
136    }
137
138    // Clear queue on success
139    clear_queue();
140
141    // Update last flush time
142    if let Some(last) = LAST_FLUSH.get() {
143        if let Ok(mut last_time) = last.lock() {
144            *last_time = std::time::Instant::now();
145        }
146    }
147
148    Ok(())
149}
150
151/// Public function to trigger a flush (async spawn)
152pub fn flush_analytics() {
153    std::thread::spawn(|| {
154        let _ = flush_analytics_sync();
155    });
156}
157
/// Send events straight to the endpoint, bypassing the on-disk queue
/// (test-only helper).
///
/// # Errors
/// Returns an error on transport failure or when the server responds
/// with a non-success status. (Previously the HTTP status was ignored,
/// so tests using this helper could pass against a failing server.)
#[cfg(test)]
pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = get_endpoint();
    let payload = serde_json::json!({ "events": events });

    reqwest::blocking::Client::new()
        .post(&endpoint)
        .header("Content-Type", "application/json")
        .json(&payload)
        .send()?
        // Surface 4xx/5xx responses as errors instead of silent success.
        .error_for_status()?;

    Ok(())
}