Skip to main content

runmat_core/
telemetry.rs

1use crate::RunMatSession;
2use runmat_telemetry::{
3    serialize_envelope, ProviderSnapshot, RuntimeExecutionCounters, RuntimeFinishedEnvelope,
4    RuntimeFinishedPayload, RuntimeStartedEnvelope, RuntimeStartedPayload, TelemetryRunKind,
5    EVENT_RUNTIME_FINISHED, EVENT_RUNTIME_STARTED,
6};
7use runmat_time::{unix_timestamp_ms, Instant};
8use std::sync::Arc;
9use std::time::Duration;
10use uuid::Uuid;
11
/// Transport, supplied by the embedding host, that receives runtime telemetry events.
///
/// Core builds the JSON payload itself and passes the finished string to the host
/// implementation (CLI HTTP/UDP, Desktop fetch proxy, etc.). The interface is deliberately
/// synchronous and best-effort: `emit` returns nothing, so delivery failures are the
/// implementor's concern and are never reported back to core.
pub trait TelemetrySink: Send + Sync {
    /// Delivers one already-serialized telemetry event.
    ///
    /// `payload_json` is the complete JSON document for a single event; ownership is
    /// transferred to the sink.
    fn emit(&self, payload_json: String);
}
19
/// Platform description attached to telemetry events.
///
/// Both fields are optional; `Default` leaves them as `None`. When a run is started,
/// `RunMatSession::telemetry_run` substitutes `std::env::consts::OS` / `ARCH` for any field
/// that is still `None`.
#[derive(Clone, Debug, Default)]
pub struct TelemetryPlatformInfo {
    /// Operating-system name to report, if the host provided one.
    pub os: Option<String>,
    /// CPU architecture name to report, if the host provided one.
    pub arch: Option<String>,
}
25
26#[derive(Debug, Clone)]
27pub struct TelemetryRunConfig {
28    pub kind: TelemetryRunKind,
29    pub jit_enabled: bool,
30    pub accelerate_enabled: bool,
31}
32
33#[derive(Debug, Clone, Default)]
34pub struct TelemetryRunFinish {
35    /// If not set, we use wall-clock elapsed time from the run guard.
36    pub duration: Option<Duration>,
37    pub success: bool,
38    pub jit_used: bool,
39    /// A short, privacy-safe error class/identifier (no source snippets).
40    pub error: Option<String>,
41    pub failure: Option<TelemetryFailureInfo>,
42    pub host: Option<TelemetryHost>,
43    pub counters: Option<RuntimeExecutionCounters>,
44    pub provider: Option<ProviderSnapshot>,
45}
46
/// Structured description of a runtime failure, reported with the finished event.
///
/// Each field is copied into the matching `runtime_failure_*` payload field.
#[derive(Clone, Debug)]
pub struct TelemetryFailureInfo {
    /// Identifier of the stage in which the failure occurred.
    pub stage: String,
    /// Failure code; also used as the event's `error` when no explicit error was given.
    pub code: String,
    /// Whether the failure carried a source span.
    pub has_span: bool,
    /// Component associated with the failure, when known.
    pub component: Option<String>,
}
54
/// Host environment on whose behalf a run is reported.
#[derive(Clone, Copy, Debug)]
pub enum TelemetryHost {
    Cli,
    Wasm,
    Kernel,
    Desktop,
}

