Skip to main content

atomcode_telemetry/
runtime.rs

1//! `Telemetry` handle: public API used by hosts.
2
3use crate::config::{ResolvedConfig, TelemetryState};
4use crate::event::*;
5use crate::identity::load_or_create;
6use crate::queue::Queue;
7use crate::sender::{http::HttpSender, SenderRuntime};
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::{mpsc, oneshot, Mutex};
13use tokio::task::JoinHandle;
14use tokio::time::interval;
15use tracing::warn;
16use uuid::Uuid;
17
18/// Per-event context auto-filled by `track`. Set with `CurrentContext::scope(...)`.
19///
20/// Note: `account_id` lives on `Telemetry` itself (see `set_account_id`) rather
21/// than here, because login state outlives any single scope and must apply to
22/// all events emitted after sign-in, including `login_success` itself.
23#[derive(Debug, Clone, Default)]
24pub struct CurrentContext {
25    pub turn_id: Option<Uuid>,
26    pub provider: Option<String>,
27    pub provider_host: Option<String>,
28    pub model: Option<String>,
29    pub repo_origin: Option<RepoOrigin>,
30    pub mode: Option<crate::event::SessionMode>,
31    pub session_id: Option<Uuid>,
32}
33
34tokio::task_local! {
35    static CTX: CurrentContext;
36}
37
38/// Resolve the `provider_host` envelope field from a vendor type and an
39/// optional configured `base_url`.
40///
41/// 1. If `base_url` parses as a URL with a host → that host (no scheme,
42///    no port, no path; path/query are dropped because they may carry
43///    tokens or tenant ids).
44/// 2. Otherwise fall back to each vendor's well-known host.
45/// 3. Unknown vendor with no parseable URL → `None`.
46pub fn resolve_provider_host(vendor: &str, base_url: Option<&str>) -> Option<String> {
47    if let Some(raw) = base_url {
48        if let Some(host) = url::Url::parse(raw)
49            .ok()
50            .and_then(|u| u.host_str().map(str::to_string))
51        {
52            return Some(host);
53        }
54    }
55    default_host_for_vendor(vendor)
56}
57
/// Well-known API host for each built-in vendor; `None` for any other vendor.
fn default_host_for_vendor(vendor: &str) -> Option<String> {
    let host = match vendor {
        "claude" => "api.anthropic.com",
        "openai" => "api.openai.com",
        "ollama" => "localhost",
        _ => return None,
    };
    Some(host.to_string())
}
66
67impl CurrentContext {
68    pub async fn scope<F, Fut, R>(ctx: CurrentContext, fut: F) -> R
69    where
70        F: FnOnce() -> Fut,
71        Fut: std::future::Future<Output = R>,
72    {
73        CTX.scope(ctx, fut()).await
74    }
75
76    /// Access the current context, or default if unset.
77    pub fn current() -> CurrentContext {
78        CTX.try_with(|c| c.clone()).unwrap_or_default()
79    }
80}
81
/// Process-local observability counters, shared through an `Arc` between
/// `Telemetry` and `SenderRuntime`. `Counters::snapshot` produces a
/// serializable copy.
#[derive(Default)]
pub struct Counters {
    /// Events accepted by the channel (`try_send` succeeded).
    pub events_tracked: AtomicU64,
    /// Events dropped because `try_send` failed (channel full).
    pub events_dropped_mpsc: AtomicU64,
    /// Events evicted FIFO from the disk queue after its cap was exceeded.
    pub events_dropped_disk: AtomicU64,
    /// Segments posted by the sender.
    pub segments_posted: AtomicU64,
    /// Total gzipped request-body bytes sent.
    pub bytes_sent: AtomicU64,
    /// Unix time in milliseconds of the last post; `0` means never.
    pub last_post_unix_ms: AtomicI64,
}
92
93impl Counters {
94    pub fn snapshot(&self) -> CountersSnapshot {
95        let last_post_unix_ms = self.last_post_unix_ms.load(Ordering::Relaxed);
96        let last_post_iso = if last_post_unix_ms > 0 {
97            chrono::DateTime::from_timestamp_millis(last_post_unix_ms)
98                .map(|utc| utc.with_timezone(&chrono::Local).to_rfc3339())
99                .unwrap_or_default()
100        } else {
101            String::new()
102        };
103        CountersSnapshot {
104            events_tracked: self.events_tracked.load(Ordering::Relaxed),
105            events_dropped_mpsc: self.events_dropped_mpsc.load(Ordering::Relaxed),
106            events_dropped_disk: self.events_dropped_disk.load(Ordering::Relaxed),
107            segments_posted: self.segments_posted.load(Ordering::Relaxed),
108            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
109            last_post_unix_ms,
110            last_post_iso,
111        }
112    }
113}
114
115#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
116pub struct CountersSnapshot {
117    pub events_tracked: u64,
118    pub events_dropped_mpsc: u64,
119    pub events_dropped_disk: u64,
120    pub segments_posted: u64,
121    pub bytes_sent: u64,
122    pub last_post_unix_ms: i64,
123    /// RFC 3339 with local timezone offset, derived from last_post_unix_ms.
124    /// Empty string when last_post_unix_ms == 0 (never posted).
125    #[serde(skip_serializing_if = "String::is_empty", default)]
126    pub last_post_iso: String,
127}
128
129pub struct Telemetry {
130    enabled: bool,
131    tx: Option<mpsc::Sender<Record>>,
132    device_id: Uuid,
133    launch_id: Uuid,
134    session_id: std::sync::Arc<std::sync::RwLock<Uuid>>,
135    account_id: std::sync::Arc<std::sync::RwLock<Option<String>>>,
136    app_version: String,
137    os: &'static str,
138    arch: &'static str,
139    locale: String,
140    started: Instant,
141    sender_task: Mutex<Option<JoinHandle<()>>>,
142    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
143    pub counters: Arc<Counters>,
144    health_path: Option<PathBuf>,
145}
146
147impl Telemetry {
148    pub fn init(cfg: ResolvedConfig, app_version: String) -> Arc<Self> {
149        let locale = sys_locale::get_locale().unwrap_or_else(|| "en-US".into());
150        let os = os_str();
151        let arch = arch_str();
152        let launch_id = Uuid::new_v4();
153
154        if matches!(cfg.state, TelemetryState::Disabled(_)) {
155            return Arc::new(Self {
156                enabled: false,
157                tx: None,
158                device_id: Uuid::nil(),
159                launch_id,
160                session_id: std::sync::Arc::new(std::sync::RwLock::new(launch_id)),
161                account_id: std::sync::Arc::new(std::sync::RwLock::new(None)),
162                app_version,
163                os,
164                arch,
165                locale,
166                started: Instant::now(),
167                sender_task: Mutex::new(None),
168                shutdown_tx: Mutex::new(None),
169                counters: Arc::new(Counters::default()),
170                health_path: None,
171            });
172        }
173
174        let device_id = match load_or_create(&cfg.atomcode_dir) {
175            Ok(id) => id,
176            Err(e) => {
177                warn!(?e, "device_id init failed; disabling");
178                Uuid::nil()
179            }
180        };
181        let qdir = cfg.atomcode_dir.join("telemetry/queue");
182        let queue = match Queue::open(qdir) {
183            Ok(q) => Arc::new(Mutex::new(q)),
184            Err(e) => {
185                warn!(?e, "queue init failed; disabling");
186                return Arc::new(Self {
187                    enabled: false,
188                    tx: None,
189                    device_id: Uuid::nil(),
190                    launch_id,
191                    session_id: std::sync::Arc::new(std::sync::RwLock::new(launch_id)),
192                    account_id: std::sync::Arc::new(std::sync::RwLock::new(None)),
193                    app_version,
194                    os,
195                    arch,
196                    locale,
197                    started: Instant::now(),
198                    sender_task: Mutex::new(None),
199                    shutdown_tx: Mutex::new(None),
200                    counters: Arc::new(Counters::default()),
201                    health_path: None,
202                });
203            }
204        };
205        let (tx, rx) = mpsc::channel::<Record>(1024);
206        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
207        let http = HttpSender::new(cfg.endpoint.clone(), app_version.clone());
208        let counters = Arc::new(Counters::default());
209        let health_path = cfg.atomcode_dir.join("telemetry/health.json");
210        let rt = SenderRuntime::new(
211            queue.clone(),
212            http,
213            counters.clone(),
214            health_path.clone(),
215        );
216        let queue_task = queue.clone();
217        let handle = tokio::spawn(async move {
218            run_sender(rx, rt, queue_task, shutdown_rx).await;
219        });
220
221        tracing::info!("telemetry initialized (enabled)");
222
223        Arc::new(Self {
224            enabled: true,
225            tx: Some(tx),
226            device_id,
227            launch_id,
228            session_id: std::sync::Arc::new(std::sync::RwLock::new(launch_id)),
229            account_id: std::sync::Arc::new(std::sync::RwLock::new(None)),
230            app_version,
231            os,
232            arch,
233            locale,
234            started: Instant::now(),
235            sender_task: Mutex::new(Some(handle)),
236            shutdown_tx: Mutex::new(Some(shutdown_tx)),
237            counters,
238            health_path: Some(health_path),
239        })
240    }
241
242    /// Non-blocking emit. Drops silently on backpressure or if disabled.
243    pub fn track(&self, event: Event) {
244        if !self.enabled {
245            return;
246        }
247        let tx = match &self.tx {
248            Some(t) => t,
249            None => return,
250        };
251        let ctx = CurrentContext::current();
252        let env = Envelope {
253            device_id: self.device_id,
254            launch_id: self.launch_id,
255            account_id: self.account_id.read().ok().and_then(|g| g.clone()),
256            session_id: ctx.session_id
257                .or_else(|| self.session_id.read().ok().map(|g| *g))
258                .unwrap_or(self.launch_id),
259            turn_id: ctx.turn_id,
260            ts: now_ms(),
261            schema_version: crate::SCHEMA_VERSION,
262            app_version: self.app_version.clone(),
263            os: self.os.to_string(),
264            arch: self.arch.to_string(),
265            locale: self.locale.clone(),
266            provider: ctx.provider,
267            provider_host: ctx.provider_host,
268            model: ctx.model,
269            repo_origin: ctx.repo_origin,
270            mode: ctx.mode,
271        };
272        match tx.try_send(Record {
273            envelope: env,
274            event,
275        }) {
276            Ok(()) => {
277                self.counters
278                    .events_tracked
279                    .fetch_add(1, Ordering::Relaxed);
280                tracing::debug!("telemetry event queued");
281            }
282            Err(_) => {
283                self.counters
284                    .events_dropped_mpsc
285                    .fetch_add(1, Ordering::Relaxed);
286                tracing::warn!("telemetry mpsc full, event dropped");
287            }
288        }
289    }
290
291    /// Signal the sender task to drain mpsc→disk, force-roll the current segment,
292    /// and attempt **one** HTTP send before the process exits. The whole operation
293    /// is bounded by `timeout` (default exit budget is 500ms); if the network call
294    /// outruns the budget the future is cancelled and the segment stays on disk
295    /// for the next process to pick up.
296    ///
297    /// We intentionally do *not* close the mpsc channel: `self.tx` is shared
298    /// (`Arc<Telemetry>`), and closing would race with concurrent callers.
299    /// Instead we use a oneshot to ask the task to exit cleanly.
300    pub async fn shutdown(&self, timeout: Duration) {
301        if let Some(tx) = self.shutdown_tx.lock().await.take() {
302            let _ = tx.send(());
303        }
304        let handle = self.sender_task.lock().await.take();
305        if let Some(h) = handle {
306            let _ = tokio::time::timeout(timeout, h).await;
307        }
308        // Persist final health snapshot regardless of send outcome.
309        self.persist_health();
310        tracing::info!("telemetry shutdown complete");
311    }
312
313    /// Update the active account ID. Pass `Some(id)` after a successful login
314    /// (call this *before* emitting `login_success` so the event itself carries
315    /// the id) and `None` after logout. All subsequent events emitted by this
316    /// process will carry the new value via the envelope.
317    pub fn set_account_id(&self, id: Option<String>) {
318        if let Ok(mut g) = self.account_id.write() {
319            *g = id;
320        }
321    }
322
323    /// Update the active session ID (e.g. when a new AtomCode session is
324    /// established or the user switches session via /session or /resume).
325    pub fn set_session_id(&self, id: Uuid) {
326        if let Ok(mut g) = self.session_id.write() {
327            *g = id;
328        }
329    }
330
331    pub fn is_enabled(&self) -> bool {
332        self.enabled
333    }
334    pub fn device_id(&self) -> Uuid {
335        self.device_id
336    }
337    pub fn launch_id(&self) -> Uuid {
338        self.launch_id
339    }
340    pub fn uptime(&self) -> Duration {
341        self.started.elapsed()
342    }
343
344    pub fn counters_snapshot(&self) -> CountersSnapshot {
345        self.counters.snapshot()
346    }
347
348    fn persist_health(&self) {
349        if let Some(path) = self.health_path.as_ref() {
350            let snap = self.counters.snapshot();
351            if let Ok(json) = serde_json::to_string(&snap) {
352                if let Some(parent) = path.parent() {
353                    let _ = std::fs::create_dir_all(parent);
354                }
355                let _ = std::fs::write(path, json);
356            }
357        }
358    }
359
360    /// In-memory test handle: events captured into a shared Vec, no disk/network.
361    #[cfg(any(test, feature = "test-util"))]
362    pub fn in_memory(app_version: String) -> (Arc<Self>, Arc<Mutex<Vec<Record>>>) {
363        let captured = Arc::new(Mutex::new(Vec::new()));
364        let (tx, mut rx) = mpsc::channel::<Record>(1024);
365        let cap = captured.clone();
366        tokio::spawn(async move {
367            while let Some(r) = rx.recv().await {
368                cap.lock().await.push(r);
369            }
370        });
371        let launch_id = Uuid::nil();
372        let t = Arc::new(Self {
373            enabled: true,
374            tx: Some(tx),
375            device_id: Uuid::nil(),
376            launch_id,
377            session_id: std::sync::Arc::new(std::sync::RwLock::new(launch_id)),
378            account_id: std::sync::Arc::new(std::sync::RwLock::new(None)),
379            app_version,
380            os: os_str(),
381            arch: arch_str(),
382            locale: "en-US".into(),
383            started: Instant::now(),
384            sender_task: Mutex::new(None),
385            shutdown_tx: Mutex::new(None),
386            counters: Arc::new(Counters::default()),
387            health_path: None,
388        });
389        (t, captured)
390    }
391}
392
393async fn run_sender(
394    mut rx: mpsc::Receiver<Record>,
395    rt: SenderRuntime,
396    queue: Arc<Mutex<Queue>>,
397    shutdown: oneshot::Receiver<()>,
398) {
399    let mut tick = interval(Duration::from_secs(60));
400    tick.tick().await; // consume the immediate first tick
401    let mut shutdown = shutdown;
402    loop {
403        tokio::select! {
404            biased;
405            _ = &mut shutdown => {
406                // Pull anything still sitting in mpsc into the active segment.
407                while let Ok(r) = rx.try_recv() {
408                    let mut q = queue.lock().await;
409                    if let Err(e) = q.append(&r) { warn!(?e, "telemetry append failed"); }
410                }
411                { let mut q = queue.lock().await; let _ = q.force_roll(); }
412                // Drain ALL pending segments (oldest first). flush_one only
413                // dispatches the oldest, so a single call would skip the just-
414                // rolled current segment whenever historical segments are
415                // present. No backoff: caller (Telemetry::shutdown) bounds
416                // total time via tokio::time::timeout on the JoinHandle.
417                loop {
418                    match rt.flush_one().await {
419                        Ok(None) => break,
420                        Ok(Some(_)) => continue,
421                        Err(e) => {
422                            warn!(?e, "telemetry shutdown flush failed; remaining segments retained");
423                            break;
424                        }
425                    }
426                }
427                break;
428            }
429            maybe = rx.recv() => {
430                match maybe {
431                    Some(r) => {
432                        let mut q = queue.lock().await;
433                        if let Err(e) = q.append(&r) { warn!(?e, "telemetry append failed"); }
434                    }
435                    None => {
436                        // channel closed — drain sender once and exit
437                        rt.drain_with_backoff().await;
438                        break;
439                    }
440                }
441            }
442            _ = tick.tick() => {
443                { let mut q = queue.lock().await; let _ = q.force_roll(); }
444                rt.drain_with_backoff().await;
445            }
446        }
447    }
448}
449
/// Wall-clock milliseconds since the Unix epoch, or `0` if the clock reads
/// earlier than the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
456
/// Coarse OS label for the envelope: `macos`, `linux`, `windows`, or `other`.
fn os_str() -> &'static str {
    // `std::env::consts::OS` is the compile-time `target_os` value.
    match std::env::consts::OS {
        "macos" => "macos",
        "linux" => "linux",
        "windows" => "windows",
        _ => "other",
    }
}
468
/// Coarse CPU architecture label for the envelope: `x86_64`, `aarch64`, or `other`.
fn arch_str() -> &'static str {
    // `std::env::consts::ARCH` is the compile-time `target_arch` value.
    match std::env::consts::ARCH {
        "x86_64" => "x86_64",
        "aarch64" => "aarch64",
        _ => "other",
    }
}
478
mod sys_locale {
    //! Minimal locale detection without pulling in a dedicated crate.

