memvid_cli/analytics/
flush.rs1use super::queue::{clear_queue, pending_count, read_pending_events, AnalyticsEvent};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::OnceLock;
9use std::time::Duration;
10
11const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";
13
14#[cfg(debug_assertions)]
16const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";
17
18const MAX_BATCH_SIZE: usize = 100;
20
21const FLUSH_INTERVAL_SECS: u64 = 30;
23
24static FLUSH_STARTED: AtomicBool = AtomicBool::new(false);
26
27static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
29
30fn get_endpoint() -> String {
32 if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
34 return url;
35 }
36
37 if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
39 return format!("{}/api/analytics/ingest", dashboard_url.trim_end_matches('/'));
40 }
41
42 #[cfg(debug_assertions)]
44 if std::env::var("MEMVID_DEV").is_ok() {
45 return DEV_ANALYTICS_ENDPOINT.to_string();
46 }
47
48 ANALYTICS_ENDPOINT.to_string()
49}
50
51pub fn start_background_flush() {
53 LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
55}
56
57pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
60 do_flush_sync(true)
61}
62
63fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
65 do_flush_sync(false)
66}
67
68fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
70 if !force {
72 if let Some(last) = LAST_FLUSH.get() {
73 if let Ok(last_time) = last.lock() {
74 if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
75 return Ok(()); }
77 }
78 }
79 }
80
81 let count = pending_count();
83 if count == 0 {
84 return Ok(());
85 }
86
87 let events = read_pending_events();
89 if events.is_empty() {
90 return Ok(());
91 }
92
93 let endpoint = get_endpoint();
95 let client = reqwest::blocking::Client::builder()
96 .connect_timeout(Duration::from_secs(5))
97 .timeout(Duration::from_secs(10))
98 .build()
99 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
100
101 for chunk in events.chunks(MAX_BATCH_SIZE) {
102 let payload = serde_json::json!({
103 "events": chunk
104 });
105
106 match client
107 .post(&endpoint)
108 .header("Content-Type", "application/json")
109 .json(&payload)
110 .send()
111 {
112 Ok(response) => {
113 if response.status().is_success() {
114 #[cfg(debug_assertions)]
115 eprintln!("[analytics] Flushed {} events", chunk.len());
116 } else {
117 #[cfg(debug_assertions)]
118 eprintln!("[analytics] Server returned {}", response.status());
119 }
120 }
121 Err(e) => {
122 #[cfg(debug_assertions)]
124 eprintln!("[analytics] Flush error: {}", e);
125
126 if let Some(last) = LAST_FLUSH.get() {
128 if let Ok(mut last_time) = last.lock() {
129 *last_time = std::time::Instant::now();
130 }
131 }
132
133 return Err(Box::new(e));
134 }
135 }
136 }
137
138 clear_queue();
140
141 if let Some(last) = LAST_FLUSH.get() {
143 if let Ok(mut last_time) = last.lock() {
144 *last_time = std::time::Instant::now();
145 }
146 }
147
148 Ok(())
149}
150
151pub fn flush_analytics() {
153 std::thread::spawn(|| {
154 let _ = flush_analytics_sync();
155 });
156}
157
158#[cfg(test)]
160pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
161 let endpoint = get_endpoint();
162 let payload = serde_json::json!({ "events": events });
163
164 reqwest::blocking::Client::new()
165 .post(&endpoint)
166 .header("Content-Type", "application/json")
167 .json(&payload)
168 .send()?;
169
170 Ok(())
171}