memvid_cli/analytics/
flush.rs1use super::queue::{clear_queue, pending_count, read_pending_events};
7#[cfg(test)]
8use super::queue::AnalyticsEvent;
9use std::sync::OnceLock;
10use std::time::Duration;
11
/// Production ingest URL; `get_endpoint` falls back to it when no override applies.
const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";

/// Local ingest URL, compiled into debug builds only; `get_endpoint` selects it
/// when the `MEMVID_DEV` environment variable is set.
#[cfg(debug_assertions)]
const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";

/// Upper bound on the number of events placed in a single POST body.
const MAX_BATCH_SIZE: usize = 100;

/// Minimum number of seconds that must pass between two non-forced flushes.
const FLUSH_INTERVAL_SECS: u64 = 30;

/// Timestamp the flush throttle is measured against.
///
/// Initialized by `start_background_flush`; while it is uninitialized,
/// non-forced flushes are never throttled.
static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
27
28fn get_endpoint() -> String {
30 if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
32 return url;
33 }
34
35 if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
37 return format!("{}/api/analytics/ingest", dashboard_url.trim_end_matches('/'));
38 }
39
40 if let Ok(config) = crate::commands::config::PersistentConfig::load() {
42 if let Some(dashboard_url) = config.dashboard_url {
43 return format!("{}/api/analytics/ingest", dashboard_url.trim_end_matches('/'));
44 }
45 }
46
47 #[cfg(debug_assertions)]
49 if std::env::var("MEMVID_DEV").is_ok() {
50 return DEV_ANALYTICS_ENDPOINT.to_string();
51 }
52
53 ANALYTICS_ENDPOINT.to_string()
54}
55
56pub fn start_background_flush() {
58 LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
60}
61
/// Flushes pending analytics events on the calling thread, bypassing the
/// flush-interval throttle.
///
/// # Errors
///
/// Returns whatever error `do_flush_sync` reports, unchanged.
pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // `true` = forced: skip the "flushed too recently" check.
    do_flush_sync(true)
}
67
/// Flushes pending analytics events on the calling thread, subject to the
/// flush-interval throttle.
///
/// # Errors
///
/// Returns whatever error `do_flush_sync` reports, unchanged.
fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // `false` = not forced: the call is a no-op if a flush happened too recently.
    do_flush_sync(false)
}
72
73fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
75 if !force {
77 if let Some(last) = LAST_FLUSH.get() {
78 if let Ok(last_time) = last.lock() {
79 if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
80 return Ok(()); }
82 }
83 }
84 }
85
86 let count = pending_count();
88 if count == 0 {
89 return Ok(());
90 }
91
92 let events = read_pending_events();
94 if events.is_empty() {
95 return Ok(());
96 }
97
98 let endpoint = get_endpoint();
100 let client = reqwest::blocking::Client::builder()
101 .connect_timeout(Duration::from_secs(5))
102 .timeout(Duration::from_secs(10))
103 .build()
104 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
105
106 for chunk in events.chunks(MAX_BATCH_SIZE) {
107 let payload = serde_json::json!({
108 "events": chunk
109 });
110
111 match client
112 .post(&endpoint)
113 .header("Content-Type", "application/json")
114 .json(&payload)
115 .send()
116 {
117 Ok(response) => {
118 if response.status().is_success() {
119 #[cfg(debug_assertions)]
120 eprintln!("[analytics] Flushed {} events", chunk.len());
121 } else {
122 #[cfg(debug_assertions)]
123 eprintln!("[analytics] Server returned {}", response.status());
124 }
125 }
126 Err(e) => {
127 #[cfg(debug_assertions)]
129 eprintln!("[analytics] Flush error: {}", e);
130
131 if let Some(last) = LAST_FLUSH.get() {
133 if let Ok(mut last_time) = last.lock() {
134 *last_time = std::time::Instant::now();
135 }
136 }
137
138 return Err(Box::new(e));
139 }
140 }
141 }
142
143 clear_queue();
145
146 if let Some(last) = LAST_FLUSH.get() {
148 if let Ok(mut last_time) = last.lock() {
149 *last_time = std::time::Instant::now();
150 }
151 }
152
153 Ok(())
154}
155
156pub fn flush_analytics() {
158 std::thread::spawn(|| {
159 let _ = flush_analytics_sync();
160 });
161}
162
/// Test-only helper that POSTs `events` straight to the resolved endpoint in a
/// single request, bypassing the on-disk queue, batching and the throttle.
///
/// # Errors
///
/// Returns an error only when the request fails at the transport level; the
/// response status is not inspected.
#[cfg(test)]
#[allow(dead_code)]
pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
    let url = get_endpoint();
    let body = serde_json::json!({ "events": events });

    let request = reqwest::blocking::Client::new()
        .post(&url)
        .header("Content-Type", "application/json")
        .json(&body);

    match request.send() {
        Ok(_response) => Ok(()),
        Err(err) => Err(err.into()),
    }
}