memvid_cli/analytics/
flush.rs

1//! Background flush for analytics events
2//!
3//! Sends queued events to the server in batches.
4//! Runs asynchronously to not block CLI commands.
5
6use super::queue::{clear_queue, pending_count, read_pending_events};
7#[cfg(test)]
8use super::queue::AnalyticsEvent;
9use std::sync::OnceLock;
10use std::time::Duration;
11
12/// Analytics API endpoint
13const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";
14
15/// Development endpoint (for testing)
16#[cfg(debug_assertions)]
17const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";
18
19/// Maximum events per batch
20const MAX_BATCH_SIZE: usize = 100;
21
22/// Minimum interval between flush attempts (seconds)
23const FLUSH_INTERVAL_SECS: u64 = 30;
24
25/// Last flush timestamp
26static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
27
28/// Get the analytics endpoint URL
29fn get_endpoint() -> String {
30    // Check for explicit analytics endpoint override
31    if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
32        return url;
33    }
34
35    // Derive from dashboard URL if set (env var first, then config file)
36    if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
37        return format!("{}/api/analytics/ingest", dashboard_url.trim_end_matches('/'));
38    }
39
40    // Check config file for dashboard_url
41    if let Ok(config) = crate::commands::config::PersistentConfig::load() {
42        if let Some(dashboard_url) = config.dashboard_url {
43            return format!("{}/api/analytics/ingest", dashboard_url.trim_end_matches('/'));
44        }
45    }
46
47    // Use dev endpoint in debug builds if MEMVID_DEV is set
48    #[cfg(debug_assertions)]
49    if std::env::var("MEMVID_DEV").is_ok() {
50        return DEV_ANALYTICS_ENDPOINT.to_string();
51    }
52
53    ANALYTICS_ENDPOINT.to_string()
54}
55
56/// Initialize analytics (no-op, kept for API compatibility)
57pub fn start_background_flush() {
58    // Initialize last flush time
59    LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
60}
61
62/// Force flush analytics synchronously (ignores interval check)
63/// Call this at the end of main() to ensure events are sent
64pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
65    do_flush_sync(true)
66}
67
68/// Flush analytics synchronously (for background thread)
69fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
70    do_flush_sync(false)
71}
72
73/// Internal flush implementation
74fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
75    // Check if enough time has passed (skip if force=true)
76    if !force {
77        if let Some(last) = LAST_FLUSH.get() {
78            if let Ok(last_time) = last.lock() {
79                if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
80                    return Ok(()); // Too soon
81                }
82            }
83        }
84    }
85
86    // Check if there are events to flush
87    let count = pending_count();
88    if count == 0 {
89        return Ok(());
90    }
91
92    // Read events
93    let events = read_pending_events();
94    if events.is_empty() {
95        return Ok(());
96    }
97
98    // Send in batches using reqwest blocking client
99    let endpoint = get_endpoint();
100    let client = reqwest::blocking::Client::builder()
101        .connect_timeout(Duration::from_secs(5))
102        .timeout(Duration::from_secs(10))
103        .build()
104        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
105
106    for chunk in events.chunks(MAX_BATCH_SIZE) {
107        let payload = serde_json::json!({
108            "events": chunk
109        });
110
111        match client
112            .post(&endpoint)
113            .header("Content-Type", "application/json")
114            .json(&payload)
115            .send()
116        {
117            Ok(response) => {
118                if response.status().is_success() {
119                    #[cfg(debug_assertions)]
120                    eprintln!("[analytics] Flushed {} events", chunk.len());
121                } else {
122                    #[cfg(debug_assertions)]
123                    eprintln!("[analytics] Server returned {}", response.status());
124                }
125            }
126            Err(e) => {
127                // Log but don't fail - we'll retry next time
128                #[cfg(debug_assertions)]
129                eprintln!("[analytics] Flush error: {}", e);
130
131                // Update last flush time even on error to prevent hammering
132                if let Some(last) = LAST_FLUSH.get() {
133                    if let Ok(mut last_time) = last.lock() {
134                        *last_time = std::time::Instant::now();
135                    }
136                }
137
138                return Err(Box::new(e));
139            }
140        }
141    }
142
143    // Clear queue on success
144    clear_queue();
145
146    // Update last flush time
147    if let Some(last) = LAST_FLUSH.get() {
148        if let Ok(mut last_time) = last.lock() {
149            *last_time = std::time::Instant::now();
150        }
151    }
152
153    Ok(())
154}
155
156/// Public function to trigger a flush (async spawn)
157pub fn flush_analytics() {
158    std::thread::spawn(|| {
159        let _ = flush_analytics_sync();
160    });
161}
162
/// Send events straight to the server, bypassing the on-disk queue.
///
/// Test-only helper; the HTTP status of the response is not inspected, only
/// transport-level failures surface as errors.
#[cfg(test)]
#[allow(dead_code)]
pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
    let body = serde_json::json!({ "events": events });

    let client = reqwest::blocking::Client::new();
    client
        .post(get_endpoint())
        .header("Content-Type", "application/json")
        .json(&body)
        .send()?;

    Ok(())
}