use crate::RunMatSession;
use runmat_telemetry::{
serialize_envelope, ProviderSnapshot, RuntimeExecutionCounters, RuntimeFinishedEnvelope,
RuntimeFinishedPayload, RuntimeStartedEnvelope, RuntimeStartedPayload, TelemetryRunKind,
EVENT_RUNTIME_FINISHED, EVENT_RUNTIME_STARTED,
};
use runmat_time::{unix_timestamp_ms, Instant};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// Destination for serialized telemetry events.
///
/// Implementors must be `Send + Sync`; the session and each run guard hold the
/// sink as an `Arc<dyn TelemetrySink>`.
pub trait TelemetrySink: Send + Sync {
/// Receives one serialized event envelope (the output of `serialize_envelope`).
fn emit(&self, payload_json: String);
}
/// Operating-system and CPU-architecture labels attached to telemetry events.
///
/// A field left as `None` is filled in from `std::env::consts` when a run is
/// started through `RunMatSession::telemetry_run`.
#[derive(Debug, Clone, Default)]
pub struct TelemetryPlatformInfo {
/// Operating-system label; `None` means "use `std::env::consts::OS`".
pub os: Option<String>,
/// Architecture label; `None` means "use `std::env::consts::ARCH`".
pub arch: Option<String>,
}
/// Settings describing a run, supplied to `RunMatSession::telemetry_run`.
#[derive(Debug, Clone)]
pub struct TelemetryRunConfig {
/// Kind of run; its `as_str()` label is reported as the event's `run_kind`.
pub kind: TelemetryRunKind,
/// Copied into the started payload and echoed in the finished payload.
pub jit_enabled: bool,
/// Copied into the started payload and echoed in the finished payload.
pub accelerate_enabled: bool,
}
/// Outcome of a run, handed to `TelemetryRunGuard::finish`.
///
/// The derived `Default` yields `success == false`, `jit_used == false` and
/// `None` for every optional field.
#[derive(Debug, Clone, Default)]
pub struct TelemetryRunFinish {
/// Explicit run duration; when `None`, `finish` uses the time elapsed since
/// the guard was created.
pub duration: Option<Duration>,
/// Forwarded as the finished payload's `success` flag.
pub success: bool,
/// Forwarded as the finished payload's `jit_used` flag.
pub jit_used: bool,
/// Error text. `finish` shortens messages longer than 256 bytes, and falls
/// back to `failure.code` when this is `None`.
pub error: Option<String>,
/// Structured failure details, reported through the `runtime_failure_*`
/// payload fields.
pub failure: Option<TelemetryFailureInfo>,
/// Host label, reported as `runtime_failure_host`.
pub host: Option<TelemetryHost>,
/// Execution counters, forwarded unchanged in the finished payload.
pub counters: Option<RuntimeExecutionCounters>,
/// Provider snapshot, forwarded in the finished payload; `finish` also
/// derives the GPU and fusion-cache metrics from it.
pub provider: Option<ProviderSnapshot>,
}
/// Structured description of a failed run, carried in `TelemetryRunFinish`.
#[derive(Debug, Clone)]
pub struct TelemetryFailureInfo {
/// Reported as `runtime_failure_stage`.
pub stage: String,
/// Reported as `runtime_failure_code`; also used as the event's `error`
/// text when no explicit error message was supplied.
pub code: String,
/// Reported as `runtime_failure_has_span`.
pub has_span: bool,
/// Reported as `runtime_failure_component` when present.
pub component: Option<String>,
}
/// Host environment a run was executed from.
///
/// Every variant has a fixed lowercase label, available through
/// [`TelemetryHost::as_str`].
#[derive(Debug, Clone, Copy)]
pub enum TelemetryHost {
    /// Labelled `"cli"`.
    Cli,
    /// Labelled `"wasm"`.
    Wasm,
    /// Labelled `"kernel"`.
    Kernel,
    /// Labelled `"desktop"`.
    Desktop,
}