    /// Best-effort `language-TERRITORY` locale tag for the user (e.g. `en-US`).
    ///
    /// Follows POSIX precedence: a non-empty `LC_ALL` overrides `LANG`. The
    /// codeset (`.UTF-8`) and modifier (`@euro`) are stripped, and `_`
    /// becomes `-`.
    ///
    /// If the effective value is missing, empty, `C` or `POSIX` (common when
    /// the daemon is spawned by VS Code under launchd on macOS), macOS falls
    /// back to the `AppleLocale` user default. All other cases, and macOS when
    /// that lookup fails, get `en-US`. The result is currently always `Some`.
    pub fn get_locale() -> Option<String> {
        let raw = ["LC_ALL", "LANG"]
            .iter()
            .filter_map(|key| std::env::var(key).ok())
            .find(|value| !value.is_empty());
        if let Some(tag) = raw.as_deref().and_then(normalize) {
            return Some(tag);
        }
        #[cfg(target_os = "macos")]
        {
            if let Some(tag) = apple_locale() {
                return Some(tag);
            }
        }
        Some("en-US".to_string())
    }

    /// Turn a POSIX locale name such as `en_US.UTF-8@euro` into `en-US`.
    ///
    /// Returns `None` for empty input and for the `C` / `POSIX` pseudo-locales,
    /// including their codeset forms such as `C.UTF-8`.
    pub(super) fn normalize(raw: &str) -> Option<String> {
        let base = raw
            .split(|c| c == '.' || c == '@')
            .next()
            .unwrap_or("")
            .trim();
        match base {
            "" | "C" | "POSIX" => None,
            tag => Some(tag.replace('_', "-")),
        }
    }

