memvid_cli/analytics/
flush.rs

1//! Background flush for analytics events
2//!
3//! Sends queued events to the server in batches.
4//! Runs asynchronously to not block CLI commands.
5
6use super::queue::{clear_queue, pending_count, read_pending_events};
7#[cfg(test)]
8use super::queue::AnalyticsEvent;
9use std::sync::OnceLock;
10use std::time::Duration;
11
/// Upper bound on the number of events sent in a single HTTP request.
const MAX_BATCH_SIZE: usize = 100;

/// Number of seconds that must elapse between two non-forced flush attempts.
const FLUSH_INTERVAL_SECS: u64 = 30;

/// Production ingest URL, used when no override applies.
const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";

/// Local ingest URL for development; only compiled into debug builds.
#[cfg(debug_assertions)]
const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";

/// Instant of the most recent flush attempt, shared process-wide.
///
/// Stays uninitialised until `start_background_flush` seeds it; while it is
/// uninitialised the interval throttle in `do_flush_sync` is not applied.
static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
27
28/// Get the analytics endpoint URL
29fn get_endpoint() -> String {
30    // Check for explicit analytics endpoint override
31    if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
32        return url;
33    }
34
35    // Derive from dashboard URL if set (for local dev)
36    if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
37        return format!("{}/api/analytics/ingest", dashboard_url.trim_end_matches('/'));
38    }
39
40    // Use dev endpoint in debug builds if MEMVID_DEV is set
41    #[cfg(debug_assertions)]
42    if std::env::var("MEMVID_DEV").is_ok() {
43        return DEV_ANALYTICS_ENDPOINT.to_string();
44    }
45
46    ANALYTICS_ENDPOINT.to_string()
47}
48
49/// Initialize analytics (no-op, kept for API compatibility)
50pub fn start_background_flush() {
51    // Initialize last flush time
52    LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
53}
54
55/// Force flush analytics synchronously (ignores interval check)
56/// Call this at the end of main() to ensure events are sent
57pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
58    do_flush_sync(true)
59}
60
61/// Flush analytics synchronously (for background thread)
62fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
63    do_flush_sync(false)
64}
65
66/// Internal flush implementation
67fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
68    // Check if enough time has passed (skip if force=true)
69    if !force {
70        if let Some(last) = LAST_FLUSH.get() {
71            if let Ok(last_time) = last.lock() {
72                if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
73                    return Ok(()); // Too soon
74                }
75            }
76        }
77    }
78
79    // Check if there are events to flush
80    let count = pending_count();
81    if count == 0 {
82        return Ok(());
83    }
84
85    // Read events
86    let events = read_pending_events();
87    if events.is_empty() {
88        return Ok(());
89    }
90
91    // Send in batches using reqwest blocking client
92    let endpoint = get_endpoint();
93    let client = reqwest::blocking::Client::builder()
94        .connect_timeout(Duration::from_secs(5))
95        .timeout(Duration::from_secs(10))
96        .build()
97        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
98
99    for chunk in events.chunks(MAX_BATCH_SIZE) {
100        let payload = serde_json::json!({
101            "events": chunk
102        });
103
104        match client
105            .post(&endpoint)
106            .header("Content-Type", "application/json")
107            .json(&payload)
108            .send()
109        {
110            Ok(response) => {
111                if response.status().is_success() {
112                    #[cfg(debug_assertions)]
113                    eprintln!("[analytics] Flushed {} events", chunk.len());
114                } else {
115                    #[cfg(debug_assertions)]
116                    eprintln!("[analytics] Server returned {}", response.status());
117                }
118            }
119            Err(e) => {
120                // Log but don't fail - we'll retry next time
121                #[cfg(debug_assertions)]
122                eprintln!("[analytics] Flush error: {}", e);
123
124                // Update last flush time even on error to prevent hammering
125                if let Some(last) = LAST_FLUSH.get() {
126                    if let Ok(mut last_time) = last.lock() {
127                        *last_time = std::time::Instant::now();
128                    }
129                }
130
131                return Err(Box::new(e));
132            }
133        }
134    }
135
136    // Clear queue on success
137    clear_queue();
138
139    // Update last flush time
140    if let Some(last) = LAST_FLUSH.get() {
141        if let Ok(mut last_time) = last.lock() {
142            *last_time = std::time::Instant::now();
143        }
144    }
145
146    Ok(())
147}
148
149/// Public function to trigger a flush (async spawn)
150pub fn flush_analytics() {
151    std::thread::spawn(|| {
152        let _ = flush_analytics_sync();
153    });
154}
155
/// Post `events` straight to the analytics endpoint, bypassing the queue.
///
/// Test-only helper. The events are wrapped as `{"events": [...]}` and sent
/// in a single request to the URL returned by `get_endpoint()`.
///
/// # Errors
///
/// Returns an error when the request fails at the transport level or when
/// the server replies with a 4xx/5xx status.
#[cfg(test)]
// Kept available for ad-hoc tests; it may have no callers in a given build.
#[allow(dead_code)]
pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = get_endpoint();
    let payload = serde_json::json!({ "events": events });

    reqwest::blocking::Client::new()
        .post(&endpoint)
        .header("Content-Type", "application/json")
        .json(&payload)
        .send()?
        // `send()` only fails on transport errors; surface HTTP error statuses
        // too so a rejected payload is not reported as success.
        .error_for_status()?;

    Ok(())
}