memvid_cli/analytics/
flush.rs1#[cfg(test)]
7use super::queue::AnalyticsEvent;
8use super::queue::{clear_queue, pending_count, read_pending_events};
9use std::sync::OnceLock;
10use std::time::Duration;
11
12const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";
14
15#[cfg(debug_assertions)]
17const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";
18
19const MAX_BATCH_SIZE: usize = 100;
21
22const FLUSH_INTERVAL_SECS: u64 = 30;
24
25static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
27
28fn get_endpoint() -> String {
30 if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
32 return url;
33 }
34
35 if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
37 return format!(
38 "{}/api/analytics/ingest",
39 dashboard_url.trim_end_matches('/')
40 );
41 }
42
43 if let Ok(config) = crate::commands::config::PersistentConfig::load() {
45 if let Some(dashboard_url) = config.dashboard_url {
46 return format!(
47 "{}/api/analytics/ingest",
48 dashboard_url.trim_end_matches('/')
49 );
50 }
51 }
52
53 #[cfg(debug_assertions)]
55 if std::env::var("MEMVID_DEV").is_ok() {
56 return DEV_ANALYTICS_ENDPOINT.to_string();
57 }
58
59 ANALYTICS_ENDPOINT.to_string()
60}
61
62pub fn start_background_flush() {
64 LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
66}
67
68pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
71 do_flush_sync(true)
72}
73
74fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
76 do_flush_sync(false)
77}
78
79fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
81 if !force {
83 if let Some(last) = LAST_FLUSH.get() {
84 if let Ok(last_time) = last.lock() {
85 if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
86 return Ok(()); }
88 }
89 }
90 }
91
92 let count = pending_count();
94 if count == 0 {
95 return Ok(());
96 }
97
98 let events = read_pending_events();
100 if events.is_empty() {
101 return Ok(());
102 }
103
104 let endpoint = get_endpoint();
106 let client = reqwest::blocking::Client::builder()
107 .connect_timeout(Duration::from_secs(5))
108 .timeout(Duration::from_secs(10))
109 .build()
110 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
111
112 for chunk in events.chunks(MAX_BATCH_SIZE) {
113 let payload = serde_json::json!({
114 "events": chunk
115 });
116
117 match client
118 .post(&endpoint)
119 .header("Content-Type", "application/json")
120 .json(&payload)
121 .send()
122 {
123 Ok(response) => {
124 if response.status().is_success() {
125 #[cfg(debug_assertions)]
126 eprintln!("[analytics] Flushed {} events", chunk.len());
127 } else {
128 #[cfg(debug_assertions)]
129 eprintln!("[analytics] Server returned {}", response.status());
130 }
131 }
132 Err(e) => {
133 #[cfg(debug_assertions)]
135 eprintln!("[analytics] Flush error: {}", e);
136
137 if let Some(last) = LAST_FLUSH.get() {
139 if let Ok(mut last_time) = last.lock() {
140 *last_time = std::time::Instant::now();
141 }
142 }
143
144 return Err(Box::new(e));
145 }
146 }
147 }
148
149 clear_queue();
151
152 if let Some(last) = LAST_FLUSH.get() {
154 if let Ok(mut last_time) = last.lock() {
155 *last_time = std::time::Instant::now();
156 }
157 }
158
159 Ok(())
160}
161
162pub fn flush_analytics() {
164 std::thread::spawn(|| {
165 let _ = flush_analytics_sync();
166 });
167}
168
169#[cfg(test)]
171#[allow(dead_code)]
172pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
173 let endpoint = get_endpoint();
174 let payload = serde_json::json!({ "events": events });
175
176 reqwest::blocking::Client::new()
177 .post(&endpoint)
178 .header("Content-Type", "application/json")
179 .json(&payload)
180 .send()?;
181
182 Ok(())
183}