impl TelemetryHost {
    /// Returns the lowercase label used for this host in telemetry events.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Cli => "cli",
            Self::Wasm => "wasm",
            Self::Kernel => "kernel",
            Self::Desktop => "desktop",
        }
    }
}
/// Handle for a run whose "started" event has already been emitted.
///
/// Returned by `RunMatSession::telemetry_run`; consuming it with `finish`
/// emits the matching "finished" event to the same sink.
pub struct TelemetryRunGuard {
// Sink that receives the finished event.
sink: Arc<dyn TelemetrySink>,
// Client id copied from the session when the run started.
cid: Option<String>,
// Platform labels resolved when the run started.
platform: TelemetryPlatformInfo,
// Crate version string (`CARGO_PKG_VERSION`) captured at run start.
release: String,
// Identifier shared by this run's started and finished events.
session_id: String,
// Label of the run kind, as reported in both events.
run_kind: String,
// Taken after the started event was emitted; basis for the default duration.
started_at: Instant,
// Payload of the started event; its JIT/accelerate flags are echoed on finish.
started_payload: RuntimeStartedPayload,
}
impl TelemetryRunGuard {
    /// Maximum number of bytes of error text forwarded in a finished event.
    const MAX_ERROR_BYTES: usize = 256;

    /// Returns the identifier shared by this run's started and finished events.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    /// Consumes the guard and emits the "finished" event for this run.
    ///
    /// * The duration is `finish.duration` when provided, otherwise the time
    ///   elapsed since the guard was created; it is reported in microseconds,
    ///   saturating at `u64::MAX`.
    /// * GPU and fusion-cache metrics are derived from `finish.provider` and
    ///   are `None` when no provider snapshot was supplied.
    /// * The error text is shortened to at most 256 bytes (never splitting a
    ///   UTF-8 character); when no error text is given, the failure code is
    ///   used instead.
    ///
    /// Nothing is emitted if the envelope cannot be serialized.
    pub fn finish(self, finish: TelemetryRunFinish) {
        let elapsed = finish
            .duration
            .unwrap_or_else(|| self.started_at.elapsed());
        // Saturate rather than wrap if the microsecond count exceeds u64.
        let duration_us = elapsed.as_micros().min(u64::MAX as u128) as u64;

        let (gpu_wall_ns, gpu_dispatches, upload_bytes, download_bytes, fusion_hits, fusion_misses) =
            finish
                .provider
                .as_ref()
                .map_or((None, None, None, None, None, None), |snapshot| {
                    (
                        Some(snapshot.gpu_wall_ns()),
                        Some(snapshot.gpu_dispatches()),
                        Some(snapshot.telemetry.upload_bytes),
                        Some(snapshot.telemetry.download_bytes),
                        Some(snapshot.telemetry.fusion_cache_hits),
                        Some(snapshot.telemetry.fusion_cache_misses),
                    )
                });

        // Share of the run spent on the GPU: wall nanoseconds over the run
        // duration converted to nanoseconds. Undefined for a zero duration.
        let gpu_ratio = match gpu_wall_ns {
            Some(wall_ns) if duration_us > 0 => {
                Some(clamp_ratio(wall_ns as f64 / (duration_us as f64 * 1000.0)))
            }
            _ => None,
        };
        let fusion_hit_ratio = match (fusion_hits, fusion_misses) {
            (Some(h), Some(m)) if h + m > 0 => Some(h as f64 / (h + m) as f64),
            _ => None,
        };

        let failure = finish.failure.as_ref();
        // Prefer the explicit (bounded) error text; otherwise report the failure code.
        let error = finish
            .error
            .map(Self::truncate_error)
            .or_else(|| failure.map(|f| f.code.clone()));
        let runtime_failure_stage = failure.map(|f| f.stage.clone());
        let runtime_failure_code = failure.map(|f| f.code.clone());
        let runtime_failure_has_span = failure.map(|f| f.has_span);
        let runtime_failure_component = failure.and_then(|f| f.component.clone());
        let runtime_failure_host = finish.host.map(|h| h.as_str().to_string());

        let envelope: RuntimeFinishedEnvelope = RuntimeFinishedEnvelope {
            event_label: EVENT_RUNTIME_FINISHED,
            uuid: Uuid::new_v4().to_string(),
            cid: self.cid.clone(),
            session_id: self.session_id.clone(),
            os: self.platform.os.clone(),
            arch: self.platform.arch.clone(),
            release: Some(self.release.clone()),
            run_kind: self.run_kind.clone(),
            payload: RuntimeFinishedPayload {
                duration_us: Some(duration_us),
                success: finish.success,
                jit_enabled: self.started_payload.jit_enabled,
                jit_used: finish.jit_used,
                accelerate_enabled: self.started_payload.accelerate_enabled,
                timestamp_ms: Some(unix_timestamp_ms().min(u64::MAX as u128) as u64),
                error,
                runtime_failure_stage,
                runtime_failure_code,
                runtime_failure_has_span,
                runtime_failure_host,
                runtime_failure_component,
                counters: finish.counters,
                provider: finish.provider,
                gpu_wall_ns,
                gpu_ratio,
                gpu_dispatches,
                gpu_upload_bytes: upload_bytes,
                gpu_download_bytes: download_bytes,
                fusion_cache_hits: fusion_hits,
                fusion_cache_misses: fusion_misses,
                fusion_hit_ratio,
            },
        };
        if let Some(serialized) = serialize_envelope(&envelope) {
            self.sink.emit(serialized);
        }
    }

