memvid_cli/analytics/
flush.rs

1//! Background flush for analytics events
2//!
3//! Sends queued events to the server in batches.
4//! Runs asynchronously to not block CLI commands.
5
6#[cfg(test)]
7use super::queue::AnalyticsEvent;
8use super::queue::{clear_queue, pending_count, read_pending_events};
9use std::sync::OnceLock;
10use std::time::Duration;
11
// --- Endpoints -------------------------------------------------------------

/// Production ingest URL; the fallback when no override applies.
const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";

/// Local ingest URL for development; only compiled into debug builds.
#[cfg(debug_assertions)]
const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";

// --- Tunables --------------------------------------------------------------

/// Upper bound on the number of events sent in a single request.
const MAX_BATCH_SIZE: usize = 100;

/// Throttle window, in seconds, applied to non-forced flushes.
const FLUSH_INTERVAL_SECS: u64 = 30;

// --- Shared state ----------------------------------------------------------

/// Instant of the most recent flush attempt.
///
/// Stays unset until `start_background_flush` seeds it.
static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
27
28/// Get the analytics endpoint URL
29fn get_endpoint() -> String {
30    // Check for explicit analytics endpoint override
31    if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
32        return url;
33    }
34
35    // Derive from dashboard URL if set (env var first, then config file)
36    if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
37        return format!(
38            "{}/api/analytics/ingest",
39            dashboard_url.trim_end_matches('/')
40        );
41    }
42
43    // Check config file for dashboard_url
44    if let Ok(config) = crate::commands::config::PersistentConfig::load() {
45        if let Some(dashboard_url) = config.dashboard_url {
46            return format!(
47                "{}/api/analytics/ingest",
48                dashboard_url.trim_end_matches('/')
49            );
50        }
51    }
52
53    // Use dev endpoint in debug builds if MEMVID_DEV is set
54    #[cfg(debug_assertions)]
55    if std::env::var("MEMVID_DEV").is_ok() {
56        return DEV_ANALYTICS_ENDPOINT.to_string();
57    }
58
59    ANALYTICS_ENDPOINT.to_string()
60}
61
62/// Initialize analytics (no-op, kept for API compatibility)
63pub fn start_background_flush() {
64    // Initialize last flush time
65    LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
66}
67
68/// Force flush analytics synchronously (ignores interval check)
69/// Call this at the end of main() to ensure events are sent
70pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
71    do_flush_sync(true)
72}
73
74/// Flush analytics synchronously (for background thread)
75fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
76    do_flush_sync(false)
77}
78
79/// Internal flush implementation
80fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
81    // Check if enough time has passed (skip if force=true)
82    if !force {
83        if let Some(last) = LAST_FLUSH.get() {
84            if let Ok(last_time) = last.lock() {
85                if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
86                    return Ok(()); // Too soon
87                }
88            }
89        }
90    }
91
92    // Check if there are events to flush
93    let count = pending_count();
94    if count == 0 {
95        return Ok(());
96    }
97
98    // Read events
99    let events = read_pending_events();
100    if events.is_empty() {
101        return Ok(());
102    }
103
104    // Send in batches using reqwest blocking client
105    let endpoint = get_endpoint();
106    let client = reqwest::blocking::Client::builder()
107        .connect_timeout(Duration::from_secs(5))
108        .timeout(Duration::from_secs(10))
109        .build()
110        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
111
112    for chunk in events.chunks(MAX_BATCH_SIZE) {
113        let payload = serde_json::json!({
114            "events": chunk
115        });
116
117        match client
118            .post(&endpoint)
119            .header("Content-Type", "application/json")
120            .json(&payload)
121            .send()
122        {
123            Ok(response) => {
124                if response.status().is_success() {
125                    #[cfg(debug_assertions)]
126                    eprintln!("[analytics] Flushed {} events", chunk.len());
127                } else {
128                    #[cfg(debug_assertions)]
129                    eprintln!("[analytics] Server returned {}", response.status());
130                }
131            }
132            Err(e) => {
133                // Log but don't fail - we'll retry next time
134                #[cfg(debug_assertions)]
135                eprintln!("[analytics] Flush error: {}", e);
136
137                // Update last flush time even on error to prevent hammering
138                if let Some(last) = LAST_FLUSH.get() {
139                    if let Ok(mut last_time) = last.lock() {
140                        *last_time = std::time::Instant::now();
141                    }
142                }
143
144                return Err(Box::new(e));
145            }
146        }
147    }
148
149    // Clear queue on success
150    clear_queue();
151
152    // Update last flush time
153    if let Some(last) = LAST_FLUSH.get() {
154        if let Ok(mut last_time) = last.lock() {
155            *last_time = std::time::Instant::now();
156        }
157    }
158
159    Ok(())
160}
161
162/// Public function to trigger a flush (async spawn)
163pub fn flush_analytics() {
164    std::thread::spawn(|| {
165        let _ = flush_analytics_sync();
166    });
167}
168
/// Send events directly without queuing (for testing).
///
/// POSTs all `events` in a single request to the endpoint chosen by
/// `get_endpoint()`.
///
/// # Errors
///
/// Returns an error when the request cannot be sent or when the server answers
/// with a 4xx/5xx status. Without the status check a rejected request would be
/// reported as success, which defeats the purpose of a test helper.
#[cfg(test)]
#[allow(dead_code)] // Test-only helper; not every test build calls it.
pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = get_endpoint();
    let payload = serde_json::json!({ "events": events });

    reqwest::blocking::Client::new()
        .post(&endpoint)
        .header("Content-Type", "application/json")
        .json(&payload)
        .send()?
        .error_for_status()?;

    Ok(())
}