    /// macOS only: read `AppleLocale` from the global user defaults via the
    /// `defaults` tool and normalize it the same way as the env-var path.
    /// Any `@…` suffix is dropped.
    #[cfg(target_os = "macos")]
    fn apple_locale() -> Option<String> {
        let output = std::process::Command::new("defaults")
            .args(["read", "-g", "AppleLocale"])
            .output()
            .ok()?;
        if !output.status.success() {
            return None;
        }
        normalize(String::from_utf8_lossy(&output.stdout).trim())
    }
}
512
#[cfg(test)]
mod resolve_host_tests {
    use super::resolve_provider_host;

    /// Resolve and compare against the expected host (or `None`).
    fn check(vendor: &str, base_url: Option<&str>, expected: Option<&str>) {
        let got = resolve_provider_host(vendor, base_url);
        assert_eq!(
            got.as_deref(),
            expected,
            "vendor={:?} base_url={:?}",
            vendor,
            base_url
        );
    }

    #[test]
    fn parses_host_from_full_url() {
        check(
            "openai",
            Some("https://api-ai.gitcode.com/v1"),
            Some("api-ai.gitcode.com"),
        );
    }

    #[test]
    fn drops_port_path_userinfo() {
        // Userinfo, port, path and query all disappear; only the bare host is kept.
        check(
            "openai",
            Some("https://user:pass@api.example.com:8443/v1/foo?bar=baz"),
            Some("api.example.com"),
        );
    }

