memvid_cli/analytics/
flush.rs1use super::queue::{clear_queue, pending_count, read_pending_events};
7#[cfg(test)]
8use super::queue::AnalyticsEvent;
9use std::sync::OnceLock;
10use std::time::Duration;
11
12const ANALYTICS_ENDPOINT: &str = "https://memvid.com/api/analytics/ingest";
14
15#[cfg(debug_assertions)]
17const DEV_ANALYTICS_ENDPOINT: &str = "http://localhost:3001/api/analytics/ingest";
18
19const MAX_BATCH_SIZE: usize = 100;
21
22const FLUSH_INTERVAL_SECS: u64 = 30;
24
25static LAST_FLUSH: OnceLock<std::sync::Mutex<std::time::Instant>> = OnceLock::new();
27
28fn get_endpoint() -> String {
30 if let Ok(url) = std::env::var("MEMVID_ANALYTICS_URL") {
32 return url;
33 }
34
35 if let Ok(dashboard_url) = std::env::var("MEMVID_DASHBOARD_URL") {
37 return format!("{}/api/analytics/ingest", dashboard_url.trim_end_matches('/'));
38 }
39
40 #[cfg(debug_assertions)]
42 if std::env::var("MEMVID_DEV").is_ok() {
43 return DEV_ANALYTICS_ENDPOINT.to_string();
44 }
45
46 ANALYTICS_ENDPOINT.to_string()
47}
48
49pub fn start_background_flush() {
51 LAST_FLUSH.get_or_init(|| std::sync::Mutex::new(std::time::Instant::now()));
53}
54
55pub fn force_flush_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
58 do_flush_sync(true)
59}
60
61fn flush_analytics_sync() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
63 do_flush_sync(false)
64}
65
66fn do_flush_sync(force: bool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
68 if !force {
70 if let Some(last) = LAST_FLUSH.get() {
71 if let Ok(last_time) = last.lock() {
72 if last_time.elapsed() < Duration::from_secs(FLUSH_INTERVAL_SECS) {
73 return Ok(()); }
75 }
76 }
77 }
78
79 let count = pending_count();
81 if count == 0 {
82 return Ok(());
83 }
84
85 let events = read_pending_events();
87 if events.is_empty() {
88 return Ok(());
89 }
90
91 let endpoint = get_endpoint();
93 let client = reqwest::blocking::Client::builder()
94 .connect_timeout(Duration::from_secs(5))
95 .timeout(Duration::from_secs(10))
96 .build()
97 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
98
99 for chunk in events.chunks(MAX_BATCH_SIZE) {
100 let payload = serde_json::json!({
101 "events": chunk
102 });
103
104 match client
105 .post(&endpoint)
106 .header("Content-Type", "application/json")
107 .json(&payload)
108 .send()
109 {
110 Ok(response) => {
111 if response.status().is_success() {
112 #[cfg(debug_assertions)]
113 eprintln!("[analytics] Flushed {} events", chunk.len());
114 } else {
115 #[cfg(debug_assertions)]
116 eprintln!("[analytics] Server returned {}", response.status());
117 }
118 }
119 Err(e) => {
120 #[cfg(debug_assertions)]
122 eprintln!("[analytics] Flush error: {}", e);
123
124 if let Some(last) = LAST_FLUSH.get() {
126 if let Ok(mut last_time) = last.lock() {
127 *last_time = std::time::Instant::now();
128 }
129 }
130
131 return Err(Box::new(e));
132 }
133 }
134 }
135
136 clear_queue();
138
139 if let Some(last) = LAST_FLUSH.get() {
141 if let Ok(mut last_time) = last.lock() {
142 *last_time = std::time::Instant::now();
143 }
144 }
145
146 Ok(())
147}
148
149pub fn flush_analytics() {
151 std::thread::spawn(|| {
152 let _ = flush_analytics_sync();
153 });
154}
155
156#[cfg(test)]
158#[allow(dead_code)]
159pub fn send_events_direct(events: Vec<AnalyticsEvent>) -> Result<(), Box<dyn std::error::Error>> {
160 let endpoint = get_endpoint();
161 let payload = serde_json::json!({ "events": events });
162
163 reqwest::blocking::Client::new()
164 .post(&endpoint)
165 .header("Content-Type", "application/json")
166 .json(&payload)
167 .send()?;
168
169 Ok(())
170}