impl TelemetryHost {
    /// Returns the lowercase identifier used for this host in telemetry payloads.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Cli => "cli",
            Self::Wasm => "wasm",
            Self::Kernel => "kernel",
            Self::Desktop => "desktop",
        }
    }
}
73
74pub struct TelemetryRunGuard {
75    sink: Arc<dyn TelemetrySink>,
76    cid: Option<String>,
77    platform: TelemetryPlatformInfo,
78    release: String,
79    session_id: String,
80    run_kind: String,
81    started_at: Instant,
82    started_payload: RuntimeStartedPayload,
83}
84
85impl TelemetryRunGuard {
86    pub fn session_id(&self) -> &str {
87        &self.session_id
88    }
89
90    pub fn finish(self, mut finish: TelemetryRunFinish) {
91        let duration = finish
92            .duration
93            .take()
94            .or_else(|| Some(self.started_at.elapsed()));
95        let duration_us = duration.map(|d| (d.as_micros().min(u64::MAX as u128)) as u64);
96
97        let (gpu_wall_ns, gpu_dispatches, upload_bytes, download_bytes, fusion_hits, fusion_misses) =
98            finish
99                .provider
100                .as_ref()
101                .map_or((None, None, None, None, None, None), |snapshot| {
102                    (
103                        Some(snapshot.gpu_wall_ns()),
104                        Some(snapshot.gpu_dispatches()),
105                        Some(snapshot.telemetry.upload_bytes),
106                        Some(snapshot.telemetry.download_bytes),
107                        Some(snapshot.telemetry.fusion_cache_hits),
108                        Some(snapshot.telemetry.fusion_cache_misses),
109                    )
110                });
111
112        let gpu_ratio = match (gpu_wall_ns, duration_us) {
113            (Some(wall_ns), Some(us)) if us > 0 => {
114                Some(clamp_ratio(wall_ns as f64 / (us as f64 * 1000.0)))
115            }
116            _ => None,
117        };
118        let fusion_hit_ratio = match (fusion_hits, fusion_misses) {
119            (Some(h), Some(m)) if h + m > 0 => Some(h as f64 / (h + m) as f64),
120            _ => None,
121        };
122
123        let mut error = finish.error.map(|mut e| {
124            if e.len() > 256 {
125                e.truncate(256);
126            }
127            e
128        });
129        if error.is_none() {
130            error = finish.failure.as_ref().map(|f| f.code.clone());
131        }
132
133        let runtime_failure_stage = finish.failure.as_ref().map(|f| f.stage.clone());
134        let runtime_failure_code = finish.failure.as_ref().map(|f| f.code.clone());
135        let runtime_failure_has_span = finish.failure.as_ref().map(|f| f.has_span);
136        let runtime_failure_component = finish.failure.as_ref().and_then(|f| f.component.clone());
137        let runtime_failure_host = finish.host.map(|h| h.as_str().to_string());
138
139        let envelope: RuntimeFinishedEnvelope = RuntimeFinishedEnvelope {
140            event_label: EVENT_RUNTIME_FINISHED,
141            uuid: Uuid::new_v4().to_string(),
142            cid: self.cid.clone(),
143            session_id: self.session_id.clone(),
144            os: self.platform.os.clone(),
145            arch: self.platform.arch.clone(),
146            release: Some(self.release.clone()),
147            run_kind: self.run_kind.clone(),
148            payload: RuntimeFinishedPayload {
149                duration_us,
150                success: finish.success,
151                jit_enabled: self.started_payload.jit_enabled,
152                jit_used: finish.jit_used,
153                accelerate_enabled: self.started_payload.accelerate_enabled,
154                timestamp_ms: Some(unix_timestamp_ms().min(u64::MAX as u128) as u64),
155                error,
156                runtime_failure_stage,
157                runtime_failure_code,
158                runtime_failure_has_span,
159                runtime_failure_host,
160                runtime_failure_component,
161                counters: finish.counters,
162                provider: finish.provider,
163                gpu_wall_ns,
164                gpu_ratio,
165                gpu_dispatches,
166                gpu_upload_bytes: upload_bytes,
167                gpu_download_bytes: download_bytes,
168                fusion_cache_hits: fusion_hits,
169                fusion_cache_misses: fusion_misses,
170                fusion_hit_ratio,
171            },
172        };
173
174        if let Some(serialized) = serialize_envelope(&envelope) {
175            self.sink.emit(serialized);
176        }
177    }
178}
179
/// Restricts a ratio to the closed interval `[0.0, 1.0]`.
///
/// Non-finite inputs (NaN or either infinity) are mapped to `0.0`.
fn clamp_ratio(value: f64) -> f64 {
    if !value.is_finite() {
        return 0.0;
    }
    value.clamp(0.0, 1.0)
}
187
188impl RunMatSession {
189    pub fn set_telemetry_sink(&mut self, sink: Option<Arc<dyn TelemetrySink>>) {
190        self.telemetry_sink = sink;
191    }
192
193    pub fn set_telemetry_platform_info(&mut self, platform: TelemetryPlatformInfo) {
194        self.telemetry_platform = platform;
195    }
196
197    pub fn telemetry_platform_info(&self) -> &TelemetryPlatformInfo {
198        &self.telemetry_platform
199    }
200
201    pub fn telemetry_run(&self, config: TelemetryRunConfig) -> Option<TelemetryRunGuard> {
202        if !self.telemetry_consent {
203            return None;
204        }
205        let sink = self.telemetry_sink.as_ref()?.clone();
206
207        let platform = TelemetryPlatformInfo {
208            os: self
209                .telemetry_platform
210                .os
211                .clone()
212                .or_else(|| Some(std::env::consts::OS.to_string())),
213            arch: self
214                .telemetry_platform
215                .arch
216                .clone()
217                .or_else(|| Some(std::env::consts::ARCH.to_string())),
218        };
219
220        let session_id = Uuid::new_v4().to_string();
221        let started_payload = RuntimeStartedPayload {
222            jit_enabled: config.jit_enabled,
223            accelerate_enabled: config.accelerate_enabled,
224            timestamp_ms: Some(unix_timestamp_ms().min(u64::MAX as u128) as u64),
225        };
226        let envelope: RuntimeStartedEnvelope = RuntimeStartedEnvelope {
227            event_label: EVENT_RUNTIME_STARTED,
228            uuid: Uuid::new_v4().to_string(),
229            cid: self.telemetry_client_id.clone(),
230            session_id: session_id.clone(),
231            os: platform.os.clone(),
232            arch: platform.arch.clone(),
233            release: Some(env!("CARGO_PKG_VERSION").to_string()),
234            run_kind: config.kind.as_str().to_string(),
235            payload: started_payload.clone(),
236        };
237
238        if let Some(serialized) = serialize_envelope(&envelope) {
239            sink.emit(serialized);
240        }
241
242        Some(TelemetryRunGuard {
243            sink,
244            cid: self.telemetry_client_id.clone(),
245            platform,
246            release: env!("CARGO_PKG_VERSION").to_string(),
247            session_id,
248            run_kind: config.kind.as_str().to_string(),
249            started_at: Instant::now(),
250            started_payload,
251        })
252    }
253}