    #[test]
    fn falls_back_to_vendor_default_when_url_missing() {
        let cases = [
            ("claude", "api.anthropic.com"),
            ("openai", "api.openai.com"),
            ("ollama", "localhost"),
        ];
        for &(vendor, host) in &cases {
            check(vendor, None, Some(host));
        }
    }

    #[test]
    fn falls_back_to_vendor_default_when_url_unparseable() {
        check("claude", Some("not a url"), Some("api.anthropic.com"));
    }

    #[test]
    fn unknown_vendor_with_no_url_yields_none() {
        check("unknown_vendor", None, None);
    }

    #[test]
    fn unknown_vendor_with_url_still_uses_url_host() {
        check(
            "unknown_vendor",
            Some("https://api.example.com"),
            Some("api.example.com"),
        );
    }
}
562
#[cfg(test)]
mod session_id_tests {
    use super::*;
    use crate::event::Event;

    /// Wait, bounded by a timeout, until the in-memory receiver task has
    /// captured at least `n` records. A fixed sleep is both slower and flaky
    /// on a loaded machine; yielding lets the receiver task run on the test's
    /// runtime.
    async fn wait_for_records<T>(captured: &tokio::sync::Mutex<Vec<T>>, n: usize) {
        let waited = tokio::time::timeout(std::time::Duration::from_secs(2), async {
            while captured.lock().await.len() < n {
                tokio::task::yield_now().await;
            }
        })
        .await;
        assert!(waited.is_ok(), "timed out waiting for {} telemetry record(s)", n);
    }

