1use crate::config::{ResolvedConfig, TelemetryState};
4use crate::event::*;
5use crate::identity::load_or_create;
6use crate::queue::Queue;
7use crate::sender::{http::HttpSender, SenderRuntime};
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::{mpsc, oneshot, Mutex};
13use tokio::task::JoinHandle;
14use tokio::time::interval;
15use tracing::warn;
16use uuid::Uuid;
17
18#[derive(Debug, Clone, Default)]
24pub struct CurrentContext {
25 pub turn_id: Option<Uuid>,
26 pub provider: Option<String>,
27 pub provider_host: Option<String>,
28 pub model: Option<String>,
29 pub repo_origin: Option<RepoOrigin>,
30 pub mode: Option<crate::event::SessionMode>,
31 pub session_id: Option<Uuid>,
32}
33
34tokio::task_local! {
35 static CTX: CurrentContext;
36}
37
38pub fn resolve_provider_host(vendor: &str, base_url: Option<&str>) -> Option<String> {
47 if let Some(raw) = base_url {
48 if let Some(host) = url::Url::parse(raw)
49 .ok()
50 .and_then(|u| u.host_str().map(str::to_string))
51 {
52 return Some(host);
53 }
54 }
55 default_host_for_vendor(vendor)
56}
57
58fn default_host_for_vendor(vendor: &str) -> Option<String> {
59 match vendor {
60 "claude" => Some("api.anthropic.com".into()),
61 "openai" => Some("api.openai.com".into()),
62 "ollama" => Some("localhost".into()),
63 _ => None,
64 }
65}
66
67impl CurrentContext {
68 pub async fn scope<F, Fut, R>(ctx: CurrentContext, fut: F) -> R
69 where
70 F: FnOnce() -> Fut,
71 Fut: std::future::Future<Output = R>,
72 {
73 CTX.scope(ctx, fut()).await
74 }
75
76 pub fn current() -> CurrentContext {
78 CTX.try_with(|c| c.clone()).unwrap_or_default()
79 }
80}
81
82#[derive(Default)]
84pub struct Counters {
85 pub events_tracked: AtomicU64, pub events_dropped_mpsc: AtomicU64, pub events_dropped_disk: AtomicU64, pub segments_posted: AtomicU64,
89 pub bytes_sent: AtomicU64, pub last_post_unix_ms: AtomicI64, }
92
93impl Counters {
94 pub fn snapshot(&self) -> CountersSnapshot {
95 let last_post_unix_ms = self.last_post_unix_ms.load(Ordering::Relaxed);
96 let last_post_iso = if last_post_unix_ms > 0 {
97 chrono::DateTime::from_timestamp_millis(last_post_unix_ms)
98 .map(|utc| utc.with_timezone(&chrono::Local).to_rfc3339())
99 .unwrap_or_default()
100 } else {
101 String::new()
102 };
103 CountersSnapshot {
104 events_tracked: self.events_tracked.load(Ordering::Relaxed),
105 events_dropped_mpsc: self.events_dropped_mpsc.load(Ordering::Relaxed),
106 events_dropped_disk: self.events_dropped_disk.load(Ordering::Relaxed),
107 segments_posted: self.segments_posted.load(Ordering::Relaxed),
108 bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
109 last_post_unix_ms,
110 last_post_iso,
111 }
112 }
113}
114
115#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
116pub struct CountersSnapshot {
117 pub events_tracked: u64,
118 pub events_dropped_mpsc: u64,
119 pub events_dropped_disk: u64,
120 pub segments_posted: u64,
121 pub bytes_sent: u64,
122 pub last_post_unix_ms: i64,
123 #[serde(skip_serializing_if = "String::is_empty", default)]
126 pub last_post_iso: String,
127}
128
129pub struct Telemetry {
130 enabled: bool,
131 tx: Option<mpsc::Sender<Record>>,
132 device_id: Uuid,
133 launch_id: Uuid,
134 session_id: std::sync::Arc<std::sync::RwLock<Uuid>>,
135 account_id: std::sync::Arc<std::sync::RwLock<Option<String>>>,
136 app_version: String,
137 os: &'static str,
138 arch: &'static str,
139 locale: String,
140 started: Instant,
141 sender_task: Mutex<Option<JoinHandle<()>>>,
142 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
143 pub counters: Arc<Counters>,
144 health_path: Option<PathBuf>,
145}
146
147impl Telemetry {
148 pub fn init(cfg: ResolvedConfig, app_version: String) -> Arc<Self> {
149 let locale = sys_locale::get_locale().unwrap_or_else(|| "en-US".into());
150 let os = os_str();
151 let arch = arch_str();
152 let launch_id = Uuid::new_v4();
153
154 if matches!(cfg.state, TelemetryState::Disabled(_)) {
155 return Arc::new(Self {
156 enabled: false,
157 tx: None,
158 device_id: Uuid::nil(),
159 launch_id,
160 session_id: std::sync::Arc::new(std::sync::RwLock::new(launch_id)),
161 account_id: std::sync::Arc::new(std::sync::RwLock::new(None)),
162 app_version,
163 os,
164 arch,
165 locale,
166 started: Instant::now(),
167 sender_task: Mutex::new(None),
168 shutdown_tx: Mutex::new(None),
169 counters: Arc::new(Counters::default()),
170 health_path: None,
171 });
172 }
173
174 let device_id = match load_or_create(&cfg.atomcode_dir) {
175 Ok(id) => id,
176 Err(e) => {
177 warn!(?e, "device_id init failed; disabling");
178 Uuid::nil()
179 }
180 };
181 let qdir = cfg.atomcode_dir.join("telemetry/queue");
182 let queue = match Queue::open(qdir) {
183 Ok(q) => Arc::new(Mutex::new(q)),
184 Err(e) => {
185 warn!(?e, "queue init failed; disabling");
186 return Arc::new(Self {
187 enabled: false,
188 tx: None,
189 device_id: Uuid::nil(),
190 launch_id,
191 session_id: std::sync::Arc::new(std::sync::RwLock::new(launch_id)),
192 account_id: std::sync::Arc::new(std::sync::RwLock::new(None)),
193 app_version,
194 os,
195 arch,
196 locale,
197 started: Instant::now(),
198 sender_task: Mutex::new(None),
199 shutdown_tx: Mutex::new(None),
200 counters: Arc::new(Counters::default()),
201 health_path: None,
202 });
203 }
204 };
205 let (tx, rx) = mpsc::channel::<Record>(1024);
206 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
207 let http = HttpSender::new(cfg.endpoint.clone(), app_version.clone());
208 let counters = Arc::new(Counters::default());
209 let health_path = cfg.atomcode_dir.join("telemetry/health.json");
210 let rt = SenderRuntime::new(
211 queue.clone(),
212 http,
213 counters.clone(),
214 health_path.clone(),
215 );
216 let queue_task = queue.clone();
217 let handle = tokio::spawn(async move {
218 run_sender(rx, rt, queue_task, shutdown_rx).await;
219 });
220
221 tracing::info!("telemetry initialized (enabled)");
222
223 Arc::new(Self {
224 enabled: true,
225 tx: Some(tx),
226 device_id,
227 launch_id,
228 session_id: std::sync::Arc::new(std::sync::RwLock::new(launch_id)),
229 account_id: std::sync::Arc::new(std::sync::RwLock::new(None)),
230 app_version,
231 os,
232 arch,
233 locale,
234 started: Instant::now(),
235 sender_task: Mutex::new(Some(handle)),
236 shutdown_tx: Mutex::new(Some(shutdown_tx)),
237 counters,
238 health_path: Some(health_path),
239 })
240 }
241
242 pub fn track(&self, event: Event) {
244 if !self.enabled {
245 return;
246 }
247 let tx = match &self.tx {
248 Some(t) => t,
249 None => return,
250 };
251 let ctx = CurrentContext::current();
252 let env = Envelope {
253 device_id: self.device_id,
254 launch_id: self.launch_id,
255 account_id: self.account_id.read().ok().and_then(|g| g.clone()),
256 session_id: ctx.session_id
257 .or_else(|| self.session_id.read().ok().map(|g| *g))
258 .unwrap_or(self.launch_id),
259 turn_id: ctx.turn_id,
260 ts: now_ms(),
261 schema_version: crate::SCHEMA_VERSION,
262 app_version: self.app_version.clone(),
263 os: self.os.to_string(),
264 arch: self.arch.to_string(),
265 locale: self.locale.clone(),
266 provider: ctx.provider,
267 provider_host: ctx.provider_host,
268 model: ctx.model,
269 repo_origin: ctx.repo_origin,
270 mode: ctx.mode,
271 };
272 match tx.try_send(Record {
273 envelope: env,
274 event,
275 }) {
276 Ok(()) => {
277 self.counters
278 .events_tracked
279 .fetch_add(1, Ordering::Relaxed);
280 tracing::debug!("telemetry event queued");
281 }
282 Err(_) => {
283 self.counters
284 .events_dropped_mpsc
285 .fetch_add(1, Ordering::Relaxed);
286 tracing::warn!("telemetry mpsc full, event dropped");
287 }
288 }
289 }
290
291 pub async fn shutdown(&self, timeout: Duration) {
301 if let Some(tx) = self.shutdown_tx.lock().await.take() {
302 let _ = tx.send(());
303 }
304 let handle = self.sender_task.lock().await.take();
305 if let Some(h) = handle {
306 let _ = tokio::time::timeout(timeout, h).await;
307 }
308 self.persist_health();
310 tracing::info!("telemetry shutdown complete");
311 }
312
313 pub fn set_account_id(&self, id: Option<String>) {
318 if let Ok(mut g) = self.account_id.write() {
319 *g = id;
320 }
321 }
322
323 pub fn set_session_id(&self, id: Uuid) {
326 if let Ok(mut g) = self.session_id.write() {
327 *g = id;
328 }
329 }
330
331 pub fn is_enabled(&self) -> bool {
332 self.enabled
333 }
334 pub fn device_id(&self) -> Uuid {
335 self.device_id
336 }
337 pub fn launch_id(&self) -> Uuid {
338 self.launch_id
339 }
340 pub fn uptime(&self) -> Duration {
341 self.started.elapsed()
342 }
343
344 pub fn counters_snapshot(&self) -> CountersSnapshot {
345 self.counters.snapshot()
346 }
347
348 fn persist_health(&self) {
349 if let Some(path) = self.health_path.as_ref() {
350 let snap = self.counters.snapshot();
351 if let Ok(json) = serde_json::to_string(&snap) {
352 if let Some(parent) = path.parent() {
353 let _ = std::fs::create_dir_all(parent);
354 }
355 let _ = std::fs::write(path, json);
356 }
357 }
358 }
359
360 #[cfg(any(test, feature = "test-util"))]
362 pub fn in_memory(app_version: String) -> (Arc<Self>, Arc<Mutex<Vec<Record>>>) {
363 let captured = Arc::new(Mutex::new(Vec::new()));
364 let (tx, mut rx) = mpsc::channel::<Record>(1024);
365 let cap = captured.clone();
366 tokio::spawn(async move {
367 while let Some(r) = rx.recv().await {
368 cap.lock().await.push(r);
369 }
370 });
371 let launch_id = Uuid::nil();
372 let t = Arc::new(Self {
373 enabled: true,
374 tx: Some(tx),
375 device_id: Uuid::nil(),
376 launch_id,
377 session_id: std::sync::Arc::new(std::sync::RwLock::new(launch_id)),
378 account_id: std::sync::Arc::new(std::sync::RwLock::new(None)),
379 app_version,
380 os: os_str(),
381 arch: arch_str(),
382 locale: "en-US".into(),
383 started: Instant::now(),
384 sender_task: Mutex::new(None),
385 shutdown_tx: Mutex::new(None),
386 counters: Arc::new(Counters::default()),
387 health_path: None,
388 });
389 (t, captured)
390 }
391}
392
393async fn run_sender(
394 mut rx: mpsc::Receiver<Record>,
395 rt: SenderRuntime,
396 queue: Arc<Mutex<Queue>>,
397 shutdown: oneshot::Receiver<()>,
398) {
399 let mut tick = interval(Duration::from_secs(60));
400 tick.tick().await; let mut shutdown = shutdown;
402 loop {
403 tokio::select! {
404 biased;
405 _ = &mut shutdown => {
406 while let Ok(r) = rx.try_recv() {
408 let mut q = queue.lock().await;
409 if let Err(e) = q.append(&r) { warn!(?e, "telemetry append failed"); }
410 }
411 { let mut q = queue.lock().await; let _ = q.force_roll(); }
412 loop {
418 match rt.flush_one().await {
419 Ok(None) => break,
420 Ok(Some(_)) => continue,
421 Err(e) => {
422 warn!(?e, "telemetry shutdown flush failed; remaining segments retained");
423 break;
424 }
425 }
426 }
427 break;
428 }
429 maybe = rx.recv() => {
430 match maybe {
431 Some(r) => {
432 let mut q = queue.lock().await;
433 if let Err(e) = q.append(&r) { warn!(?e, "telemetry append failed"); }
434 }
435 None => {
436 rt.drain_with_backoff().await;
438 break;
439 }
440 }
441 }
442 _ = tick.tick() => {
443 { let mut q = queue.lock().await; let _ = q.force_roll(); }
444 rt.drain_with_backoff().await;
445 }
446 }
447 }
448}
449
450fn now_ms() -> i64 {
451 SystemTime::now()
452 .duration_since(UNIX_EPOCH)
453 .map(|d| d.as_millis() as i64)
454 .unwrap_or(0)
455}
456
457fn os_str() -> &'static str {
458 if cfg!(target_os = "macos") {
459 "macos"
460 } else if cfg!(target_os = "linux") {
461 "linux"
462 } else if cfg!(target_os = "windows") {
463 "windows"
464 } else {
465 "other"
466 }
467}
468
469fn arch_str() -> &'static str {
470 if cfg!(target_arch = "x86_64") {
471 "x86_64"
472 } else if cfg!(target_arch = "aarch64") {
473 "aarch64"
474 } else {
475 "other"
476 }
477}
478
479mod sys_locale {
480 pub fn get_locale() -> Option<String> {
482 let raw = std::env::var("LANG")
483 .ok()
484 .or_else(|| std::env::var("LC_ALL").ok());
485 match raw.as_deref() {
486 Some("C") | Some("POSIX") | None => {
489 #[cfg(target_os = "macos")]
491 {
492 if let Ok(output) = std::process::Command::new("defaults")
493 .args(["read", "-g", "AppleLocale"])
494 .output()
495 {
496 if output.status.success() {
497 let locale = String::from_utf8_lossy(&output.stdout)
498 .trim()
499 .replace('_', "-");
500 if !locale.is_empty() {
501 return Some(locale);
502 }
503 }
504 }
505 }
506 Some("en-US".to_string())
507 }
508 Some(val) => Some(val.split('.').next().unwrap_or(val).replace('_', "-")),
509 }
510 }
511}
512
513#[cfg(test)]
514mod resolve_host_tests {
515 use super::resolve_provider_host;
516
517 #[test]
518 fn parses_host_from_full_url() {
519 assert_eq!(
520 resolve_provider_host("openai", Some("https://api-ai.gitcode.com/v1")),
521 Some("api-ai.gitcode.com".into())
522 );
523 }
524
525 #[test]
526 fn drops_port_path_userinfo() {
527 assert_eq!(
529 resolve_provider_host("openai", Some("https://user:pass@api.example.com:8443/v1/foo?bar=baz")),
530 Some("api.example.com".into())
531 );
532 }
533
534 #[test]
535 fn falls_back_to_vendor_default_when_url_missing() {
536 assert_eq!(resolve_provider_host("claude", None), Some("api.anthropic.com".into()));
537 assert_eq!(resolve_provider_host("openai", None), Some("api.openai.com".into()));
538 assert_eq!(resolve_provider_host("ollama", None), Some("localhost".into()));
539 }
540
541 #[test]
542 fn falls_back_to_vendor_default_when_url_unparseable() {
543 assert_eq!(
544 resolve_provider_host("claude", Some("not a url")),
545 Some("api.anthropic.com".into())
546 );
547 }
548
549 #[test]
550 fn unknown_vendor_with_no_url_yields_none() {
551 assert_eq!(resolve_provider_host("unknown_vendor", None), None);
552 }
553
554 #[test]
555 fn unknown_vendor_with_url_still_uses_url_host() {
556 assert_eq!(
557 resolve_provider_host("unknown_vendor", Some("https://api.example.com")),
558 Some("api.example.com".into())
559 );
560 }
561}
562
563#[cfg(test)]
564mod session_id_tests {
565 use super::*;
566 use crate::event::Event;
567
568 #[tokio::test]
569 async fn current_context_session_id_override_wins_over_telemetry_field() {
570 let (tel, captured) = Telemetry::in_memory("test".into());
571
572 let launch = tel.launch_id();
574 tel.set_session_id(launch);
575
576 let override_uuid = Uuid::new_v4();
578 CurrentContext::scope(
579 CurrentContext {
580 session_id: Some(override_uuid),
581 ..Default::default()
582 },
583 || async {
584 tel.track(Event::OpenAtomcode);
585 },
586 )
587 .await;
588
589 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
591
592 let records = captured.lock().await;
593 assert_eq!(records.len(), 1);
594 assert_eq!(
595 records[0].envelope.session_id, override_uuid,
596 "CurrentContext.session_id should override the Telemetry-level session_id"
597 );
598 }
599}