1use crate::RunMatSession;
2use runmat_telemetry::{
3 serialize_envelope, ProviderSnapshot, RuntimeExecutionCounters, RuntimeFinishedEnvelope,
4 RuntimeFinishedPayload, RuntimeStartedEnvelope, RuntimeStartedPayload, TelemetryRunKind,
5 EVENT_RUNTIME_FINISHED, EVENT_RUNTIME_STARTED,
6};
7use runmat_time::{unix_timestamp_ms, Instant};
8use std::sync::Arc;
9use std::time::Duration;
10use uuid::Uuid;
11
/// Destination for serialized telemetry events.
///
/// The session and its run guards hold a sink as `Arc<dyn TelemetrySink>` and
/// hand it every envelope that `serialize_envelope` manages to serialize.
pub trait TelemetrySink: Send + Sync {
    /// Receives one serialized telemetry envelope.
    ///
    /// `payload_json` is the string produced by `serialize_envelope`
    /// (presumably JSON, going by the parameter name).
    fn emit(&self, payload_json: String);
}
19
/// Platform identification attached to telemetry envelopes.
///
/// Both fields are optional. When a run is started through
/// `RunMatSession::telemetry_run`, any field left as `None` is filled in from
/// `std::env::consts` (`OS` / `ARCH`) before the event is emitted.
#[derive(Debug, Clone, Default)]
pub struct TelemetryPlatformInfo {
    /// Operating system label reported in the envelope's `os` field.
    pub os: Option<String>,
    /// CPU architecture label reported in the envelope's `arch` field.
    pub arch: Option<String>,
}
25
26#[derive(Debug, Clone)]
27pub struct TelemetryRunConfig {
28 pub kind: TelemetryRunKind,
29 pub jit_enabled: bool,
30 pub accelerate_enabled: bool,
31}
32
33#[derive(Debug, Clone, Default)]
34pub struct TelemetryRunFinish {
35 pub duration: Option<Duration>,
37 pub success: bool,
38 pub jit_used: bool,
39 pub error: Option<String>,
41 pub failure: Option<TelemetryFailureInfo>,
42 pub host: Option<TelemetryHost>,
43 pub counters: Option<RuntimeExecutionCounters>,
44 pub provider: Option<ProviderSnapshot>,
45}
46
/// Structured description of a failed run.
///
/// `TelemetryRunGuard::finish` copies these values into the
/// `runtime_failure_*` fields of the finished payload.
#[derive(Debug, Clone)]
pub struct TelemetryFailureInfo {
    /// Reported as `runtime_failure_stage`.
    pub stage: String,
    /// Reported as `runtime_failure_code`; also used as the payload's `error`
    /// text when no explicit error message was supplied.
    pub code: String,
    /// Reported as `runtime_failure_has_span` (presumably whether the failure
    /// carried a source span — confirm with the producer).
    pub has_span: bool,
    /// Reported as `runtime_failure_component` when present.
    pub component: Option<String>,
}
54
/// Host environment a run was executed from.
#[derive(Debug, Clone, Copy)]
pub enum TelemetryHost {
    /// Labelled `"cli"`.
    Cli,
    /// Labelled `"wasm"`.
    Wasm,
    /// Labelled `"kernel"`.
    Kernel,
    /// Labelled `"desktop"`.
    Desktop,
}