    /// Shortens `message` to at most `MAX_ERROR_BYTES` bytes.
    ///
    /// `String::truncate` panics when the cut falls inside a multi-byte UTF-8
    /// character, so the cut is moved back to the nearest character boundary.
    fn truncate_error(mut message: String) -> String {
        if message.len() > Self::MAX_ERROR_BYTES {
            let mut cut = Self::MAX_ERROR_BYTES;
            // Index 0 is always a boundary, so this loop terminates.
            while !message.is_char_boundary(cut) {
                cut -= 1;
            }
            message.truncate(cut);
        }
        message
    }
}
/// Restricts a ratio to the closed interval `[0.0, 1.0]`.
///
/// Non-finite inputs (NaN and both infinities) are reported as `0.0`.
fn clamp_ratio(value: f64) -> f64 {
    if !value.is_finite() {
        return 0.0;
    }
    value.clamp(0.0, 1.0)
}
impl RunMatSession {
    /// Installs the sink that receives telemetry events, or removes the
    /// current one when `sink` is `None`.
    pub fn set_telemetry_sink(&mut self, sink: Option<Arc<dyn TelemetrySink>>) {
        self.telemetry_sink = sink;
    }

    /// Replaces the platform labels used for subsequently started runs.
    pub fn set_telemetry_platform_info(&mut self, platform: TelemetryPlatformInfo) {
        self.telemetry_platform = platform;
    }

    /// Returns the platform labels currently configured on this session.
    pub fn telemetry_platform_info(&self) -> &TelemetryPlatformInfo {
        &self.telemetry_platform
    }

    /// Emits a "started" event for a new run and returns a guard used to
    /// emit the matching "finished" event.
    ///
    /// Returns `None`, emitting nothing, when telemetry consent is off or no
    /// sink is installed. Platform labels missing from the session fall back
    /// to `std::env::consts::OS` / `ARCH`. The guard's clock starts after the
    /// started event has been handed to the sink.
    pub fn telemetry_run(&self, config: TelemetryRunConfig) -> Option<TelemetryRunGuard> {
        if !self.telemetry_consent {
            return None;
        }
        let sink = Arc::clone(self.telemetry_sink.as_ref()?);

        let os = self
            .telemetry_platform
            .os
            .clone()
            .unwrap_or_else(|| std::env::consts::OS.to_string());
        let arch = self
            .telemetry_platform
            .arch
            .clone()
            .unwrap_or_else(|| std::env::consts::ARCH.to_string());
        let platform = TelemetryPlatformInfo {
            os: Some(os),
            arch: Some(arch),
        };

        // Both the started envelope and the guard carry these strings.
        let release = env!("CARGO_PKG_VERSION").to_string();
        let run_kind = config.kind.as_str().to_string();

        let session_id = Uuid::new_v4().to_string();
        let started_payload = RuntimeStartedPayload {
            jit_enabled: config.jit_enabled,
            accelerate_enabled: config.accelerate_enabled,
            timestamp_ms: Some(unix_timestamp_ms().min(u64::MAX as u128) as u64),
        };

        let envelope: RuntimeStartedEnvelope = RuntimeStartedEnvelope {
            event_label: EVENT_RUNTIME_STARTED,
            uuid: Uuid::new_v4().to_string(),
            cid: self.telemetry_client_id.clone(),
            session_id: session_id.clone(),
            os: platform.os.clone(),
            arch: platform.arch.clone(),
            release: Some(release.clone()),
            run_kind: run_kind.clone(),
            payload: started_payload.clone(),
        };
        if let Some(json) = serialize_envelope(&envelope) {
            sink.emit(json);
        }

        Some(TelemetryRunGuard {
            sink,
            cid: self.telemetry_client_id.clone(),
            platform,
            release,
            session_id,
            run_kind,
            started_at: Instant::now(),
            started_payload,
        })
    }
}