    #[tokio::test]
    async fn current_context_session_id_override_wins_over_telemetry_field() {
        let (tel, captured) = Telemetry::in_memory("test".into());

        // CLI-style: set a distinct session id on the Telemetry struct itself.
        let telemetry_level = Uuid::new_v4();
        tel.set_session_id(telemetry_level);

        // Then override it for one scope via CurrentContext.
        let override_uuid = Uuid::new_v4();
        CurrentContext::scope(
            CurrentContext {
                session_id: Some(override_uuid),
                ..Default::default()
            },
            || async {
                tel.track(Event::OpenAtomcode);
            },
        )
        .await;

        wait_for_records(&captured, 1).await;

        let records = captured.lock().await;
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0].envelope.session_id, override_uuid,
            "CurrentContext.session_id should override the Telemetry-level session_id"
        );
        assert_ne!(records[0].envelope.session_id, telemetry_level);
    }

    #[tokio::test]
    async fn telemetry_session_id_used_when_context_has_none() {
        let (tel, captured) = Telemetry::in_memory("test".into());
        let session = Uuid::new_v4();
        tel.set_session_id(session);

        // No CurrentContext scope: the Telemetry-level id must be used.
        tel.track(Event::OpenAtomcode);

        wait_for_records(&captured, 1).await;

        let records = captured.lock().await;
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0].envelope.session_id, session,
            "without a scoped override the Telemetry-level session_id applies"
        );
    }
}