impl TelemetryHost {
    /// Returns the lowercase label used for this host in telemetry payloads.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Cli => "cli",
            Self::Wasm => "wasm",
            Self::Kernel => "kernel",
            Self::Desktop => "desktop",
        }
    }
}
73
74pub struct TelemetryRunGuard {
75 sink: Arc<dyn TelemetrySink>,
76 cid: Option<String>,
77 platform: TelemetryPlatformInfo,
78 release: String,
79 session_id: String,
80 run_kind: String,
81 started_at: Instant,
82 started_payload: RuntimeStartedPayload,
83}
84
85impl TelemetryRunGuard {
86 pub fn session_id(&self) -> &str {
87 &self.session_id
88 }
89
90 pub fn finish(self, mut finish: TelemetryRunFinish) {
91 let duration = finish
92 .duration
93 .take()
94 .or_else(|| Some(self.started_at.elapsed()));
95 let duration_us = duration.map(|d| (d.as_micros().min(u64::MAX as u128)) as u64);
96
97 let (gpu_wall_ns, gpu_dispatches, upload_bytes, download_bytes, fusion_hits, fusion_misses) =
98 finish
99 .provider
100 .as_ref()
101 .map_or((None, None, None, None, None, None), |snapshot| {
102 (
103 Some(snapshot.gpu_wall_ns()),
104 Some(snapshot.gpu_dispatches()),
105 Some(snapshot.telemetry.upload_bytes),
106 Some(snapshot.telemetry.download_bytes),
107 Some(snapshot.telemetry.fusion_cache_hits),
108 Some(snapshot.telemetry.fusion_cache_misses),
109 )
110 });
111
112 let gpu_ratio = match (gpu_wall_ns, duration_us) {
113 (Some(wall_ns), Some(us)) if us > 0 => {
114 Some(clamp_ratio(wall_ns as f64 / (us as f64 * 1000.0)))
115 }
116 _ => None,
117 };
118 let fusion_hit_ratio = match (fusion_hits, fusion_misses) {
119 (Some(h), Some(m)) if h + m > 0 => Some(h as f64 / (h + m) as f64),
120 _ => None,
121 };
122
123 let mut error = finish.error.map(|mut e| {
124 if e.len() > 256 {
125 e.truncate(256);
126 }
127 e
128 });
129 if error.is_none() {
130 error = finish.failure.as_ref().map(|f| f.code.clone());
131 }
132
133 let runtime_failure_stage = finish.failure.as_ref().map(|f| f.stage.clone());
134 let runtime_failure_code = finish.failure.as_ref().map(|f| f.code.clone());
135 let runtime_failure_has_span = finish.failure.as_ref().map(|f| f.has_span);
136 let runtime_failure_component = finish.failure.as_ref().and_then(|f| f.component.clone());
137 let runtime_failure_host = finish.host.map(|h| h.as_str().to_string());
138
139 let envelope: RuntimeFinishedEnvelope = RuntimeFinishedEnvelope {
140 event_label: EVENT_RUNTIME_FINISHED,
141 uuid: Uuid::new_v4().to_string(),
142 cid: self.cid.clone(),
143 session_id: self.session_id.clone(),
144 os: self.platform.os.clone(),
145 arch: self.platform.arch.clone(),
146 release: Some(self.release.clone()),
147 run_kind: self.run_kind.clone(),
148 payload: RuntimeFinishedPayload {
149 duration_us,
150 success: finish.success,
151 jit_enabled: self.started_payload.jit_enabled,
152 jit_used: finish.jit_used,
153 accelerate_enabled: self.started_payload.accelerate_enabled,
154 timestamp_ms: Some(unix_timestamp_ms().min(u64::MAX as u128) as u64),
155 error,
156 runtime_failure_stage,
157 runtime_failure_code,
158 runtime_failure_has_span,
159 runtime_failure_host,
160 runtime_failure_component,
161 counters: finish.counters,
162 provider: finish.provider,
163 gpu_wall_ns,
164 gpu_ratio,
165 gpu_dispatches,
166 gpu_upload_bytes: upload_bytes,
167 gpu_download_bytes: download_bytes,
168 fusion_cache_hits: fusion_hits,
169 fusion_cache_misses: fusion_misses,
170 fusion_hit_ratio,
171 },
172 };
173
174 if let Some(serialized) = serialize_envelope(&envelope) {
175 self.sink.emit(serialized);
176 }
177 }
178}
179
/// Restricts a ratio to the closed interval `[0.0, 1.0]`.
///
/// Non-finite inputs (NaN and both infinities) are reported as `0.0`.
fn clamp_ratio(value: f64) -> f64 {
    if !value.is_finite() {
        return 0.0;
    }
    if value < 0.0 {
        0.0
    } else if value > 1.0 {
        1.0
    } else {
        value
    }
}
187
188impl RunMatSession {
189 pub fn set_telemetry_sink(&mut self, sink: Option<Arc<dyn TelemetrySink>>) {
190 self.telemetry_sink = sink;
191 }
192
193 pub fn set_telemetry_platform_info(&mut self, platform: TelemetryPlatformInfo) {
194 self.telemetry_platform = platform;
195 }
196
197 pub fn telemetry_platform_info(&self) -> &TelemetryPlatformInfo {
198 &self.telemetry_platform
199 }
200
201 pub fn telemetry_run(&self, config: TelemetryRunConfig) -> Option<TelemetryRunGuard> {
202 if !self.telemetry_consent {
203 return None;
204 }
205 let sink = self.telemetry_sink.as_ref()?.clone();
206
207 let platform = TelemetryPlatformInfo {
208 os: self
209 .telemetry_platform
210 .os
211 .clone()
212 .or_else(|| Some(std::env::consts::OS.to_string())),
213 arch: self
214 .telemetry_platform
215 .arch
216 .clone()
217 .or_else(|| Some(std::env::consts::ARCH.to_string())),
218 };
219
220 let session_id = Uuid::new_v4().to_string();
221 let started_payload = RuntimeStartedPayload {
222 jit_enabled: config.jit_enabled,
223 accelerate_enabled: config.accelerate_enabled,
224 timestamp_ms: Some(unix_timestamp_ms().min(u64::MAX as u128) as u64),
225 };
226 let envelope: RuntimeStartedEnvelope = RuntimeStartedEnvelope {
227 event_label: EVENT_RUNTIME_STARTED,
228 uuid: Uuid::new_v4().to_string(),
229 cid: self.telemetry_client_id.clone(),
230 session_id: session_id.clone(),
231 os: platform.os.clone(),
232 arch: platform.arch.clone(),
233 release: Some(env!("CARGO_PKG_VERSION").to_string()),
234 run_kind: config.kind.as_str().to_string(),
235 payload: started_payload.clone(),
236 };
237
238 if let Some(serialized) = serialize_envelope(&envelope) {
239 sink.emit(serialized);
240 }
241
242 Some(TelemetryRunGuard {
243 sink,
244 cid: self.telemetry_client_id.clone(),
245 platform,
246 release: env!("CARGO_PKG_VERSION").to_string(),
247 session_id,
248 run_kind: config.kind.as_str().to_string(),
249 started_at: Instant::now(),
250 started_payload,
251 })
252 }
253}