use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use uuid::Uuid;
pub use xybrid_core::device::DeviceProfile;
use xybrid_core::device::{ResourceMonitor, ResourceTelemetryMode, ResourceUsageSummary};
use xybrid_core::event_bus::{EventContext, OrchestratorEvent};
use xybrid_core::execution::listener::{self as execution_listener, ExecutionEvent};
use xybrid_core::http::{CircuitBreaker, CircuitConfig, RetryPolicy};
use xybrid_core::orchestrator::routing_engine::LocalReliabilityHint;
use xybrid_core::tracing as core_tracing;
/// A single telemetry record handed to the exporter.
///
/// The exporter converts each record into a platform event before upload;
/// the optional fields are copied into that event's JSON payload when set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryEvent {
/// Event kind; forwarded unchanged as the platform event's `event_type`.
pub event_type: String,
/// Stage name, copied into the payload as `stage_name` when present.
pub stage_name: Option<String>,
/// Execution target, copied into the payload as `target` when present.
pub target: Option<String>,
/// Latency in milliseconds, copied into the payload as `latency_ms` when present.
pub latency_ms: Option<u32>,
/// Error text; when present the payload `status` is `"error"`, otherwise `"success"`.
pub error: Option<String>,
/// Extra data as a string; parsed as JSON when possible, otherwise forwarded as a plain string.
pub data: Option<String>,
/// Event time in milliseconds since the Unix epoch.
pub timestamp_ms: u64,
}
/// Channel sender used to hand [`TelemetryEvent`]s to a consumer thread.
pub type TelemetrySender = mpsc::Sender<TelemetryEvent>;
/// Global list of telemetry senders.
// NOTE(review): presumably filled by `register_telemetry_sender`, which is defined outside this view — confirm.
static TELEMETRY_SENDERS: Mutex<Vec<TelemetrySender>> = Mutex::new(Vec::new());
/// Ingest base URL used by `HttpTelemetryExporter::from_env` when no URL variable is set.
pub const DEFAULT_INGEST_URL: &str = "https://ingest.xybrid.dev";
/// Queue length at which `queue_failed_events` starts evicting the oldest entry.
const MAX_FAILED_QUEUE_SIZE: usize = 1000;
/// Connect timeout, in milliseconds, for the exporter's HTTP agent.
const CONNECT_TIMEOUT_MS: u64 = 5000;
/// Overall request timeout, in milliseconds, for the exporter's HTTP agent.
const REQUEST_TIMEOUT_MS: u64 = 10000;
/// Settings for [`HttpTelemetryExporter`].
///
/// Build one with [`TelemetryConfig::new`] and refine it with the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
/// Base URL of the ingest service; `/v1/events/batch` is appended when sending.
/// Nothing is sent while this is empty.
pub endpoint: String,
/// API key sent as a `Bearer` token. Nothing is sent while this is empty.
pub api_key: String,
/// Session identifier attached to every exported event.
pub session_id: Uuid,
/// Device identifier attached to every exported event, if any.
pub device_id: Option<String>,
/// Platform string attached to every exported event, if any.
pub platform: Option<String>,
/// Application version attached to every exported event, if any.
pub app_version: Option<String>,
/// Number of buffered events that triggers an immediate send; also the
/// maximum number of queued events retried per pass.
pub batch_size: usize,
/// Seconds the background flush thread sleeps between passes.
pub flush_interval_secs: u64,
/// Used as the retry policy's `max_attempts` for each batch upload.
pub max_retries: u32,
/// When true, batches that could not be delivered are kept for later retry.
pub enable_retry_queue: bool,
/// Optional human-readable device label attached to every exported event.
pub device_label: Option<String>,
/// Explicit hardware profile; merged last when the device profile is resolved.
pub device_profile_override: Option<DeviceProfile>,
/// Per-field hardware overrides merged over the detected (or default) profile.
pub device_profile_patch: DeviceProfile,
/// When true, the base device profile comes from `DeviceProfile::detect()`.
pub auto_hardware_detection: bool,
/// When true, the hostname is looked up if the resolved profile has none.
pub capture_hostname: bool,
// Set by `with_device`. When this is false and hardware auto-detection is
// off, `HttpTelemetryExporter::new` clears `device_id`.
#[doc(hidden)]
pub device_id_explicit: bool,
/// Requested resource telemetry mode; `init_platform_telemetry` lets the
/// `XYBRID_RESOURCE_TELEMETRY` environment variable take precedence over it.
pub resource_telemetry: ResourceTelemetryMode,
}
impl Default for TelemetryConfig {
    /// Returns a configuration with an empty endpoint and API key, a fresh
    /// random session id, and the device id / platform reported by
    /// `crate::device::Device::current()`.
    fn default() -> Self {
        let current = crate::device::Device::current();
        let device_id = Some(current.id.clone());
        let platform = Some(current.platform.clone());

        Self {
            // Connection settings are left blank; sending is skipped until set.
            endpoint: String::new(),
            api_key: String::new(),
            session_id: Uuid::new_v4(),

            // Identity.
            device_id,
            platform,
            app_version: None,
            device_label: None,
            device_id_explicit: false,

            // Batching and retry behaviour.
            batch_size: 10,
            flush_interval_secs: 5,
            max_retries: 3,
            enable_retry_queue: true,

            // Hardware profile resolution.
            device_profile_override: None,
            device_profile_patch: DeviceProfile::default(),
            auto_hardware_detection: true,
            capture_hostname: false,

            resource_telemetry: ResourceTelemetryMode::Off,
        }
    }
}
impl TelemetryConfig {
    /// Creates a configuration for `endpoint` and `api_key`, with every other
    /// field taken from [`Default`].
    pub fn new(endpoint: impl Into<String>, api_key: impl Into<String>) -> Self {
        let defaults = Self::default();
        Self {
            endpoint: endpoint.into(),
            api_key: api_key.into(),
            ..defaults
        }
    }

    /// Replaces the session id.
    pub fn with_session_id(self, session_id: Uuid) -> Self {
        Self { session_id, ..self }
    }

    /// Sets both the device id and the platform, and marks the device id as
    /// explicitly provided.
    pub fn with_device(self, device_id: impl Into<String>, platform: impl Into<String>) -> Self {
        Self {
            device_id: Some(device_id.into()),
            platform: Some(platform.into()),
            device_id_explicit: true,
            ..self
        }
    }

    /// Sets the platform string.
    pub fn with_platform(self, platform: impl Into<String>) -> Self {
        Self {
            platform: Some(platform.into()),
            ..self
        }
    }

    /// Sets the application version.
    pub fn with_app_version(self, version: impl Into<String>) -> Self {
        Self {
            app_version: Some(version.into()),
            ..self
        }
    }

    /// Sets how many buffered events trigger an immediate send.
    pub fn with_batch_size(self, size: usize) -> Self {
        Self {
            batch_size: size,
            ..self
        }
    }

    /// Sets the background flush interval, in seconds.
    pub fn with_flush_interval(self, secs: u64) -> Self {
        Self {
            flush_interval_secs: secs,
            ..self
        }
    }

    /// Sets the device label.
    pub fn with_device_label(self, label: impl Into<String>) -> Self {
        Self {
            device_label: Some(label.into()),
            ..self
        }
    }

    /// Supplies an explicit hardware profile and turns off auto-detection.
    pub fn with_hardware(self, profile: DeviceProfile) -> Self {
        Self {
            device_profile_override: Some(profile),
            auto_hardware_detection: false,
            ..self
        }
    }

    /// Overrides the chip family in the hardware patch.
    pub fn with_hardware_chip(mut self, chip: impl Into<String>) -> Self {
        let patch = &mut self.device_profile_patch;
        patch.chip_family = Some(chip.into());
        self
    }

    /// Overrides the RAM size (in GB) in the hardware patch.
    pub fn with_hardware_ram_gb(mut self, gb: u32) -> Self {
        let patch = &mut self.device_profile_patch;
        patch.ram_gb = Some(gb);
        self
    }

    /// Overrides the OS name and version in the hardware patch.
    pub fn with_hardware_os(mut self, os: impl Into<String>, version: impl Into<String>) -> Self {
        let patch = &mut self.device_profile_patch;
        patch.os = Some(os.into());
        patch.os_version = Some(version.into());
        self
    }

    /// Overrides the CPU architecture in the hardware patch.
    pub fn with_hardware_arch(mut self, arch: impl Into<String>) -> Self {
        let patch = &mut self.device_profile_patch;
        patch.arch = Some(arch.into());
        self
    }

    /// Adds (or replaces) a custom key/value attribute in the hardware patch.
    pub fn with_device_attribute(
        mut self,
        key: impl Into<String>,
        value: impl Into<String>,
    ) -> Self {
        let attributes = &mut self.device_profile_patch.custom;
        attributes.insert(key.into(), value.into());
        self
    }

    /// Enables or disables hardware auto-detection.
    pub fn with_auto_hardware_detection(self, enabled: bool) -> Self {
        Self {
            auto_hardware_detection: enabled,
            ..self
        }
    }

    /// Enables or disables hostname capture.
    pub fn with_hostname_capture(self, enabled: bool) -> Self {
        Self {
            capture_hostname: enabled,
            ..self
        }
    }

    /// Sets the resource telemetry mode, storing its normalized form.
    pub fn with_resource_telemetry(self, mode: ResourceTelemetryMode) -> Self {
        Self {
            resource_telemetry: mode.normalized(),
            ..self
        }
    }
}
/// Reads the resource telemetry mode from `XYBRID_RESOURCE_TELEMETRY`.
///
/// The value is trimmed and lower-cased, then read as `<mode>` or
/// `<mode>:<interval_ms>`. Returns `None` when the variable is unset or the
/// mode name is not recognised. An unparsable interval falls back to
/// `ResourceTelemetryMode::DEFAULT_SUMMARY_INTERVAL_MS`.
fn resource_mode_from_env() -> Option<ResourceTelemetryMode> {
    let value = std::env::var("XYBRID_RESOURCE_TELEMETRY")
        .ok()?
        .trim()
        .to_ascii_lowercase();

    // Split on the first ':' only; anything after it is the interval.
    let mut pieces = value.splitn(2, ':');
    let name = pieces.next().unwrap_or("");
    let requested_interval = pieces.next().and_then(|text| text.parse::<u32>().ok());
    let interval_ms =
        requested_interval.unwrap_or(ResourceTelemetryMode::DEFAULT_SUMMARY_INTERVAL_MS);

    let mode = match name {
        "off" => ResourceTelemetryMode::Off,
        "boundary" => ResourceTelemetryMode::Boundary,
        "summary" => ResourceTelemetryMode::Summary { interval_ms },
        "debug_local" | "debuglocal" | "debug-local" => {
            ResourceTelemetryMode::DebugLocal { interval_ms }
        }
        _ => return None,
    };
    Some(mode.normalized())
}
/// Wire representation of one event as uploaded to the ingest endpoint.
///
/// Built from a [`TelemetryEvent`] plus exporter configuration by
/// `convert_to_platform_event`. Fields marked `skip_serializing_if` are
/// omitted from the JSON when `None`; the other optional fields are emitted
/// as `null`.
#[derive(Debug, Clone, Serialize)]
struct PlatformEvent {
// Copied from `TelemetryConfig::session_id`.
session_id: Uuid,
// Copied from `TelemetryEvent::event_type`.
event_type: String,
// JSON object assembled from the event's optional fields and parsed `data`.
payload: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
device_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
device_label: Option<String>,
platform: Option<String>,
app_version: Option<String>,
// Resolved hardware profile, if the exporter has one.
#[serde(skip_serializing_if = "Option::is_none")]
device: Option<DeviceProfile>,
// RFC 3339 rendering of the event's millisecond timestamp.
timestamp: Option<String>,
pipeline_id: Option<Uuid>,
trace_id: Option<Uuid>,
// The next three are lifted from the event's JSON `data` when present.
#[serde(skip_serializing_if = "Option::is_none")]
correlation_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
outcome_category: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
abort_reason: Option<String>,
// Tracing spans, attached only to selected event types while tracing is enabled.
stages: Option<serde_json::Value>,
}
/// JSON request body posted to `/v1/events/batch`: a list of events under
/// the `events` key.
#[derive(Debug, Serialize)]
struct PlatformEventBatch {
events: Vec<PlatformEvent>,
}
/// Key removed from an event's JSON `data` and used as that event's pipeline id
/// (in place of the exporter-wide value) by `convert_to_platform_event`.
const CONTEXT_PIPELINE_ID_KEY: &str = "__xybrid_pipeline_id";
/// Key removed from an event's JSON `data` and used as that event's trace id
/// (in place of the exporter-wide value) by `convert_to_platform_event`.
const CONTEXT_TRACE_ID_KEY: &str = "__xybrid_trace_id";
/// Buffers [`TelemetryEvent`]s and uploads them in batches over HTTP.
///
/// Shared state is held behind `Arc`s so the background threads spawned by
/// `start` and `create_sender` can use it.
pub struct HttpTelemetryExporter {
// Exporter settings; cloned into each spawned thread.
config: TelemetryConfig,
// Hardware profile resolved once in `new`; attached to every exported event.
device_profile: Option<DeviceProfile>,
// Events accepted but not yet sent.
buffer: Arc<Mutex<Vec<TelemetryEvent>>>,
// True while the periodic flush thread should keep looping.
running: Arc<AtomicBool>,
// Default pipeline / trace ids applied to events, set via `set_pipeline_context`.
pipeline_id: Arc<RwLock<Option<Uuid>>>,
trace_id: Arc<RwLock<Option<Uuid>>>,
// HTTP agent configured with the connect and request timeouts.
agent: ureq::Agent,
// Circuit breaker consulted before each upload attempt.
circuit: Arc<CircuitBreaker>,
// Attempt count and back-off settings for uploads.
retry_policy: RetryPolicy,
// Events whose upload failed, kept for a later retry.
failed_queue: Arc<Mutex<VecDeque<PlatformEvent>>>,
// Number of events evicted from `failed_queue` because it was full.
dropped_count: Arc<AtomicU32>,
}
impl HttpTelemetryExporter {
    /// Builds an exporter from `config`.
    ///
    /// The device profile is resolved first. Afterwards the device id is
    /// cleared when hardware auto-detection is off and the id was not set
    /// explicitly.
    pub fn new(mut config: TelemetryConfig) -> Self {
        let device_profile = resolve_device_profile(&config);

        let keep_device_id = config.auto_hardware_detection || config.device_id_explicit;
        if !keep_device_id {
            config.device_id = None;
        }

        let agent = ureq::AgentBuilder::new()
            .timeout_connect(Duration::from_millis(CONNECT_TIMEOUT_MS))
            .timeout(Duration::from_millis(REQUEST_TIMEOUT_MS))
            .build();
        let circuit = Arc::new(CircuitBreaker::new(CircuitConfig::default()));
        let retry_policy = RetryPolicy {
            max_attempts: config.max_retries,
            initial_delay_ms: 500,
            max_delay_ms: 5000,
            jitter_factor: 0.3,
        };

        Self {
            config,
            device_profile,
            agent,
            circuit,
            retry_policy,
            buffer: Arc::new(Mutex::new(Vec::new())),
            failed_queue: Arc::new(Mutex::new(VecDeque::new())),
            running: Arc::new(AtomicBool::new(false)),
            dropped_count: Arc::new(AtomicU32::new(0)),
            pipeline_id: Arc::new(RwLock::new(None)),
            trace_id: Arc::new(RwLock::new(None)),
        }
    }

    /// Builds an exporter from environment variables.
    ///
    /// Returns `None` when `XYBRID_API_KEY` is unset. The endpoint comes from
    /// `XYBRID_INGEST_URL`, then `XYBRID_PLATFORM_URL`, then
    /// [`DEFAULT_INGEST_URL`].
    pub fn from_env() -> Option<Self> {
        let api_key = std::env::var("XYBRID_API_KEY").ok()?;
        let endpoint = ["XYBRID_INGEST_URL", "XYBRID_PLATFORM_URL"]
            .iter()
            .find_map(|name| std::env::var(*name).ok())
            .unwrap_or_else(|| DEFAULT_INGEST_URL.to_string());
        Some(Self::new(TelemetryConfig::new(endpoint, api_key)))
    }

    /// Stores the default pipeline and trace ids applied to exported events.
    /// A poisoned lock is skipped silently.
    pub fn set_pipeline_context(&self, pipeline_id: Option<Uuid>, trace_id: Option<Uuid>) {
        match self.pipeline_id.write() {
            Ok(mut slot) => *slot = pipeline_id,
            Err(_) => {}
        }
        match self.trace_id.write() {
            Ok(mut slot) => *slot = trace_id,
            Err(_) => {}
        }
    }

    /// Reports whether the circuit breaker is currently open.
    pub fn is_circuit_open(&self) -> bool {
        self.circuit.is_open()
    }

    /// Resets the circuit breaker.
    pub fn reset_circuit(&self) {
        self.circuit.reset();
    }

    /// Number of events waiting in the failed-event queue (0 if the lock is poisoned).
    pub fn failed_queue_size(&self) -> usize {
        match self.failed_queue.lock() {
            Ok(queue) => queue.len(),
            Err(_) => 0,
        }
    }

    /// Number of events evicted from the failed-event queue so far.
    pub fn dropped_count(&self) -> u32 {
        self.dropped_count.load(Ordering::Relaxed)
    }

    /// Starts the periodic flush thread; does nothing if it was already started.
    ///
    /// Each pass sleeps for the flush interval, retries queued failures (when
    /// the retry queue is enabled), then flushes the buffer.
    pub fn start(&self) {
        let already_running = self.running.swap(true, Ordering::SeqCst);
        if already_running {
            return;
        }

        let cfg = self.config.clone();
        let interval = Duration::from_secs(cfg.flush_interval_secs);
        let profile = self.device_profile.clone();
        let http = self.agent.clone();
        let policy = self.retry_policy.clone();
        let keep_going = Arc::clone(&self.running);
        let pending = Arc::clone(&self.buffer);
        let pipeline = Arc::clone(&self.pipeline_id);
        let trace = Arc::clone(&self.trace_id);
        let breaker = Arc::clone(&self.circuit);
        let failures = Arc::clone(&self.failed_queue);
        let dropped = Arc::clone(&self.dropped_count);

        thread::spawn(move || loop {
            if !keep_going.load(Ordering::SeqCst) {
                break;
            }
            thread::sleep(interval);
            if cfg.enable_retry_queue {
                retry_failed_events(&failures, &cfg, &http, &breaker, &policy);
            }
            flush_buffer_with_retry(
                &pending,
                &cfg,
                profile.as_ref(),
                &pipeline,
                &trace,
                &http,
                &breaker,
                &policy,
                &failures,
                &dropped,
            );
        });
    }

    /// Signals the flush thread to stop and flushes the buffer on the calling thread.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
        self.flush();
    }

    /// Buffers `event`; once the buffer reaches the batch size, sends the
    /// whole buffer on the calling thread.
    ///
    /// # Panics
    /// Panics if the buffer mutex is poisoned.
    pub fn push(&self, event: TelemetryEvent) {
        // Decide under the lock, send after releasing it.
        let ready: Option<Vec<TelemetryEvent>> = {
            let mut pending = self.buffer.lock().unwrap();
            pending.push(event);
            if pending.len() >= self.config.batch_size {
                Some(pending.drain(..).collect())
            } else {
                None
            }
        };

        if let Some(batch) = ready {
            send_batch_with_retry(
                &batch,
                &self.config,
                self.device_profile.as_ref(),
                &self.pipeline_id,
                &self.trace_id,
                &self.agent,
                &self.circuit,
                &self.retry_policy,
                &self.failed_queue,
                &self.dropped_count,
            );
        }
    }

    /// Sends everything currently buffered on the calling thread.
    pub fn flush(&self) {
        flush_buffer_with_retry(
            &self.buffer,
            &self.config,
            self.device_profile.as_ref(),
            &self.pipeline_id,
            &self.trace_id,
            &self.agent,
            &self.circuit,
            &self.retry_policy,
            &self.failed_queue,
            &self.dropped_count,
        );
    }

    /// Creates a channel sender backed by a dedicated thread that buffers
    /// incoming events, sending a batch whenever the buffer reaches the batch
    /// size. The thread ends when every sender clone has been dropped.
    pub fn create_sender(&self) -> TelemetrySender {
        let (sender, receiver) = mpsc::channel::<TelemetryEvent>();

        let cfg = self.config.clone();
        let threshold = cfg.batch_size;
        let profile = self.device_profile.clone();
        let http = self.agent.clone();
        let policy = self.retry_policy.clone();
        let pending = Arc::clone(&self.buffer);
        let pipeline = Arc::clone(&self.pipeline_id);
        let trace = Arc::clone(&self.trace_id);
        let breaker = Arc::clone(&self.circuit);
        let failures = Arc::clone(&self.failed_queue);
        let dropped = Arc::clone(&self.dropped_count);

        thread::spawn(move || {
            for incoming in receiver {
                let ready: Option<Vec<TelemetryEvent>> = {
                    let mut guard = pending.lock().unwrap();
                    guard.push(incoming);
                    if guard.len() >= threshold {
                        Some(guard.drain(..).collect())
                    } else {
                        None
                    }
                };
                if let Some(batch) = ready {
                    send_batch_with_retry(
                        &batch,
                        &cfg,
                        profile.as_ref(),
                        &pipeline,
                        &trace,
                        &http,
                        &breaker,
                        &policy,
                        &failures,
                        &dropped,
                    );
                }
            }
        });

        sender
    }
}
impl Drop for HttpTelemetryExporter {
fn drop(&mut self) {
self.stop();
}
}
/// Empties `buffer` and sends whatever it held as one batch.
///
/// The buffer lock is released before any network work starts. Does nothing
/// when the buffer is empty.
///
/// # Panics
/// Panics if the buffer mutex is poisoned.
fn flush_buffer_with_retry(
    buffer: &Arc<Mutex<Vec<TelemetryEvent>>>,
    config: &TelemetryConfig,
    device_profile: Option<&DeviceProfile>,
    pipeline_id: &Arc<RwLock<Option<Uuid>>>,
    trace_id: &Arc<RwLock<Option<Uuid>>>,
    agent: &ureq::Agent,
    circuit: &Arc<CircuitBreaker>,
    retry_policy: &RetryPolicy,
    failed_queue: &Arc<Mutex<VecDeque<PlatformEvent>>>,
    dropped_count: &Arc<AtomicU32>,
) {
    // The temporary guard is dropped at the end of this statement.
    let drained: Vec<TelemetryEvent> = buffer.lock().unwrap().drain(..).collect();
    if drained.is_empty() {
        return;
    }
    send_batch_with_retry(
        &drained,
        config,
        device_profile,
        pipeline_id,
        trace_id,
        agent,
        circuit,
        retry_policy,
        failed_queue,
        dropped_count,
    );
}
/// Converts `events` to platform events and uploads them.
///
/// Returns immediately when there is nothing to send or the endpoint / API
/// key is empty. When the circuit breaker refuses execution, or the upload
/// fails, the events are placed on the failed queue if the retry queue is
/// enabled, and discarded otherwise.
fn send_batch_with_retry(
    events: &[TelemetryEvent],
    config: &TelemetryConfig,
    device_profile: Option<&DeviceProfile>,
    pipeline_id: &Arc<RwLock<Option<Uuid>>>,
    trace_id: &Arc<RwLock<Option<Uuid>>>,
    agent: &ureq::Agent,
    circuit: &Arc<CircuitBreaker>,
    retry_policy: &RetryPolicy,
    failed_queue: &Arc<Mutex<VecDeque<PlatformEvent>>>,
    dropped_count: &Arc<AtomicU32>,
) {
    let nothing_to_do =
        events.is_empty() || config.endpoint.is_empty() || config.api_key.is_empty();
    if nothing_to_do {
        return;
    }

    let may_send = circuit.can_execute();
    if !may_send && !config.enable_retry_queue {
        // Circuit is open and there is nowhere to park the events.
        return;
    }

    // Snapshot the default ids once so every event in the batch agrees.
    let default_pipeline = pipeline_id.read().ok().and_then(|guard| *guard);
    let default_trace = trace_id.read().ok().and_then(|guard| *guard);
    let converted: Vec<PlatformEvent> = events
        .iter()
        .map(|event| {
            convert_to_platform_event(
                event,
                config,
                device_profile,
                default_pipeline,
                default_trace,
            )
        })
        .collect();

    if !may_send {
        queue_failed_events(converted, failed_queue, dropped_count);
        return;
    }

    match send_batch_inner(&converted, config, agent, circuit, retry_policy) {
        Ok(()) => {}
        Err(undelivered) => {
            if config.enable_retry_queue {
                queue_failed_events(undelivered, failed_queue, dropped_count);
            }
        }
    }
}
/// Posts `events` to `<endpoint>/v1/events/batch`, retrying per `retry_policy`.
///
/// Returns `Ok(())` when the batch was accepted, and also when the server
/// answered with a non-retryable status (the batch is then given up on and a
/// warning is logged). Returns `Err` with the undelivered events when every
/// attempt failed or the circuit breaker opened between attempts.
///
/// At least one attempt is always made: a policy with `max_attempts == 0`
/// would otherwise skip the loop entirely and report failure without ever
/// contacting the server.
fn send_batch_inner(
    events: &[PlatformEvent],
    config: &TelemetryConfig,
    agent: &ureq::Agent,
    circuit: &Arc<CircuitBreaker>,
    retry_policy: &RetryPolicy,
) -> Result<(), Vec<PlatformEvent>> {
    let batch = PlatformEventBatch {
        events: events.to_vec(),
    };
    let url = format!("{}/v1/events/batch", config.endpoint.trim_end_matches('/'));
    // Loop-invariant, so build it once.
    let authorization = format!("Bearer {}", config.api_key);
    let attempts = retry_policy.max_attempts.max(1);

    for attempt in 0..attempts {
        let delay = retry_policy.delay_for_attempt(attempt);
        if !delay.is_zero() {
            std::thread::sleep(delay);
        }
        // The breaker may have opened while we were backing off.
        if !circuit.can_execute() {
            return Err(batch.events);
        }

        let result = agent
            .post(&url)
            .set("Authorization", &authorization)
            .set("Content-Type", "application/json")
            .send_json(&batch);

        match result {
            Ok(response) => {
                let status = response.status();
                if (200..300).contains(&status) {
                    // Any 2xx (e.g. 202 Accepted, 204 No Content) means the
                    // batch was taken; no warning is warranted.
                    circuit.record_success();
                    return Ok(());
                } else if is_retryable_status(status) {
                    circuit.record_failure();
                } else {
                    // The server responded, so the link is healthy; the batch
                    // is not retried.
                    circuit.record_success();
                    log::warn!(
                        target: "xybrid_telemetry",
                        "Platform returned status {}",
                        status
                    );
                    return Ok(());
                }
            }
            Err(ureq::Error::Status(status, _)) => {
                if status == 429 {
                    circuit.record_rate_limited();
                } else if is_retryable_status(status) {
                    circuit.record_failure();
                } else {
                    // Non-retryable rejection: give up on this batch.
                    circuit.record_success();
                    log::warn!(
                        target: "xybrid_telemetry",
                        "Platform returned status {}",
                        status
                    );
                    return Ok(());
                }
            }
            Err(ureq::Error::Transport(_)) => {
                circuit.record_failure();
            }
        }
    }

    Err(batch.events)
}
/// Reports whether an HTTP status is worth retrying: 429, 502, 503 or 504.
fn is_retryable_status(status: u16) -> bool {
    const RETRYABLE: [u16; 4] = [429, 502, 503, 504];
    RETRYABLE.contains(&status)
}
/// Appends `events` to the failed queue.
///
/// Before each append, if the queue already holds `MAX_FAILED_QUEUE_SIZE`
/// or more entries, the oldest one is evicted and `dropped_count` is
/// incremented.
///
/// # Panics
/// Panics if the queue mutex is poisoned.
fn queue_failed_events(
    events: Vec<PlatformEvent>,
    failed_queue: &Arc<Mutex<VecDeque<PlatformEvent>>>,
    dropped_count: &Arc<AtomicU32>,
) {
    let mut pending = failed_queue.lock().unwrap();
    events.into_iter().for_each(|event| {
        let at_capacity = pending.len() >= MAX_FAILED_QUEUE_SIZE;
        if at_capacity {
            pending.pop_front();
            dropped_count.fetch_add(1, Ordering::Relaxed);
        }
        pending.push_back(event);
    });
}
/// Retries one batch of events from the front of the failed queue.
///
/// Does nothing while the circuit breaker refuses execution or the queue is
/// empty. Up to `config.batch_size` events are taken, and always at least
/// one: with a batch size of 0 the drain range would be empty and the queue
/// would never be retried. Events that fail again are put back at the front
/// in their original order.
///
/// # Panics
/// Panics if the queue mutex is poisoned.
fn retry_failed_events(
    failed_queue: &Arc<Mutex<VecDeque<PlatformEvent>>>,
    config: &TelemetryConfig,
    agent: &ureq::Agent,
    circuit: &Arc<CircuitBreaker>,
    retry_policy: &RetryPolicy,
) {
    if !circuit.can_execute() {
        return;
    }

    // Take the batch under the lock, then release it before the network call.
    let events: Vec<PlatformEvent> = {
        let mut queue = failed_queue.lock().unwrap();
        let take = config.batch_size.max(1).min(queue.len());
        queue.drain(..take).collect()
    };
    if events.is_empty() {
        return;
    }

    if let Err(failed_events) = send_batch_inner(&events, config, agent, circuit, retry_policy) {
        let mut queue = failed_queue.lock().unwrap();
        // Reverse + push_front restores the original front-of-queue order.
        for event in failed_events.into_iter().rev() {
            queue.push_front(event);
        }
    }
}
/// Builds the wire-format [`PlatformEvent`] for one [`TelemetryEvent`].
///
/// * `event` – the record to convert.
/// * `config` – supplies session id, device id/label, platform and app version.
/// * `device_profile` – hardware profile attached to the event, if any.
/// * `pipeline_id` / `trace_id` – default ids, used unless the event's JSON
///   `data` carries the reserved context keys.
///
/// Side effect: for the event types that carry stages, when tracing is
/// enabled and the event's data has no embedded `spans`, the globally
/// collected stages are read and tracing is reset.
fn convert_to_platform_event(
event: &TelemetryEvent,
config: &TelemetryConfig,
device_profile: Option<&DeviceProfile>,
pipeline_id: Option<Uuid>,
trace_id: Option<Uuid>,
) -> PlatformEvent {
let mut payload = serde_json::json!({});
let mut event_pipeline_id = pipeline_id;
let mut event_trace_id = trace_id;
let mut correlation_id = None;
let mut outcome_category = None;
let mut abort_reason = None;
// Copy the simple optional fields into the payload.
if let Some(stage) = &event.stage_name {
payload["stage_name"] = serde_json::json!(stage);
}
if let Some(target) = &event.target {
payload["target"] = serde_json::json!(target);
}
if let Some(latency) = event.latency_ms {
payload["latency_ms"] = serde_json::json!(latency);
}
// `status` is derived solely from whether an error message is present.
if let Some(error) = &event.error {
payload["error"] = serde_json::json!(error);
payload["status"] = serde_json::json!("error");
} else {
payload["status"] = serde_json::json!("success");
}
if let Some(data) = &event.data {
if let Ok(mut parsed) = serde_json::from_str::<serde_json::Value>(data) {
// Reserved context keys are stripped from the data. When a key is present
// it replaces the default id, and an unparsable value yields `None`.
if let Some(obj) = parsed.as_object_mut() {
if let Some(v) = obj.remove(CONTEXT_PIPELINE_ID_KEY) {
event_pipeline_id = v.as_str().and_then(|s| Uuid::parse_str(s).ok());
}
if let Some(v) = obj.remove(CONTEXT_TRACE_ID_KEY) {
event_trace_id = v.as_str().and_then(|s| Uuid::parse_str(s).ok());
}
}
// Promote well-known keys to the top level of the payload, without
// overwriting anything already set there.
for key in [
"tokens_in",
"tokens_out",
"model_id",
"cache_read_input_tokens",
"cache_creation_input_tokens",
"resource_summary",
"correlation_id",
"outcome_category",
"abort_reason",
"local_reliability_hint",
"streaming",
]
.iter()
{
if payload.get(*key).is_none() {
if let Some(v) = parsed.get(*key) {
payload[*key] = v.clone();
}
}
}
// These three are also surfaced as dedicated fields on the platform event.
correlation_id = parsed
.get("correlation_id")
.and_then(serde_json::Value::as_str)
.map(str::to_string);
outcome_category = parsed.get("outcome_category").cloned();
abort_reason = parsed
.get("abort_reason")
.and_then(serde_json::Value::as_str)
.map(str::to_string);
payload["data"] = parsed;
} else {
// Not valid JSON: forward the raw string as-is.
payload["data"] = serde_json::json!(data);
}
}
// `None` when the millisecond value is outside chrono's representable range.
let timestamp = chrono::DateTime::from_timestamp_millis(event.timestamp_ms as i64)
.map(|dt| dt.to_rfc3339());
// Stages are attached only to these event types, and only while tracing is enabled.
let stages = if matches!(
event.event_type.as_str(),
"PipelineComplete" | "ModelComplete" | "ModelWarmup" | "LocalAborted" | "CloudRetry"
) && core_tracing::is_tracing_enabled()
{
// Prefer spans embedded in the event's own data over the global collector.
let embedded_spans: Option<serde_json::Value> = payload
.get("data")
.and_then(|d| d.get("spans"))
.filter(|v| !v.is_null())
.cloned();
let spans = if let Some(inner) = embedded_spans {
serde_json::json!({ "spans": inner })
} else {
// Reading the global stages is followed by a tracing reset.
let s = core_tracing::get_stages_json();
core_tracing::reset_tracing();
s
};
// Fill payload attributes from span metadata, never overwriting values
// that are already present.
if let Some((tokens_in, tokens_out)) = extract_llm_token_counts(&spans) {
if let Some(n) = tokens_in {
if payload.get("tokens_in").is_none() {
payload["tokens_in"] = serde_json::json!(n);
}
}
if let Some(n) = tokens_out {
if payload.get("tokens_out").is_none() {
payload["tokens_out"] = serde_json::json!(n);
}
}
}
if payload.get("backend").is_none() {
if let Some(v) = extract_string_attr_from_any_span(&spans, "backend") {
payload["backend"] = serde_json::json!(v);
}
}
// `provider` is taken from LLM spans only, unlike the other string attributes.
if payload.get("provider").is_none() {
if let Some(v) = extract_llm_inference_string_attr(&spans, "provider") {
payload["provider"] = serde_json::json!(v);
}
}
if payload.get("task").is_none() {
if let Some(v) = extract_string_attr_from_any_span(&spans, "task") {
payload["task"] = serde_json::json!(v);
}
}
if payload.get("quantization").is_none() {
if let Some(v) = extract_string_attr_from_any_span(&spans, "quantization") {
payload["quantization"] = serde_json::json!(v);
}
}
if payload.get("execution_provider").is_none() {
if let Some(v) = extract_string_attr_from_any_span(&spans, "execution_provider") {
payload["execution_provider"] = serde_json::json!(v);
}
}
if payload.get("prompt_cached_tokens").is_none() {
if let Some(n) = extract_llm_prompt_cached_tokens(&spans) {
payload["prompt_cached_tokens"] = serde_json::json!(n);
}
}
Some(spans)
} else {
None
};
PlatformEvent {
session_id: config.session_id,
event_type: event.event_type.clone(),
payload,
device_id: config.device_id.clone(),
device_label: config.device_label.clone(),
platform: config.platform.clone(),
app_version: config.app_version.clone(),
device: device_profile.cloned(),
timestamp,
pipeline_id: event_pipeline_id,
trace_id: event_trace_id,
correlation_id,
outcome_category,
abort_reason,
stages,
}
}
/// Computes the hardware profile attached to exported events.
///
/// Starts from the detected profile (or an empty default when auto-detection
/// is off), merges the config's patch, then the explicit override if any.
/// When hostname capture is enabled and the profile has no hostname, one is
/// looked up. Returns `None` when the resulting profile is empty.
fn resolve_device_profile(config: &TelemetryConfig) -> Option<DeviceProfile> {
    let base = if config.auto_hardware_detection {
        DeviceProfile::detect()
    } else {
        DeviceProfile::default()
    };

    let mut resolved = base.merged_with(config.device_profile_patch.clone());
    if let Some(explicit) = &config.device_profile_override {
        resolved = resolved.merged_with(explicit.clone());
    }

    let wants_hostname = config.capture_hostname && resolved.hostname.is_none();
    if wants_hostname {
        resolved.hostname = detect_hostname();
    }

    if resolved.is_empty() {
        return None;
    }
    Some(resolved)
}
/// Looks up the machine's hostname from the environment.
///
/// Checks `HOSTNAME`, then `COMPUTERNAME`, and returns the first value that
/// is non-empty after trimming. A variable that is set but blank no longer
/// hides a usable value in the next one. Returns `None` when neither
/// variable yields a name.
fn detect_hostname() -> Option<String> {
    ["HOSTNAME", "COMPUTERNAME"].iter().find_map(|name| {
        let value = std::env::var(*name).ok()?;
        let trimmed = value.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    })
}
/// Extracts input/output token counts from the LLM spans in `stages`.
///
/// A span counts as an LLM span when its name starts with `llm_inference` or
/// `inference:`, or its metadata has any of `ttft_ms`, `tokens_generated`,
/// `tokens_out`, `completion_tokens`. Counts may be JSON numbers or numeric
/// strings; when several LLM spans carry a count, the last one wins.
///
/// Returns `None` when `stages.spans` is missing, is not an array, or holds
/// no LLM span. Otherwise returns `(tokens_in, tokens_out)`, either of which
/// may be `None`.
fn extract_llm_token_counts(stages: &serde_json::Value) -> Option<(Option<u64>, Option<u64>)> {
    const LLM_MARKER_KEYS: [&str; 4] = [
        "ttft_ms",
        "tokens_generated",
        "tokens_out",
        "completion_tokens",
    ];

    /// Reads a count stored either as a number or as a numeric string.
    fn as_count(value: &serde_json::Value) -> Option<u64> {
        value
            .as_u64()
            .or_else(|| value.as_str().and_then(|text| text.parse::<u64>().ok()))
    }

    /// Returns the first readable count among `keys`, in order.
    fn first_count(meta: Option<&serde_json::Value>, keys: &[&str]) -> Option<u64> {
        let meta = meta?;
        keys.iter()
            .filter_map(|key| meta.get(*key))
            .find_map(as_count)
    }

    let spans = stages.get("spans")?.as_array()?;

    let mut found_llm_span = false;
    let mut tokens_in: Option<u64> = None;
    let mut tokens_out: Option<u64> = None;

    for span in spans {
        let meta = span.get("metadata");
        let name = span
            .get("name")
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default();

        let named_as_llm = name.starts_with("llm_inference") || name.starts_with("inference:");
        let marked_as_llm = || {
            meta.map_or(false, |m| {
                LLM_MARKER_KEYS.iter().any(|key| m.get(*key).is_some())
            })
        };
        if !(named_as_llm || marked_as_llm()) {
            continue;
        }

        found_llm_span = true;
        // A later span only replaces the running value when it has one.
        tokens_in = first_count(meta, &["tokens_in", "prompt_tokens"]).or(tokens_in);
        tokens_out = first_count(
            meta,
            &["tokens_out", "completion_tokens", "tokens_generated"],
        )
        .or(tokens_out);
    }

    if found_llm_span {
        Some((tokens_in, tokens_out))
    } else {
        None
    }
}
fn extract_llm_prompt_cached_tokens(stages: &serde_json::Value) -> Option<u64> {
let spans = stages.get("spans")?.as_array()?;
let mut latest: Option<u64> = None;
for span in spans {
let name = span.get("name").and_then(|v| v.as_str()).unwrap_or("");
let meta = span.get("metadata");
let is_llm_span = name.starts_with("llm_inference")
|| name.starts_with("inference:")
|| meta
.map(|m| {
m.get("ttft_ms").is_some()
|| m.get("tokens_generated").is_some()
|| m.get("tokens_out").is_some()
|| m.get("completion_tokens").is_some()
})
.unwrap_or(false);
if !is_llm_span {
continue;
}
let Some(v) = meta.and_then(|m| m.get("prompt_cached_tokens")) else {
continue;
};
if let Some(n) = v.as_u64() {
latest = Some(n);
} else if let Some(s) = v.as_str() {
if let Ok(n) = s.parse::<u64>() {
latest = Some(n);
}
}
}
latest
}
fn extract_llm_inference_string_attr(stages: &serde_json::Value, key: &str) -> Option<String> {
let spans = stages.get("spans")?.as_array()?;
let mut found: Option<String> = None;
for span in spans {
let name = span.get("name").and_then(|v| v.as_str()).unwrap_or("");
let meta = span.get("metadata");
let is_llm_span = name.starts_with("llm_inference")
|| name.starts_with("inference:")
|| meta
.map(|m| {
m.get("ttft_ms").is_some()
|| m.get("tokens_generated").is_some()
|| m.get("tokens_out").is_some()
|| m.get("completion_tokens").is_some()
})
.unwrap_or(false);
if !is_llm_span {
continue;
}
if let Some(v) = meta.and_then(|m| m.get(key)).and_then(|v| v.as_str()) {
found = Some(v.to_string());
}
}
found
}
/// Returns the string metadata attribute `key` from the last span (of any
/// kind) that has it.
///
/// Non-string values are ignored. Returns `None` when `stages.spans` is
/// missing, is not an array, or no span carries the attribute.
fn extract_string_attr_from_any_span(stages: &serde_json::Value, key: &str) -> Option<String> {
    stages
        .get("spans")?
        .as_array()?
        .iter()
        .filter_map(|span| span.get("metadata")?.get(key)?.as_str())
        .last()
        .map(str::to_string)
}
/// Process-wide exporter slot, filled by `init_platform_telemetry` and
/// `init_platform_telemetry_from_env`; `None` until one of them has run.
static PLATFORM_EXPORTER: RwLock<Option<HttpTelemetryExporter>> = RwLock::new(None);
/// Initialises platform telemetry from an explicit configuration.
///
/// Enables tracing, resolves and activates the resource telemetry mode (the
/// environment variable wins over `config`), registers the execution
/// listener, starts an exporter, registers a sender for it, and stores the
/// exporter in the global slot. If the global lock is poisoned the exporter
/// is not stored.
pub fn init_platform_telemetry(mut config: TelemetryConfig) {
    core_tracing::init_tracing(true);

    let effective_mode = resolve_resource_telemetry_mode(config.resource_telemetry);
    config.resource_telemetry = effective_mode;
    activate_resource_telemetry(effective_mode);
    register_execution_listener();

    let exporter = HttpTelemetryExporter::new(config);
    exporter.start();
    register_telemetry_sender(exporter.create_sender());

    match PLATFORM_EXPORTER.write() {
        Ok(mut slot) => *slot = Some(exporter),
        Err(_) => {}
    }
}
/// Process-wide resource telemetry mode; starts as `Off`, is written by
/// `set_resource_telemetry_mode` and read by `resource_telemetry_mode`.
static RESOURCE_TELEMETRY_MODE: RwLock<ResourceTelemetryMode> =
RwLock::new(ResourceTelemetryMode::Off);
/// Stores `mode` as the process-wide resource telemetry mode.
/// A poisoned lock leaves the previous value in place.
fn set_resource_telemetry_mode(mode: ResourceTelemetryMode) {
    match RESOURCE_TELEMETRY_MODE.write() {
        Ok(mut current) => *current = mode,
        Err(_) => {}
    }
}
/// Picks the effective resource telemetry mode: the value from the
/// environment when one is set and valid, otherwise `configured`.
fn resolve_resource_telemetry_mode(configured: ResourceTelemetryMode) -> ResourceTelemetryMode {
    resource_mode_from_env().unwrap_or(configured)
}
/// Makes `mode` the active resource telemetry mode, pre-warming the global
/// resource monitor first unless the mode is off.
fn activate_resource_telemetry(mode: ResourceTelemetryMode) {
    let monitoring_enabled = !mode.is_off();
    if monitoring_enabled {
        ResourceMonitor::global().prewarm();
    }
    set_resource_telemetry_mode(mode);
}
/// Returns the active resource telemetry mode.
///
/// A poisoned lock is read through rather than treated as an error.
pub fn resource_telemetry_mode() -> ResourceTelemetryMode {
    let guard = RESOURCE_TELEMETRY_MODE
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *guard
}
/// Begins a resource-monitoring run on the global monitor using the
/// currently active telemetry mode, and returns its guard.
pub fn begin_resource_run() -> xybrid_core::device::RunGuard {
    let mode = resource_telemetry_mode();
    ResourceMonitor::global().begin_run(mode)
}
/// Adds `summary` to the event's JSON `data` under `resource_summary`.
///
/// The event is left untouched when `summary` is `None`, when the summary
/// cannot be serialized, when existing non-empty `data` is not valid JSON,
/// or when the existing `data` is JSON but not an object. Missing or empty
/// `data` is treated as an empty object.
pub fn attach_resource_summary(event: &mut TelemetryEvent, summary: Option<ResourceUsageSummary>) {
    let Some(summary) = summary else {
        return;
    };
    let Ok(summary_json) = serde_json::to_value(&summary) else {
        return;
    };

    let existing = event.data.as_deref().filter(|raw| !raw.is_empty());
    let mut document: serde_json::Value = match existing {
        None => serde_json::json!({}),
        Some(raw) => {
            let Ok(value) = serde_json::from_str(raw) else {
                return;
            };
            value
        }
    };

    let Some(fields) = document.as_object_mut() else {
        return;
    };
    fields.insert("resource_summary".to_string(), summary_json);
    event.data = serde_json::to_string(&document).ok();
}
/// Finishes the resource run held by `guard`, attaches its summary (if any)
/// to `event`, and publishes the event.
pub fn publish_with_resource_summary(
    mut event: TelemetryEvent,
    guard: xybrid_core::device::RunGuard,
) {
    let summary = guard.finish();
    attach_resource_summary(&mut event, summary);
    publish_telemetry_event(event);
}
/// Finishes the resource run held by `guard`, attaches its summary (if any)
/// to `event`, and publishes the event with the given pipeline / trace ids.
pub(crate) fn publish_with_resource_summary_in_context(
    mut event: TelemetryEvent,
    guard: xybrid_core::device::RunGuard,
    pipeline_id: Option<Uuid>,
    trace_id: Option<Uuid>,
) {
    let summary = guard.finish();
    attach_resource_summary(&mut event, summary);
    publish_telemetry_event_in_context(event, pipeline_id, trace_id);
}
pub fn local_aborted_event(
correlation_id: &str,
model_id: &str,
abort_reason: xybrid_core::abort::AbortReason,
latency_ms: u32,
tokens_emitted: u32,
) -> TelemetryEvent {
local_aborted_event_with_details(
correlation_id,
model_id,
abort_reason,
latency_ms,
tokens_emitted,
None,
None,
)
}
/// Builds a `LocalAborted` event targeting `"local"`.
///
/// The JSON `data` records the model id, correlation id, an
/// `aborted_for_cloud_fallback` outcome category, the abort reason and the
/// number of tokens emitted, plus the resource summary and local reliability
/// hint when supplied. A summary that fails to serialize is omitted. The
/// timestamp is the current wall-clock time in Unix milliseconds (0 if the
/// clock is before the epoch).
pub fn local_aborted_event_with_details(
    correlation_id: &str,
    model_id: &str,
    abort_reason: xybrid_core::abort::AbortReason,
    latency_ms: u32,
    tokens_emitted: u32,
    resource_summary: Option<ResourceUsageSummary>,
    local_reliability_hint: Option<LocalReliabilityHint>,
) -> TelemetryEvent {
    let mut details = serde_json::json!({
        "model_id": model_id,
        "correlation_id": correlation_id,
        "outcome_category": {
            "kind": "aborted_for_cloud_fallback",
            "reason": abort_reason.as_str(),
        },
        "abort_reason": abort_reason.as_str(),
        "tokens_emitted": tokens_emitted,
    });

    // `details` is always an object here, so indexing inserts the key.
    let summary_json = resource_summary.and_then(|summary| serde_json::to_value(summary).ok());
    if let Some(summary_json) = summary_json {
        details["resource_summary"] = summary_json;
    }
    if let Some(hint) = local_reliability_hint {
        details["local_reliability_hint"] = serde_json::json!({
            "recent_abort_rate": hint.recent_abort_rate,
            "sample_size": hint.sample_size,
        });
    }

    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    };

    TelemetryEvent {
        event_type: "LocalAborted".to_string(),
        stage_name: Some(model_id.to_string()),
        target: Some("local".to_string()),
        latency_ms: Some(latency_ms),
        error: None,
        data: Some(details.to_string()),
        timestamp_ms: now_ms,
    }
}
/// Builds a `LocalAborted` event from the arguments and publishes it.
pub fn publish_local_aborted(
    correlation_id: &str,
    model_id: &str,
    abort_reason: xybrid_core::abort::AbortReason,
    latency_ms: u32,
    tokens_emitted: u32,
) {
    let event = local_aborted_event(
        correlation_id,
        model_id,
        abort_reason,
        latency_ms,
        tokens_emitted,
    );
    publish_telemetry_event(event);
}
/// Builds a `LocalAborted` event, including the optional resource summary
/// and reliability hint, and publishes it.
pub(crate) fn publish_local_aborted_with_details(
    correlation_id: &str,
    model_id: &str,
    abort_reason: xybrid_core::abort::AbortReason,
    latency_ms: u32,
    tokens_emitted: u32,
    resource_summary: Option<ResourceUsageSummary>,
    local_reliability_hint: Option<LocalReliabilityHint>,
) {
    let event = local_aborted_event_with_details(
        correlation_id,
        model_id,
        abort_reason,
        latency_ms,
        tokens_emitted,
        resource_summary,
        local_reliability_hint,
    );
    publish_telemetry_event(event);
}
/// Scrubs likely secrets from an error message before it is exported.
///
/// First replaces the value following each known credential marker with
/// `[REDACTED]`, then redacts whitespace-separated tokens that look like
/// well-known secret formats. The second step also collapses runs of
/// whitespace into single spaces.
pub(crate) fn redact_error_for_telemetry(message: &str) -> String {
    // `"Bearer "` already matches inside `Authorization: Bearer ...` in either
    // letter case of the header name, so the header-specific markers are not
    // listed: running them first made `"Bearer "` re-process the placeholder
    // and emit a stray `]`.
    let redacted = [
        "Bearer ",
        "api_key=",
        "api-key=",
        "x-api-key: ",
        "X-API-Key: ",
    ]
    .iter()
    .fold(message.to_string(), |current, marker| {
        redact_value_after_marker(&current, marker)
    });
    redact_secret_like_tokens(&redacted)
}
/// Replaces the value that follows every occurrence of `marker` with
/// `[REDACTED]`.
///
/// The value runs from the end of the marker up to the next whitespace,
/// `,`, `;`, `)` or `]`, or to the end of the input. The marker itself is
/// kept. The function is idempotent: a value that already starts with the
/// `[REDACTED]` placeholder is consumed as a whole, so the placeholder's own
/// `]` is not mistaken for a delimiter. An empty `marker` returns the input
/// unchanged, since it would otherwise match forever.
fn redact_value_after_marker(input: &str, marker: &str) -> String {
    const PLACEHOLDER: &str = "[REDACTED]";

    if marker.is_empty() {
        return input.to_string();
    }

    let is_delimiter = |c: char| c.is_whitespace() || matches!(c, ',' | ';' | ')' | ']');

    let mut output = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(index) = rest.find(marker) {
        let marker_end = index + marker.len();
        output.push_str(&rest[..marker_end]);
        rest = &rest[marker_end..];

        // Skip over an existing placeholder before looking for the delimiter.
        let scan_from = if rest.starts_with(PLACEHOLDER) {
            PLACEHOLDER.len()
        } else {
            0
        };
        let value_len = rest[scan_from..]
            .find(is_delimiter)
            .map_or(rest.len(), |offset| scan_from + offset);

        output.push_str(PLACEHOLDER);
        rest = &rest[value_len..];
    }
    output.push_str(rest);
    output
}
fn redact_secret_like_tokens(input: &str) -> String {
input
.split_whitespace()
.map(redact_secret_like_token)
.collect::<Vec<_>>()
.join(" ")
}
/// Redacts a single token when it looks like a well-known secret.
///
/// Surrounding characters other than ASCII alphanumerics, `_`, `-` and `.`
/// are ignored when testing the token and kept in the output. The core is
/// replaced with `[REDACTED]` when it starts with one of the known secret
/// prefixes; otherwise the token is returned unchanged.
fn redact_secret_like_token(token: &str) -> String {
    const SECRET_PREFIXES: [&str; 7] = ["sk_", "sk-", "hf_", "ghp_", "gho_", "xoxb-", "xoxp-"];

    let is_token_char = |c: char| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.');
    let core = token.trim_matches(|c: char| !is_token_char(c));

    let looks_secret = SECRET_PREFIXES
        .iter()
        .any(|prefix| core.starts_with(prefix));
    if !looks_secret {
        return token.to_string();
    }
    token.replacen(core, "[REDACTED]", 1)
}
/// Builds a `LocalFailed` event recording that cloud fallback was denied by
/// policy.
///
/// `policy_reason` is redacted before use. It appears in the error text
/// (prefixed with `cloud_denied_by_policy: `) and in the JSON `data`, which
/// also records the model id, correlation id, a `hard_fail` outcome
/// category, a terminal state tag and the abort reason. The timestamp is the
/// current wall-clock time in Unix milliseconds (0 if the clock is before
/// the epoch).
pub fn cloud_denied_by_policy_event(
    correlation_id: &str,
    model_id: &str,
    abort_reason: xybrid_core::abort::AbortReason,
    policy_reason: &str,
    latency_ms: u32,
) -> TelemetryEvent {
    let policy_reason = redact_error_for_telemetry(policy_reason);
    let error_text = format!("cloud_denied_by_policy: {}", policy_reason);

    let details = serde_json::json!({
        "model_id": model_id,
        "correlation_id": correlation_id,
        "outcome_category": {
            "kind": "hard_fail",
            "reason": "cloud_denied_by_policy",
        },
        "terminal_state_tag": {
            "kind": "cloud_denied_by_policy",
            "abort_reason": abort_reason.as_str(),
        },
        "abort_reason": abort_reason.as_str(),
        "policy_reason": policy_reason,
        "status": "error",
    });

    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    };

    TelemetryEvent {
        event_type: "LocalFailed".to_string(),
        stage_name: Some(model_id.to_string()),
        target: Some("local".to_string()),
        latency_ms: Some(latency_ms),
        error: Some(error_text),
        data: Some(details.to_string()),
        timestamp_ms: now_ms,
    }
}
/// Builds a cloud-denied-by-policy event from the arguments and publishes it.
pub fn publish_cloud_denied_by_policy(
    correlation_id: &str,
    model_id: &str,
    abort_reason: xybrid_core::abort::AbortReason,
    policy_reason: &str,
    latency_ms: u32,
) {
    let event = cloud_denied_by_policy_event(
        correlation_id,
        model_id,
        abort_reason,
        policy_reason,
        latency_ms,
    );
    publish_telemetry_event(event);
}
/// Builds a `CloudRetry` event targeting `"cloud"`.
///
/// `provider` defaults to `"xybrid"`. When `error` is given it is redacted
/// and the event records status `"error"` with a `hard_fail` outcome
/// category carrying that reason; otherwise the status is `"ok"` and the
/// outcome category is the string `"cloud_success"`. The timestamp is the
/// current wall-clock time in Unix milliseconds (0 if the clock is before
/// the epoch).
pub fn cloud_retry_event(
    correlation_id: &str,
    model_id: &str,
    provider: Option<&str>,
    latency_ms: u32,
    tokens_emitted: u32,
    error: Option<&str>,
) -> TelemetryEvent {
    let provider_value = match provider {
        Some(name) => name,
        None => "xybrid",
    };
    let redacted_error = error.map(redact_error_for_telemetry);

    let (status, outcome_category) = match redacted_error.as_deref() {
        Some(reason) => (
            "error",
            serde_json::json!({
                "kind": "hard_fail",
                "reason": reason,
            }),
        ),
        None => ("ok", serde_json::json!("cloud_success")),
    };

    let details = serde_json::json!({
        "model_id": model_id,
        "correlation_id": correlation_id,
        "provider": provider_value,
        "outcome_category": outcome_category,
        "tokens_emitted": tokens_emitted,
        "status": status,
    });

    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    };

    TelemetryEvent {
        event_type: "CloudRetry".to_string(),
        stage_name: Some(model_id.to_string()),
        target: Some("cloud".to_string()),
        latency_ms: Some(latency_ms),
        error: redacted_error,
        data: Some(details.to_string()),
        timestamp_ms: now_ms,
    }
}
/// Builds a cloud-retry event via [`cloud_retry_event`] and hands it to
/// [`publish_telemetry_event`].
///
/// All arguments are forwarded unchanged to the event builder.
pub fn publish_cloud_retry(
    correlation_id: &str,
    model_id: &str,
    provider: Option<&str>,
    latency_ms: u32,
    tokens_emitted: u32,
    error: Option<&str>,
) {
    let event = cloud_retry_event(
        correlation_id,
        model_id,
        provider,
        latency_ms,
        tokens_emitted,
        error,
    );
    publish_telemetry_event(event);
}
/// Initializes platform telemetry when an exporter can be built from the environment.
///
/// Returns `false` without side effects when `HttpTelemetryExporter::from_env`
/// yields nothing. Otherwise, in order: enables core tracing, activates
/// resource telemetry (resolved from a default of `Off`), registers the
/// execution listener, starts the exporter, registers one of its senders with
/// the global sender list, stores the exporter in `PLATFORM_EXPORTER`, and
/// returns `true`.
///
/// If the `PLATFORM_EXPORTER` lock cannot be acquired the exporter is not
/// stored, but the function still returns `true`.
pub fn init_platform_telemetry_from_env() -> bool {
    let Some(exporter) = HttpTelemetryExporter::from_env() else {
        return false;
    };

    core_tracing::init_tracing(true);
    activate_resource_telemetry(resolve_resource_telemetry_mode(ResourceTelemetryMode::Off));
    register_execution_listener();

    exporter.start();
    register_telemetry_sender(exporter.create_sender());

    if let Ok(mut slot) = PLATFORM_EXPORTER.write() {
        *slot = Some(exporter);
    }
    true
}
/// Sets the pipeline and trace ids used to tag subsequent telemetry.
///
/// The current event-bus context is copied, its `pipeline_id` / `trace_id` are
/// overwritten with the given values, and the result is either installed as
/// the current context or, if it ends up empty, the current context is
/// cleared. The same ids are then pushed to the platform exporter when one is
/// installed and its lock can be read.
pub fn set_telemetry_pipeline_context(pipeline_id: Option<Uuid>, trace_id: Option<Uuid>) {
    let mut updated = xybrid_core::event_bus::EventContext::current();
    updated.pipeline_id = pipeline_id;
    updated.trace_id = trace_id;

    if !updated.is_empty() {
        xybrid_core::event_bus::set_current_event_context(updated);
    } else {
        xybrid_core::event_bus::clear_current_event_context();
    }

    let Ok(slot) = PLATFORM_EXPORTER.read() else {
        return;
    };
    if let Some(exporter) = slot.as_ref() {
        exporter.set_pipeline_context(pipeline_id, trace_id);
    }
}
/// Scope guard that installs a telemetry pipeline context and restores the
/// previously active one when dropped.
///
/// The "previous" context is whatever [`current_telemetry_pipeline_context`]
/// reported at the time [`TelemetryPipelineContextGuard::install`] was called.
#[must_use = "the previous pipeline context is restored as soon as the guard is dropped; bind it to a variable"]
pub(crate) struct TelemetryPipelineContextGuard {
    /// Pipeline id that was active before `install`.
    previous_pipeline_id: Option<Uuid>,
    /// Trace id that was active before `install`.
    previous_trace_id: Option<Uuid>,
}

impl TelemetryPipelineContextGuard {
    /// Records the currently active pipeline/trace ids, then installs the
    /// given ones via [`set_telemetry_pipeline_context`].
    ///
    /// Keep the returned guard alive for as long as the new context should
    /// apply; dropping it switches back to the recorded ids.
    pub(crate) fn install(pipeline_id: Option<Uuid>, trace_id: Option<Uuid>) -> Self {
        // Snapshot first: the setter below overwrites what we need to restore.
        let (previous_pipeline_id, previous_trace_id) = current_telemetry_pipeline_context();
        set_telemetry_pipeline_context(pipeline_id, trace_id);
        Self {
            previous_pipeline_id,
            previous_trace_id,
        }
    }
}

impl Drop for TelemetryPipelineContextGuard {
    /// Re-installs the ids captured by `install`.
    fn drop(&mut self) {
        set_telemetry_pipeline_context(self.previous_pipeline_id, self.previous_trace_id);
    }
}
/// Installs the process-wide execution listener that mirrors
/// [`ExecutionEvent`]s into telemetry.
///
/// Each `Started` / `Completed` / `Failed` event is converted to a
/// [`TelemetryEvent`] targeting `"device"` and passed to
/// [`publish_telemetry_event`]. `Started` and `Completed` use the method as
/// the stage name and put the model id in the payload; `Failed` uses the model
/// id as the stage name and carries both model and method in the payload.
fn register_execution_listener() {
    execution_listener::set_execution_listener(|event| {
        // A system clock set before the Unix epoch must not panic inside the
        // listener callback; fall back to 0 like the other event builders.
        let timestamp_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_millis() as u64)
            .unwrap_or(0);
        let telemetry_event = match event {
            ExecutionEvent::Started { model_id, method } => TelemetryEvent {
                event_type: "ExecutionStarted".to_string(),
                stage_name: Some(method),
                target: Some("device".to_string()),
                latency_ms: None,
                error: None,
                data: Some(serde_json::json!({ "model": model_id }).to_string()),
                timestamp_ms,
            },
            ExecutionEvent::Completed {
                model_id,
                method,
                latency_ms,
            } => TelemetryEvent {
                event_type: "ExecutionCompleted".to_string(),
                stage_name: Some(method),
                target: Some("device".to_string()),
                // NOTE(review): `as u32` truncates silently if the source
                // latency type is wider than 32 bits; confirm the upstream type.
                latency_ms: Some(latency_ms as u32),
                error: None,
                data: Some(serde_json::json!({ "model": model_id }).to_string()),
                timestamp_ms,
            },
            ExecutionEvent::Failed {
                model_id,
                method,
                latency_ms,
                error,
            } => TelemetryEvent {
                event_type: "ExecutionFailed".to_string(),
                stage_name: Some(model_id.clone()),
                target: Some("device".to_string()),
                latency_ms: Some(latency_ms as u32),
                // NOTE(review): unlike the cloud event builders, this error
                // string is forwarded without redaction; confirm that is intended.
                error: Some(error),
                data: Some(
                    serde_json::json!({
                        "model": model_id,
                        "method": method,
                    })
                    .to_string(),
                ),
                timestamp_ms,
            },
        };
        publish_telemetry_event(telemetry_event);
    });
}
/// Flushes the installed platform exporter, if any.
///
/// Does nothing when no exporter is installed or when the
/// `PLATFORM_EXPORTER` lock cannot be read.
pub fn flush_platform_telemetry() {
    let Ok(slot) = PLATFORM_EXPORTER.read() else {
        return;
    };
    if let Some(exporter) = slot.as_ref() {
        exporter.flush();
    }
}
/// Tears down platform telemetry.
///
/// Disables core tracing, clears the execution listener, then removes the
/// exporter from `PLATFORM_EXPORTER` and stops it. The exporter step is
/// skipped when the lock cannot be acquired or no exporter is installed.
pub fn shutdown_platform_telemetry() {
    core_tracing::init_tracing(false);
    execution_listener::clear_execution_listener();

    let Ok(mut slot) = PLATFORM_EXPORTER.write() else {
        return;
    };
    // `take` leaves `None` behind so later calls see no exporter.
    if let Some(exporter) = slot.take() {
        exporter.stop();
    }
}
/// Adds `sender` to the global list of telemetry senders.
///
/// The sender is silently dropped if the `TELEMETRY_SENDERS` mutex cannot be
/// locked.
pub fn register_telemetry_sender(sender: TelemetrySender) {
    let Ok(mut registry) = TELEMETRY_SENDERS.lock() else {
        return;
    };
    registry.push(sender);
}
/// Converts an [`OrchestratorEvent`] into the flat [`TelemetryEvent`] shape.
///
/// Every explicitly handled variant maps to an event whose `event_type` is the
/// variant name; variant-specific fields are copied into `stage_name`,
/// `target`, `latency_ms`, `error` and (for some variants) a JSON `data`
/// payload. Variants without a dedicated arm fall through to a catch-all that
/// uses the event's `Debug` rendering as both `event_type` and `data`.
///
/// The event's [`EventContext`] is merged into the payload by
/// [`attach_event_context`] before returning. The timestamp is the current
/// wall clock in milliseconds since the Unix epoch, or `0` if the clock is
/// before the epoch.
pub fn convert_orchestrator_event(event: &OrchestratorEvent) -> TelemetryEvent {
    // Do not panic on a pre-epoch system clock; match the other event builders.
    let timestamp_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0);
    let context = event.context().clone();
    let telemetry_event = match event {
        OrchestratorEvent::PipelineStart { stages, .. } => TelemetryEvent {
            event_type: "PipelineStart".to_string(),
            stage_name: None,
            target: None,
            latency_ms: None,
            error: None,
            data: Some(serde_json::json!({"stages": stages}).to_string()),
            timestamp_ms,
        },
        OrchestratorEvent::PipelineComplete {
            total_latency_ms, ..
        } => TelemetryEvent {
            event_type: "PipelineComplete".to_string(),
            stage_name: None,
            target: None,
            latency_ms: Some(*total_latency_ms),
            error: None,
            data: None,
            timestamp_ms,
        },
        OrchestratorEvent::StageStart { stage_name, .. } => TelemetryEvent {
            event_type: "StageStart".to_string(),
            stage_name: Some(stage_name.clone()),
            target: None,
            latency_ms: None,
            error: None,
            data: None,
            timestamp_ms,
        },
        OrchestratorEvent::StageComplete {
            stage_name,
            target,
            latency_ms,
            ..
        } => TelemetryEvent {
            event_type: "StageComplete".to_string(),
            stage_name: Some(stage_name.clone()),
            target: Some(target.clone()),
            latency_ms: Some(*latency_ms),
            error: None,
            data: None,
            timestamp_ms,
        },
        OrchestratorEvent::StageError {
            stage_name, error, ..
        } => TelemetryEvent {
            event_type: "StageError".to_string(),
            stage_name: Some(stage_name.clone()),
            target: None,
            latency_ms: None,
            error: Some(error.clone()),
            data: None,
            timestamp_ms,
        },
        OrchestratorEvent::RoutingDecided {
            stage_name,
            target,
            reason,
            recent_abort_rate,
            sample_size,
            ..
        } => TelemetryEvent {
            event_type: "RoutingDecided".to_string(),
            stage_name: Some(stage_name.clone()),
            target: Some(target.clone()),
            latency_ms: None,
            error: None,
            data: Some(
                serde_json::json!({
                    "reason": reason,
                    "local_reliability_hint": {
                        // A non-finite rate is reported as 0.0.
                        "recent_abort_rate": if recent_abort_rate.is_finite() {
                            *recent_abort_rate
                        } else {
                            0.0_f32
                        },
                        "sample_size": sample_size,
                    },
                })
                .to_string(),
            ),
            timestamp_ms,
        },
        OrchestratorEvent::ExecutionStarted {
            stage_name, target, ..
        } => TelemetryEvent {
            event_type: "ExecutionStarted".to_string(),
            stage_name: Some(stage_name.clone()),
            target: Some(target.clone()),
            latency_ms: None,
            error: None,
            data: None,
            timestamp_ms,
        },
        OrchestratorEvent::ExecutionCompleted {
            stage_name,
            target,
            execution_time_ms,
            ..
        } => TelemetryEvent {
            event_type: "ExecutionCompleted".to_string(),
            stage_name: Some(stage_name.clone()),
            target: Some(target.clone()),
            latency_ms: Some(*execution_time_ms),
            error: None,
            data: None,
            timestamp_ms,
        },
        OrchestratorEvent::ExecutionFailed {
            stage_name,
            target,
            error,
            ..
        } => TelemetryEvent {
            event_type: "ExecutionFailed".to_string(),
            stage_name: Some(stage_name.clone()),
            target: Some(target.clone()),
            latency_ms: None,
            error: Some(error.clone()),
            data: None,
            timestamp_ms,
        },
        OrchestratorEvent::PolicyEvaluated {
            stage_name,
            allowed,
            reason,
            ..
        } => TelemetryEvent {
            event_type: "PolicyEvaluated".to_string(),
            stage_name: Some(stage_name.clone()),
            target: None,
            latency_ms: None,
            // A denied evaluation always carries an error; fall back to a
            // generic message when no reason was supplied.
            error: if *allowed {
                None
            } else {
                reason
                    .clone()
                    .or_else(|| Some("Policy violation".to_string()))
            },
            data: Some(
                serde_json::json!({
                    "allowed": allowed,
                    "reason": reason
                })
                .to_string(),
            ),
            timestamp_ms,
        },
        OrchestratorEvent::LocalAborted {
            stage_name,
            target,
            reason,
            ..
        } => TelemetryEvent {
            event_type: "LocalAborted".to_string(),
            stage_name: Some(stage_name.clone()),
            target: Some(target.clone()),
            latency_ms: None,
            error: Some(reason.clone()),
            data: Some(serde_json::json!({ "reason": reason }).to_string()),
            timestamp_ms,
        },
        _ => {
            // Render once and reuse for both fields.
            // NOTE(review): `event_type` here is the full Debug rendering
            // (variant name plus fields), not just the variant name; confirm
            // downstream consumers expect that.
            let rendered = format!("{:?}", event);
            TelemetryEvent {
                event_type: rendered.clone(),
                stage_name: None,
                target: None,
                latency_ms: None,
                error: None,
                data: Some(rendered),
                timestamp_ms,
            }
        }
    };
    attach_event_context(telemetry_event, &context)
}
/// Merges the identifiers held by `context` into the event's JSON payload.
///
/// Returns `event` untouched when the context is empty. Otherwise the existing
/// `data` string is parsed: a JSON object is used as-is, any other JSON value
/// or an unparseable string is wrapped as `{"value": ...}`, and missing data
/// starts from `{}`. Each identifier present on the context (pipeline id,
/// trace id, correlation id, request id, model id, span id) is then added
/// only if the payload does not already contain that key.
fn attach_event_context(event: TelemetryEvent, context: &EventContext) -> TelemetryEvent {
    /// Inserts `key` with a lazily built value unless the key already exists.
    fn insert_if_absent(
        fields: &mut serde_json::Map<String, serde_json::Value>,
        key: String,
        make_value: impl FnOnce() -> serde_json::Value,
    ) {
        fields.entry(key).or_insert_with(make_value);
    }

    if context.is_empty() {
        return event;
    }

    let mut payload = match event.data.as_deref() {
        None => serde_json::json!({}),
        Some(raw) => match serde_json::from_str::<serde_json::Value>(raw) {
            Ok(parsed) if parsed.is_object() => parsed,
            Ok(parsed) => serde_json::json!({ "value": parsed }),
            Err(_) => serde_json::json!({ "value": raw }),
        },
    };

    if let Some(fields) = payload.as_object_mut() {
        if let Some(id) = context.pipeline_id {
            insert_if_absent(fields, CONTEXT_PIPELINE_ID_KEY.to_string(), || {
                serde_json::json!(id.to_string())
            });
        }
        if let Some(id) = context.trace_id {
            insert_if_absent(fields, CONTEXT_TRACE_ID_KEY.to_string(), || {
                serde_json::json!(id.to_string())
            });
        }
        if let Some(id) = context.correlation_id.as_ref() {
            insert_if_absent(fields, "correlation_id".to_string(), || {
                serde_json::json!(id)
            });
        }
        if let Some(id) = context.request_id.as_ref() {
            insert_if_absent(fields, "request_id".to_string(), || serde_json::json!(id));
        }
        if let Some(id) = context.model_id.as_ref() {
            insert_if_absent(fields, "model_id".to_string(), || serde_json::json!(id));
        }
        if let Some(id) = context.span_id.as_ref() {
            insert_if_absent(fields, "span_id".to_string(), || serde_json::json!(id));
        }
    }

    let mut enriched = event;
    enriched.data = Some(payload.to_string());
    enriched
}
/// Attaches the currently captured tracing spans to span-bearing events.
///
/// Only `PipelineComplete`, `ModelComplete`, `ModelWarmup`, `LocalAborted` and
/// `CloudRetry` events are considered, and only while core tracing is enabled.
/// If the payload already has a non-null `spans` entry the event is returned
/// unchanged. Otherwise the captured stages are read, tracing is reset, and
/// the captured `spans` value (when present) is written into the payload.
///
/// A payload that is missing, unparseable, or not a JSON object is replaced by
/// a fresh object before the spans are written.
fn snapshot_spans_into_event(event: TelemetryEvent) -> TelemetryEvent {
    let span_bearing = matches!(
        event.event_type.as_str(),
        "PipelineComplete" | "ModelComplete" | "ModelWarmup" | "LocalAborted" | "CloudRetry"
    );
    if !(span_bearing && core_tracing::is_tracing_enabled()) {
        return event;
    }

    // Parse the payload once; it is needed for both the check and the merge.
    let parsed: Option<serde_json::Value> = event
        .data
        .as_deref()
        .and_then(|raw| serde_json::from_str(raw).ok());

    let already_has_spans = parsed
        .as_ref()
        .and_then(|value| value.get("spans"))
        .map_or(false, |spans| !spans.is_null());
    if already_has_spans {
        return event;
    }

    // Capture first, then reset, so the snapshot is not lost.
    let captured = core_tracing::get_stages_json();
    core_tracing::reset_tracing();

    let mut merged = parsed
        .filter(serde_json::Value::is_object)
        .unwrap_or_else(|| serde_json::json!({}));
    if let Some(spans) = captured.get("spans") {
        merged["spans"] = spans.clone();
    }

    let mut updated = event;
    updated.data = Some(merged.to_string());
    updated
}
/// Reads the pipeline and trace ids currently stored on the platform exporter.
///
/// Returns `(None, None)` when the `PLATFORM_EXPORTER` lock cannot be read or
/// no exporter is installed. An individual id is `None` when its own lock
/// cannot be read or no value is set.
fn current_telemetry_pipeline_context() -> (Option<Uuid>, Option<Uuid>) {
    let Ok(slot) = PLATFORM_EXPORTER.read() else {
        return (None, None);
    };
    match slot.as_ref() {
        Some(exporter) => {
            let pipeline_id = exporter.pipeline_id.read().ok().and_then(|guard| *guard);
            let trace_id = exporter.trace_id.read().ok().and_then(|guard| *guard);
            (pipeline_id, trace_id)
        }
        None => (None, None),
    }
}
/// Stamps `event` with the pipeline/trace ids currently held by the platform
/// exporter, as reported by [`current_telemetry_pipeline_context`].
fn snapshot_context_into_event(event: TelemetryEvent) -> TelemetryEvent {
    let active = current_telemetry_pipeline_context();
    snapshot_context_into_event_with(event, active.0, active.1)
}
/// Writes the given pipeline/trace ids into the event's JSON payload.
///
/// Returns `event` untouched when both ids are `None`. Otherwise the existing
/// `data` string is parsed: a JSON object is used as-is, any other JSON value
/// or an unparseable string is wrapped as `{"value": ...}`, and missing data
/// starts from `{}`. Each id that is present overwrites any value already
/// stored under its key.
fn snapshot_context_into_event_with(
    event: TelemetryEvent,
    pipeline_id: Option<Uuid>,
    trace_id: Option<Uuid>,
) -> TelemetryEvent {
    if let (None, None) = (pipeline_id, trace_id) {
        return event;
    }

    let mut payload = match event.data.as_deref() {
        None => serde_json::json!({}),
        Some(raw) => match serde_json::from_str::<serde_json::Value>(raw) {
            Ok(parsed) if parsed.is_object() => parsed,
            Ok(parsed) => serde_json::json!({ "value": parsed }),
            Err(_) => serde_json::json!({ "value": raw }),
        },
    };

    if let Some(fields) = payload.as_object_mut() {
        if let Some(id) = pipeline_id {
            fields.insert(CONTEXT_PIPELINE_ID_KEY.to_string(), serde_json::json!(id));
        }
        if let Some(id) = trace_id {
            fields.insert(CONTEXT_TRACE_ID_KEY.to_string(), serde_json::json!(id));
        }
    }

    let mut stamped = event;
    stamped.data = Some(payload.to_string());
    stamped
}
/// Builds the `ModelDownload` telemetry event.
///
/// The payload records the model id, bytes downloaded, source and duration;
/// `duration_ms` is also used as the event latency. The timestamp is the
/// current wall clock in milliseconds since the Unix epoch, or `0` if the
/// clock is before the epoch.
fn build_model_download_event(
    model_id: &str,
    bytes_downloaded: u64,
    source: &str,
    duration_ms: u32,
) -> TelemetryEvent {
    let timestamp_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    };

    let payload = serde_json::json!({
        "model_id": model_id,
        "bytes_downloaded": bytes_downloaded,
        "source": source,
        "duration_ms": duration_ms,
    })
    .to_string();

    TelemetryEvent {
        event_type: "ModelDownload".to_string(),
        stage_name: None,
        target: None,
        latency_ms: Some(duration_ms),
        error: None,
        data: Some(payload),
        timestamp_ms,
    }
}
/// Publishes a `ModelDownload` telemetry event unless telemetry is opted out.
///
/// When `crate::telemetry_optout::is_telemetry_opted_out()` reports `true`
/// nothing is built or published.
pub fn publish_model_download(
    model_id: &str,
    bytes_downloaded: u64,
    source: &str,
    duration_ms: u32,
) {
    if !crate::telemetry_optout::is_telemetry_opted_out() {
        publish_telemetry_event(build_model_download_event(
            model_id,
            bytes_downloaded,
            source,
            duration_ms,
        ));
    }
}
/// Publishes `event` to every registered telemetry sender.
///
/// Before dispatch the event is enriched in two steps: captured tracing spans
/// are attached first, then the exporter's current pipeline/trace ids.
pub fn publish_telemetry_event(event: TelemetryEvent) {
    let with_spans = snapshot_spans_into_event(event);
    let with_context = snapshot_context_into_event(with_spans);
    dispatch_telemetry_event(with_context);
}
/// Publishes `event` tagged with explicitly supplied pipeline/trace ids.
///
/// Same as [`publish_telemetry_event`] except that the ids come from the
/// arguments rather than from the platform exporter's current context.
pub(crate) fn publish_telemetry_event_in_context(
    event: TelemetryEvent,
    pipeline_id: Option<Uuid>,
    trace_id: Option<Uuid>,
) {
    let with_spans = snapshot_spans_into_event(event);
    let with_context = snapshot_context_into_event_with(with_spans, pipeline_id, trace_id);
    dispatch_telemetry_event(with_context);
}
/// Sends a clone of `event` to every registered telemetry sender and prunes
/// senders whose receiving side has gone away.
///
/// Does nothing if the `TELEMETRY_SENDERS` mutex cannot be locked.
fn dispatch_telemetry_event(event: TelemetryEvent) {
    let Ok(mut senders) = TELEMETRY_SENDERS.lock() else {
        return;
    };
    // Send and prune under a single lock acquisition. Recording indices,
    // releasing the lock and removing by index afterwards would let a
    // concurrent registration or prune invalidate those indices, removing the
    // wrong sender or panicking on an out-of-range `remove`.
    senders.retain(|sender| sender.send(event.clone()).is_ok());
}
/// Errors returned by [`BridgeHandle`] operations.
#[derive(Debug, thiserror::Error)]
pub enum BridgeError {
/// The flush command could not be sent because the bridge thread's command
/// receiver is gone.
#[error("orchestrator event bridge is no longer running")]
Stopped,
/// The flush command was sent, but the acknowledgement sender was dropped
/// before an acknowledgement arrived.
#[error("orchestrator event bridge flush acknowledgement was dropped")]
FlushAckDropped,
/// Joining the bridge thread failed because the thread panicked.
#[error("orchestrator event bridge thread panicked")]
ThreadPanicked,
}
/// Commands sent from a [`BridgeHandle`] to its bridge thread.
enum BridgeCommand {
/// Drain all currently available orchestrator events, then signal
/// completion on the enclosed sender.
Flush(mpsc::Sender<()>),
}
/// Handle to the background thread that forwards orchestrator events into
/// telemetry.
#[must_use = "BridgeHandle must be flushed or joined, otherwise queued orchestrator telemetry can be dropped"]
pub struct BridgeHandle {
    /// Join handle of the bridge thread.
    join_handle: thread::JoinHandle<()>,
    /// Sending side of the command channel read by the bridge thread.
    command_tx: mpsc::Sender<BridgeCommand>,
}

impl BridgeHandle {
    /// Asks the bridge thread to drain all currently available events and
    /// blocks until it acknowledges.
    ///
    /// # Errors
    ///
    /// * [`BridgeError::Stopped`] if the command cannot be sent.
    /// * [`BridgeError::FlushAckDropped`] if the acknowledgement channel is
    ///   closed before an acknowledgement arrives.
    pub fn flush(&self) -> Result<(), BridgeError> {
        let (ack_tx, ack_rx) = mpsc::channel();
        if self.command_tx.send(BridgeCommand::Flush(ack_tx)).is_err() {
            return Err(BridgeError::Stopped);
        }
        match ack_rx.recv() {
            Ok(()) => Ok(()),
            Err(_) => Err(BridgeError::FlushAckDropped),
        }
    }

    /// Best-effort [`flush`](Self::flush): any error is ignored.
    pub fn drain(&self) {
        let _ = self.flush();
    }

    /// Closes the command channel and waits for the bridge thread to finish.
    ///
    /// # Errors
    ///
    /// Returns [`BridgeError::ThreadPanicked`] if the thread panicked.
    pub fn join(self) -> Result<(), BridgeError> {
        let BridgeHandle {
            join_handle: worker,
            command_tx: commands,
        } = self;
        // Release our sender before blocking on the thread.
        drop(commands);
        match worker.join() {
            Ok(()) => Ok(()),
            Err(_) => Err(BridgeError::ThreadPanicked),
        }
    }
}
/// Alias for [`BridgeHandle`].
pub type OrchestratorEventBridge = BridgeHandle;
/// Timeout used by `bridge_loop` when waiting for the next orchestrator event;
/// pending bridge commands are checked again after each wait.
const BRIDGE_POLL_INTERVAL: Duration = Duration::from_millis(1);
/// Starts a background thread that forwards the orchestrator's events into
/// telemetry and returns a handle to it.
///
/// A subscription is taken on the orchestrator's event bus and handed,
/// together with the receiving end of a fresh command channel, to
/// [`bridge_loop`] on a new thread. The returned [`BridgeHandle`] owns the
/// thread's join handle and the command sender.
pub fn bridge_orchestrator_events(
    orchestrator: &xybrid_core::orchestrator::Orchestrator,
) -> BridgeHandle {
    let (command_tx, command_rx) = mpsc::channel();
    let event_bus = orchestrator.event_bus();
    let subscription = event_bus.subscribe();
    let worker = thread::spawn(move || bridge_loop(subscription, command_rx));
    BridgeHandle {
        join_handle: worker,
        command_tx,
    }
}
/// Starts forwarding the orchestrator's events into telemetry.
///
/// Delegates directly to [`bridge_orchestrator_events`] and returns its handle.
pub fn subscribe_orchestrator_events(
orchestrator: &xybrid_core::orchestrator::Orchestrator,
) -> BridgeHandle {
bridge_orchestrator_events(orchestrator)
}
/// Starts forwarding the orchestrator's events into telemetry.
///
/// Delegates directly to [`bridge_orchestrator_events`]. The `_pipeline_id`
/// and `_trace_id` arguments are currently unused by this function.
pub(crate) fn subscribe_orchestrator_events_in_context(
orchestrator: &xybrid_core::orchestrator::Orchestrator,
_pipeline_id: Option<Uuid>,
_trace_id: Option<Uuid>,
) -> BridgeHandle {
bridge_orchestrator_events(orchestrator)
}
/// Body of the bridge thread: services commands and forwards events until the
/// subscription disconnects.
///
/// Each iteration first handles all pending commands, then waits up to
/// `BRIDGE_POLL_INTERVAL` for one event and publishes it. A timeout simply
/// starts the next iteration. Once the subscription reports disconnection the
/// remaining events are drained, pending commands are handled one last time,
/// and the function returns.
fn bridge_loop(
    subscription: xybrid_core::event_bus::Subscription,
    command_rx: mpsc::Receiver<BridgeCommand>,
) {
    let mut connected = true;
    while connected {
        drain_bridge_commands(&subscription, &command_rx);
        match subscription.recv_timeout(BRIDGE_POLL_INTERVAL) {
            Ok(event) => publish_orchestrator_event(event),
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Err(mpsc::RecvTimeoutError::Disconnected) => connected = false,
        }
    }

    // Final sweep so nothing queued at disconnect time is lost and any
    // waiting flush is acknowledged.
    drain_available_orchestrator_events(&subscription);
    drain_bridge_commands(&subscription, &command_rx);
}
/// Handles every command currently queued on `command_rx` without blocking.
///
/// For each flush command all currently available orchestrator events are
/// published, then the requester is acknowledged. A failed acknowledgement
/// (requester gone) is ignored.
fn drain_bridge_commands(
    subscription: &xybrid_core::event_bus::Subscription,
    command_rx: &mpsc::Receiver<BridgeCommand>,
) {
    while let Ok(command) = command_rx.try_recv() {
        // `Flush` is the only command; this pattern is irrefutable.
        let BridgeCommand::Flush(ack_tx) = command;
        drain_available_orchestrator_events(subscription);
        let _ = ack_tx.send(());
    }
}
/// Publishes every event that can be received from `subscription` without
/// blocking, stopping as soon as it is empty or disconnected.
fn drain_available_orchestrator_events(subscription: &xybrid_core::event_bus::Subscription) {
    while let Ok(event) = subscription.try_recv() {
        publish_orchestrator_event(event);
    }
}
/// Returns `true` for orchestrator events that are not forwarded to telemetry:
/// pipeline start/complete, stage start/complete, and execution
/// started/completed.
fn orchestrator_event_is_wire_noise(event: &OrchestratorEvent) -> bool {
    match event {
        OrchestratorEvent::PipelineStart { .. }
        | OrchestratorEvent::PipelineComplete { .. }
        | OrchestratorEvent::StageStart { .. }
        | OrchestratorEvent::StageComplete { .. }
        | OrchestratorEvent::ExecutionStarted { .. }
        | OrchestratorEvent::ExecutionCompleted { .. } => true,
        _ => false,
    }
}
/// Converts an orchestrator event to telemetry and publishes it, unless
/// [`orchestrator_event_is_wire_noise`] classifies it as noise.
fn publish_orchestrator_event(event: OrchestratorEvent) {
    if !orchestrator_event_is_wire_noise(&event) {
        publish_telemetry_event(convert_orchestrator_event(&event));
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::MutexGuard;
static TELEMETRY_SENDER_TEST_LOCK: Mutex<()> = Mutex::new(());
#[test]
fn platform_event_payload_has_no_legacy_cache_keys() {
let data = serde_json::json!({
"model_id": "deepseek-chat",
"tokens_in": 1000,
"tokens_out": 120,
"cost_usd": 0.00123,
"cache_read_input_tokens": 800,
"cache_read_cost_usd": 0.000056,
"uncached_input_cost_usd": 0.000054,
});
let event = TelemetryEvent {
event_type: "ModelComplete".to_string(),
stage_name: Some("cloud_synthesize".to_string()),
target: Some("cloud".to_string()),
latency_ms: Some(420),
error: None,
data: Some(data.to_string()),
timestamp_ms: 1_700_000_000_000,
};
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform_event = convert_to_platform_event(&event, &config, None, None, None);
let payload_json = serde_json::to_string(&platform_event.payload).unwrap();
let forbidden = [
format!("cache{}hit{}tokens", "_", "_"),
format!("cache{}miss{}tokens", "_", "_"),
format!("cache{}hit{}cost{}usd", "_", "_", "_"),
format!("cache{}miss{}cost{}usd", "_", "_", "_"),
];
for key in &forbidden {
assert!(
!payload_json.contains(key),
"platform event payload leaked legacy key {key}: {payload_json}"
);
}
assert!(payload_json.contains("cache_read_input_tokens"));
}
#[test]
fn resource_summary_attaches_and_hoists_through_convert() {
let summary = ResourceUsageSummary {
cpu_avg_pct: Some(34.1),
cpu_peak_pct: Some(62.5),
process_rss_peak_mb: Some(712),
available_mem_min_mb: Some(4180),
memory_pressure_peak: xybrid_core::device::MemoryPressure::Normal,
thermal_state_peak: xybrid_core::device::ThermalState::Normal,
battery_pct_end: Some(72),
sample_count: 4,
sampling_mode: "summary".to_string(),
sampling_interval_ms: Some(1000),
};
let mut event = TelemetryEvent {
event_type: "ModelComplete".to_string(),
stage_name: Some("qwen2.5-0.5b".to_string()),
target: Some("local".to_string()),
latency_ms: Some(420),
error: None,
data: Some(serde_json::json!({ "model_id": "qwen2.5-0.5b" }).to_string()),
timestamp_ms: 1_700_000_000_000,
};
attach_resource_summary(&mut event, Some(summary));
let data_json = event.data.clone().expect("data present");
let data: serde_json::Value = serde_json::from_str(&data_json).unwrap();
assert!(data.get("resource_summary").is_some());
assert_eq!(
data["resource_summary"]["cpu_peak_pct"].as_f64(),
Some(62.5)
);
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform = convert_to_platform_event(&event, &config, None, None, None);
let payload_json = serde_json::to_string(&platform.payload).unwrap();
assert!(
payload_json.contains("\"resource_summary\""),
"resource_summary must be hoisted to payload top level, got: {}",
payload_json
);
let parsed: serde_json::Value = serde_json::from_str(&payload_json).unwrap();
assert_eq!(
parsed["resource_summary"]["cpu_peak_pct"].as_f64(),
Some(62.5),
"hoisted value should be the same object we attached"
);
}
#[test]
fn pipeline_complete_publishes_resource_summary_in_payload() {
let summary = ResourceUsageSummary {
cpu_avg_pct: Some(12.0),
cpu_peak_pct: Some(45.0),
process_rss_peak_mb: Some(380),
available_mem_min_mb: Some(6000),
memory_pressure_peak: xybrid_core::device::MemoryPressure::Normal,
thermal_state_peak: xybrid_core::device::ThermalState::Normal,
battery_pct_end: None,
sample_count: 3,
sampling_mode: "summary".to_string(),
sampling_interval_ms: Some(1000),
};
let mut event = TelemetryEvent {
event_type: "PipelineComplete".to_string(),
stage_name: Some("mirage-document-insights".to_string()),
target: None,
latency_ms: Some(1_200),
error: None,
data: Some("{\"stages\":[],\"output_type\":\"Text\"}".to_string()),
timestamp_ms: 1_700_000_000_000,
};
attach_resource_summary(&mut event, Some(summary));
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform = convert_to_platform_event(&event, &config, None, None, None);
let payload_json = serde_json::to_string(&platform.payload).unwrap();
assert!(
payload_json.contains("\"resource_summary\""),
"PipelineComplete should carry resource_summary on payload top level, got: {payload_json}"
);
let parsed: serde_json::Value = serde_json::from_str(&payload_json).unwrap();
assert_eq!(parsed["resource_summary"]["sample_count"].as_i64(), Some(3));
assert_eq!(
parsed["resource_summary"]["memory_pressure_peak"].as_str(),
Some("normal")
);
}
#[test]
fn routing_outcome_fields_hoist_to_platform_event_top_level() {
let event = TelemetryEvent {
event_type: "ModelComplete".to_string(),
stage_name: Some("qwen2.5-0.5b".to_string()),
target: Some("local".to_string()),
latency_ms: Some(420),
error: None,
data: Some(
serde_json::json!({
"model_id": "qwen2.5-0.5b",
"correlation_id": "run-string-123",
"outcome_category": {
"kind": "aborted_for_cloud_fallback",
"reason": "stress_memory"
},
"abort_reason": "stress_memory"
})
.to_string(),
),
timestamp_ms: 1_700_000_000_000,
};
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform = convert_to_platform_event(&event, &config, None, None, None);
assert_eq!(platform.correlation_id.as_deref(), Some("run-string-123"));
assert_eq!(
platform
.outcome_category
.as_ref()
.and_then(|v| v.get("kind")),
Some(&serde_json::json!("aborted_for_cloud_fallback"))
);
assert_eq!(platform.abort_reason.as_deref(), Some("stress_memory"));
assert_eq!(platform.payload["correlation_id"], "run-string-123");
assert_eq!(
platform.payload["outcome_category"]["kind"],
"aborted_for_cloud_fallback"
);
assert_eq!(platform.payload["abort_reason"], "stress_memory");
}
#[test]
fn local_reliability_hint_hoists_to_platform_event_top_level() {
let event = TelemetryEvent {
event_type: "RoutingDecision".to_string(),
stage_name: Some("stage-1".to_string()),
target: Some("cloud".to_string()),
latency_ms: None,
error: None,
data: Some(
serde_json::json!({
"stage": "stage-1",
"target": "cloud",
"reason": "history_bias",
"local_reliability_hint": {
"recent_abort_rate": 0.75,
"sample_size": 4_u32,
}
})
.to_string(),
),
timestamp_ms: 1_700_000_000_000,
};
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform = convert_to_platform_event(&event, &config, None, None, None);
assert_eq!(
platform.payload["local_reliability_hint"]["recent_abort_rate"]
.as_f64()
.unwrap_or(-1.0),
0.75
);
assert_eq!(
platform.payload["local_reliability_hint"]["sample_size"].as_i64(),
Some(4)
);
}
#[test]
fn empty_local_reliability_hint_still_hoists_with_sample_size_zero() {
let event = TelemetryEvent {
event_type: "RoutingDecision".to_string(),
stage_name: Some("stage-1".to_string()),
target: Some("local".to_string()),
latency_ms: None,
error: None,
data: Some(
serde_json::json!({
"stage": "stage-1",
"target": "local",
"reason": "default_local",
"local_reliability_hint": {
"recent_abort_rate": 0.0,
"sample_size": 0_u32,
}
})
.to_string(),
),
timestamp_ms: 1_700_000_000_000,
};
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform = convert_to_platform_event(&event, &config, None, None, None);
assert_eq!(
platform.payload["local_reliability_hint"]["sample_size"].as_i64(),
Some(0)
);
}
#[test]
fn local_aborted_and_cloud_retry_events_share_correlation_id() {
let local = local_aborted_event(
"run-abc-123",
"qwen2.5-0.5b",
xybrid_core::abort::AbortReason::StressMemory,
180,
4,
);
let cloud = cloud_retry_event("run-abc-123", "qwen2.5-0.5b", Some("openai"), 920, 38, None);
assert_eq!(local.event_type, "LocalAborted");
assert_eq!(local.target.as_deref(), Some("local"));
assert_eq!(local.latency_ms, Some(180));
assert_eq!(cloud.event_type, "CloudRetry");
assert_eq!(cloud.target.as_deref(), Some("cloud"));
assert_eq!(cloud.latency_ms, Some(920));
let local_data: serde_json::Value =
serde_json::from_str(local.data.as_ref().unwrap()).unwrap();
let cloud_data: serde_json::Value =
serde_json::from_str(cloud.data.as_ref().unwrap()).unwrap();
assert_eq!(local_data["correlation_id"], "run-abc-123");
assert_eq!(cloud_data["correlation_id"], "run-abc-123");
assert_eq!(local_data["abort_reason"], "stress_memory");
assert_eq!(
local_data["outcome_category"]["kind"],
"aborted_for_cloud_fallback"
);
assert_eq!(local_data["outcome_category"]["reason"], "stress_memory");
assert_eq!(local_data["tokens_emitted"], 4);
assert_eq!(cloud_data["provider"], "openai");
assert_eq!(cloud_data["outcome_category"], "cloud_success");
assert_eq!(cloud_data["tokens_emitted"], 38);
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform_local = convert_to_platform_event(&local, &config, None, None, None);
let platform_cloud = convert_to_platform_event(&cloud, &config, None, None, None);
assert_eq!(
platform_local.correlation_id.as_deref(),
Some("run-abc-123")
);
assert_eq!(
platform_cloud.correlation_id.as_deref(),
Some("run-abc-123")
);
assert_eq!(
platform_cloud.outcome_category.as_ref(),
Some(&serde_json::json!("cloud_success"))
);
assert_eq!(
platform_cloud.payload["outcome_category"],
serde_json::json!("cloud_success")
);
assert_eq!(
platform_local.abort_reason.as_deref(),
Some("stress_memory")
);
}
#[test]
fn local_aborted_event_carries_resource_summary_and_reliability_hint() {
let summary = ResourceUsageSummary {
cpu_avg_pct: Some(61.0),
cpu_peak_pct: Some(88.5),
process_rss_peak_mb: Some(2048),
available_mem_min_mb: Some(512),
memory_pressure_peak: xybrid_core::device::MemoryPressure::Critical,
thermal_state_peak: xybrid_core::device::ThermalState::Hot,
battery_pct_end: Some(51),
sample_count: 3,
sampling_mode: "debug_local".to_string(),
sampling_interval_ms: Some(100),
};
let local = local_aborted_event_with_details(
"run-rich-123",
"qwen2.5-0.5b",
xybrid_core::abort::AbortReason::StressMemory,
180,
4,
Some(summary),
Some(LocalReliabilityHint {
recent_abort_rate: 0.5,
sample_size: 2,
}),
);
let data: serde_json::Value =
serde_json::from_str(local.data.as_ref().expect("data present")).unwrap();
assert_eq!(
data["resource_summary"]["memory_pressure_peak"].as_str(),
Some("critical")
);
assert_eq!(
data["resource_summary"]["sampling_interval_ms"].as_i64(),
Some(100)
);
assert_eq!(
data["local_reliability_hint"]["recent_abort_rate"].as_f64(),
Some(0.5)
);
assert_eq!(
data["local_reliability_hint"]["sample_size"].as_i64(),
Some(2)
);
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform = convert_to_platform_event(&local, &config, None, None, None);
assert_eq!(
platform.payload["resource_summary"]["sample_count"].as_i64(),
Some(3)
);
assert_eq!(
platform.payload["local_reliability_hint"]["sample_size"].as_i64(),
Some(2)
);
}
#[test]
fn cloud_retry_event_defaults_provider_when_unspecified() {
let cloud = cloud_retry_event("run-xyz", "model-x", None, 50, 10, None);
let cloud_data: serde_json::Value =
serde_json::from_str(cloud.data.as_ref().unwrap()).unwrap();
assert_eq!(cloud_data["provider"], "xybrid");
assert_eq!(cloud_data["outcome_category"], "cloud_success");
assert_eq!(cloud_data["status"], "ok");
assert!(cloud.error.is_none());
}
#[test]
fn cloud_retry_event_marks_failure_branch() {
let cloud = cloud_retry_event(
"run-fail-1",
"deepseek-chat",
Some("deepseek"),
842,
0,
Some("Gateway returned 502: Provider error"),
);
let cloud_data: serde_json::Value =
serde_json::from_str(cloud.data.as_ref().unwrap()).unwrap();
assert_eq!(cloud.event_type, "CloudRetry");
assert_eq!(cloud.target.as_deref(), Some("cloud"));
assert_eq!(cloud.latency_ms, Some(842));
assert_eq!(
cloud.error.as_deref(),
Some("Gateway returned 502: Provider error")
);
assert_eq!(cloud_data["status"], "error");
assert_eq!(cloud_data["outcome_category"]["kind"], "hard_fail");
assert_eq!(
cloud_data["outcome_category"]["reason"],
"Gateway returned 502: Provider error"
);
assert_eq!(cloud_data["tokens_emitted"], 0);
let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
let platform = convert_to_platform_event(&cloud, &config, None, None, None);
assert_eq!(
platform.outcome_category.as_ref(),
Some(&serde_json::json!({
"kind": "hard_fail",
"reason": "Gateway returned 502: Provider error"
}))
);
}
#[test]
fn telemetry_error_redaction_removes_api_keys_and_bearer_tokens() {
let redacted = redact_error_for_telemetry(
"Gateway returned 401: Authorization: Bearer sk_test_abc123 and api_key=hf_secret_xyz",
);
assert!(redacted.contains("Authorization: Bearer [REDACTED]"));
assert!(redacted.contains("api_key=[REDACTED]"));
assert!(!redacted.contains("sk_test_abc123"));
assert!(!redacted.contains("hf_secret_xyz"));
}
#[test]
fn attach_resource_summary_with_none_is_noop() {
let original = TelemetryEvent {
event_type: "ModelComplete".to_string(),
stage_name: None,
target: None,
latency_ms: None,
error: None,
data: Some("{\"model_id\":\"x\"}".to_string()),
timestamp_ms: 0,
};
let mut event = original.clone();
attach_resource_summary(&mut event, None);
assert_eq!(event.data, original.data);
}
#[test]
fn env_resource_mode_parses_all_variants() {
let cases: &[(&str, ResourceTelemetryMode)] = &[
("off", ResourceTelemetryMode::Off),
("boundary", ResourceTelemetryMode::Boundary),
(
"summary",
ResourceTelemetryMode::Summary {
interval_ms: ResourceTelemetryMode::DEFAULT_SUMMARY_INTERVAL_MS,
},
),
(
"summary:500",
ResourceTelemetryMode::Summary { interval_ms: 500 },
),
(
"debug_local:250",
ResourceTelemetryMode::DebugLocal { interval_ms: 250 },
),
];
for (raw, expected) in cases {
std::env::set_var("XYBRID_RESOURCE_TELEMETRY", raw);
let parsed = resource_mode_from_env().expect(raw);
assert_eq!(&parsed, expected, "parsing `{}`", raw);
}
std::env::remove_var("XYBRID_RESOURCE_TELEMETRY");
assert!(resource_mode_from_env().is_none());
}
#[test]
fn test_convert_stage_start_event() {
let event = OrchestratorEvent::StageStart {
stage_name: "asr".to_string(),
context: Default::default(),
};
let telemetry = convert_orchestrator_event(&event);
assert_eq!(telemetry.event_type, "StageStart");
assert_eq!(telemetry.stage_name, Some("asr".to_string()));
assert!(telemetry.target.is_none());
assert!(telemetry.latency_ms.is_none());
assert!(telemetry.error.is_none());
assert!(telemetry.timestamp_ms > 0);
}
#[test]
fn test_convert_stage_complete_event() {
let event = OrchestratorEvent::StageComplete {
stage_name: "tts".to_string(),
target: "local".to_string(),
latency_ms: 150,
context: Default::default(),
};
let telemetry = convert_orchestrator_event(&event);
assert_eq!(telemetry.event_type, "StageComplete");
assert_eq!(telemetry.stage_name, Some("tts".to_string()));
assert_eq!(telemetry.target, Some("local".to_string()));
assert_eq!(telemetry.latency_ms, Some(150));
assert!(telemetry.error.is_none());
}
/// `StageError` surfaces the stage name and the error text on the converted record.
#[test]
fn test_convert_stage_error_event() {
    let source = OrchestratorEvent::StageError {
        stage_name: String::from("asr"),
        error: String::from("Model not found"),
        context: Default::default(),
    };
    let converted = convert_orchestrator_event(&source);

    assert_eq!(converted.event_type, "StageError");
    assert_eq!(converted.stage_name.as_deref(), Some("asr"));
    assert_eq!(converted.error.as_deref(), Some("Model not found"));
}
/// `PipelineStart` has no stage name; its stage list must show up in the
/// serialized `data` payload.
#[test]
fn test_convert_pipeline_start_event() {
    let source = OrchestratorEvent::PipelineStart {
        stages: vec![
            String::from("asr"),
            String::from("llm"),
            String::from("tts"),
        ],
        context: Default::default(),
    };
    let converted = convert_orchestrator_event(&source);

    assert_eq!(converted.event_type, "PipelineStart");
    assert!(converted.stage_name.is_none());
    assert!(converted.data.is_some());

    let payload = converted.data.unwrap();
    for stage in ["asr", "llm", "tts"] {
        assert!(payload.contains(stage));
    }
}
/// `PipelineComplete` maps its total latency onto `latency_ms`.
#[test]
fn test_convert_pipeline_complete_event() {
    let converted = convert_orchestrator_event(&OrchestratorEvent::PipelineComplete {
        total_latency_ms: 500,
        context: Default::default(),
    });

    assert_eq!(converted.event_type, "PipelineComplete");
    assert_eq!(converted.latency_ms, Some(500));
}
/// `RoutingDecided` keeps stage name and target, and places the routing
/// reason inside the `data` payload.
#[test]
fn test_convert_routing_decided_event() {
    let source = OrchestratorEvent::RoutingDecided {
        stage_name: String::from("asr"),
        target: String::from("cloud"),
        reason: String::from("network_optimal"),
        recent_abort_rate: 0.0,
        sample_size: 0,
        context: Default::default(),
    };
    let converted = convert_orchestrator_event(&source);

    assert_eq!(converted.event_type, "RoutingDecided");
    assert_eq!(converted.stage_name.as_deref(), Some("asr"));
    assert_eq!(converted.target.as_deref(), Some("cloud"));
    assert!(converted.data.is_some());

    let payload = converted.data.unwrap();
    assert!(payload.contains("network_optimal"));
}
/// The abort-rate / sample-size pair on `RoutingDecided` must appear under
/// `local_reliability_hint` both in the telemetry `data` JSON and in the
/// platform event payload built from it.
#[test]
fn routing_decided_event_carries_local_reliability_hint_end_to_end() {
    let routed = OrchestratorEvent::RoutingDecided {
        stage_name: String::from("stage-1"),
        target: String::from("cloud"),
        reason: String::from("history_bias"),
        recent_abort_rate: 0.75,
        sample_size: 4,
        context: Default::default(),
    };
    let telemetry_event = convert_orchestrator_event(&routed);

    // First hop: the serialized `data` string on the telemetry event.
    let raw = telemetry_event.data.as_ref().expect("data must be present");
    let parsed_data: serde_json::Value = serde_json::from_str(raw).unwrap();
    let hint = &parsed_data["local_reliability_hint"];
    assert_eq!(hint["recent_abort_rate"].as_f64(), Some(0.75));
    assert_eq!(hint["sample_size"].as_i64(), Some(4));

    // Second hop: the payload of the platform event.
    let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");
    let platform = convert_to_platform_event(&telemetry_event, &config, None, None, None);
    let hint = &platform.payload["local_reliability_hint"];
    assert_eq!(hint["recent_abort_rate"].as_f64(), Some(0.75));
    assert_eq!(hint["sample_size"].as_i64(), Some(4));
}
/// NaN and both infinities in `recent_abort_rate` must come out as `0.0` in
/// the serialized reliability hint.
#[test]
fn routing_decided_event_sanitizes_non_finite_recent_abort_rate() {
    let non_finite = [f32::NAN, f32::INFINITY, f32::NEG_INFINITY];
    for bad_rate in non_finite {
        let telemetry_event = convert_orchestrator_event(&OrchestratorEvent::RoutingDecided {
            stage_name: String::from("stage-1"),
            target: String::from("cloud"),
            reason: String::from("history_bias"),
            recent_abort_rate: bad_rate,
            sample_size: 1,
            context: Default::default(),
        });
        let raw = telemetry_event.data.as_ref().unwrap();
        let parsed_data: serde_json::Value = serde_json::from_str(raw).unwrap();
        let sanitized = parsed_data["local_reliability_hint"]["recent_abort_rate"].as_f64();
        assert_eq!(
            sanitized,
            Some(0.0),
            "non-finite rate {bad_rate} must be sanitized to 0.0"
        );
    }
}
/// Publishes one `RoutingDecided` event through a bridged orchestrator while an
/// event context carrying a pipeline id and a trace id is installed, drains the
/// bridge, and checks that the telemetry event received on the registered
/// sender carries both ids plus the reliability hint in its `data` JSON.
#[test]
fn scoped_orchestrator_bridge_drains_queued_events_with_captured_context() {
// Takes the shared test lock and clears the sender registry before registering our own sender.
let _guard = TelemetrySenderTestGuard::acquire();
let (tx, rx) = mpsc::channel();
register_telemetry_sender(tx);
let pipeline_id = Uuid::new_v4();
let trace_id = Uuid::new_v4();
// The context guard is kept alive for the whole test so the ids stay installed
// while the event is published and drained.
let _event_context = xybrid_core::event_bus::EventContextGuard::install(
xybrid_core::event_bus::EventContext::default()
.with_pipeline_id(pipeline_id)
.with_trace_id(trace_id),
);
let orchestrator = xybrid_core::orchestrator::Orchestrator::new();
let bridge = bridge_orchestrator_events(&orchestrator);
// The published event itself carries a default context; the assertions below
// expect the installed ids to appear on the received event anyway.
orchestrator
.event_bus()
.publish(OrchestratorEvent::RoutingDecided {
stage_name: "scoped-bridge-context".to_string(),
target: "cloud".to_string(),
reason: "history_bias".to_string(),
recent_abort_rate: 0.5,
sample_size: 2,
context: Default::default(),
});
bridge.drain();
// Poll up to 20 times with a 50 ms timeout each, skipping unrelated events
// and timeouts until the event with our stage name arrives.
let mut received = None;
for _ in 0..20 {
match rx.recv_timeout(std::time::Duration::from_millis(50)) {
Ok(event)
if event.event_type == "RoutingDecided"
&& event.stage_name.as_deref() == Some("scoped-bridge-context") =>
{
received = Some(event);
break;
}
Ok(_) | Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => {
panic!("telemetry receiver disconnected before RoutingDecided arrived")
}
}
}
let received = received.expect("drained bridge should publish queued orchestrator event");
let data: serde_json::Value =
serde_json::from_str(received.data.as_ref().expect("context-bearing data")).unwrap();
// Context ids are looked up under the reserved context keys in the data JSON.
assert_eq!(
data[CONTEXT_PIPELINE_ID_KEY],
serde_json::json!(pipeline_id)
);
assert_eq!(data[CONTEXT_TRACE_ID_KEY], serde_json::json!(trace_id));
assert_eq!(
data["local_reliability_hint"]["recent_abort_rate"].as_f64(),
Some(0.5)
);
assert_eq!(
data["local_reliability_hint"]["sample_size"].as_i64(),
Some(2)
);
}
/// `ExecutionStarted` keeps its stage name and target through conversion.
#[test]
fn test_convert_execution_started_event() {
    let converted = convert_orchestrator_event(&OrchestratorEvent::ExecutionStarted {
        stage_name: String::from("asr"),
        target: String::from("local"),
        context: Default::default(),
    });

    assert_eq!(converted.event_type, "ExecutionStarted");
    assert_eq!(converted.stage_name.as_deref(), Some("asr"));
    assert_eq!(converted.target.as_deref(), Some("local"));
}
/// `ExecutionCompleted` maps `execution_time_ms` onto `latency_ms` and keeps
/// stage name and target.
#[test]
fn test_convert_execution_completed_event() {
    let source = OrchestratorEvent::ExecutionCompleted {
        stage_name: String::from("asr"),
        target: String::from("local"),
        execution_time_ms: 75,
        context: Default::default(),
    };
    let converted = convert_orchestrator_event(&source);

    assert_eq!(converted.event_type, "ExecutionCompleted");
    assert_eq!(converted.stage_name.as_deref(), Some("asr"));
    assert_eq!(converted.target.as_deref(), Some("local"));
    assert_eq!(converted.latency_ms, Some(75));
}
/// `ExecutionFailed` keeps stage name, target and the error text.
#[test]
fn test_convert_execution_failed_event() {
    let source = OrchestratorEvent::ExecutionFailed {
        stage_name: String::from("tts"),
        target: String::from("cloud"),
        error: String::from("Timeout"),
        context: Default::default(),
    };
    let converted = convert_orchestrator_event(&source);

    assert_eq!(converted.event_type, "ExecutionFailed");
    assert_eq!(converted.stage_name.as_deref(), Some("tts"));
    assert_eq!(converted.target.as_deref(), Some("cloud"));
    assert_eq!(converted.error.as_deref(), Some("Timeout"));
}
/// An allowed `PolicyEvaluated` event produces no error but still carries a
/// `data` payload.
#[test]
fn test_convert_policy_evaluated_allowed() {
    let source = OrchestratorEvent::PolicyEvaluated {
        stage_name: String::from("asr"),
        allowed: true,
        reason: Some(String::from("All conditions met")),
        context: Default::default(),
    };
    let converted = convert_orchestrator_event(&source);

    assert_eq!(converted.event_type, "PolicyEvaluated");
    assert_eq!(converted.stage_name.as_deref(), Some("asr"));
    assert!(converted.error.is_none());
    assert!(converted.data.is_some());
}
/// A denied `PolicyEvaluated` event reports its reason through the `error` field.
#[test]
fn test_convert_policy_evaluated_denied() {
    let source = OrchestratorEvent::PolicyEvaluated {
        stage_name: String::from("llm"),
        allowed: false,
        reason: Some(String::from("Privacy policy violation")),
        context: Default::default(),
    };
    let converted = convert_orchestrator_event(&source);

    assert_eq!(converted.event_type, "PolicyEvaluated");
    assert_eq!(converted.stage_name.as_deref(), Some("llm"));
    assert_eq!(
        converted.error.as_deref(),
        Some("Privacy policy violation")
    );
}
/// Builds a `RoutingDecided` fixture for the given stage name: local target,
/// `test_route` reason, zeroed reliability figures and a default context.
fn routing_decided_event(stage_name: impl Into<String>) -> OrchestratorEvent {
    let stage_name: String = stage_name.into();
    let context = xybrid_core::event_bus::EventContext::default();
    OrchestratorEvent::RoutingDecided {
        stage_name,
        target: String::from("local"),
        reason: String::from("test_route"),
        recent_abort_rate: 0.0,
        sample_size: 0,
        context,
    }
}
/// Empties the global `TELEMETRY_SENDERS` registry, recovering the inner
/// value if the mutex was poisoned by an earlier panic.
fn clear_registered_telemetry_senders() {
    let mut senders = match TELEMETRY_SENDERS.lock() {
        Ok(senders) => senders,
        Err(poisoned) => poisoned.into_inner(),
    };
    senders.clear();
}
/// Scope guard for tests that touch the global telemetry sender registry.
///
/// It holds the `TELEMETRY_SENDER_TEST_LOCK` guard for as long as it lives and
/// empties the registry both on acquisition and on drop.
struct TelemetrySenderTestGuard {
    _guard: MutexGuard<'static, ()>,
}

impl TelemetrySenderTestGuard {
    /// Takes the test lock (recovering from poisoning) and then clears any
    /// senders left registered.
    fn acquire() -> Self {
        let held = match TELEMETRY_SENDER_TEST_LOCK.lock() {
            Ok(held) => held,
            Err(poisoned) => poisoned.into_inner(),
        };
        clear_registered_telemetry_senders();
        TelemetrySenderTestGuard { _guard: held }
    }
}

impl Drop for TelemetrySenderTestGuard {
    /// Clears the registry; the lock guard field is released after this runs.
    fn drop(&mut self) {
        clear_registered_telemetry_senders();
    }
}
/// Publishes 100 routing events, drops the orchestrator, joins the bridge and
/// then checks that every one of the 100 events reached the registered sender.
#[test]
fn bridge_drains_all_events_on_orchestrator_drop() {
    let _guard = TelemetrySenderTestGuard::acquire();
    let (tx, rx) = mpsc::channel();
    register_telemetry_sender(tx);
    let orchestrator = xybrid_core::orchestrator::Orchestrator::new();
    let bridge = bridge_orchestrator_events(&orchestrator);
    for idx in 0..100 {
        orchestrator
            .event_bus()
            .publish(routing_decided_event(format!("bridge-drain-{idx:03}")));
    }
    // Dropping the orchestrator before joining is what the test exercises:
    // the bridge is expected to finish delivering what was queued.
    drop(orchestrator);
    bridge.join().expect("bridge should drain cleanly");
    // Only events published by this test are counted.
    let events: Vec<_> = rx
        .try_iter()
        .filter(|event| {
            event.event_type == "RoutingDecided"
                && event
                    .stage_name
                    .as_deref()
                    .is_some_and(|stage| stage.starts_with("bridge-drain-"))
        })
        .collect();
    assert_eq!(events.len(), 100);
    for idx in 0..100 {
        // Build the expected name once per index instead of once per
        // (index, event) pair inside the closure.
        let expected = format!("bridge-drain-{idx:03}");
        assert!(
            events
                .iter()
                .any(|event| event.stage_name.as_deref() == Some(expected.as_str())),
            "missing routed event bridge-drain-{idx:03}; got {events:?}"
        );
    }
}
/// Runs two publisher threads, each with its own installed correlation id
/// ("A" or "B"), its own orchestrator and its own bridge, and checks that every
/// delivered event carries the correlation id of the thread that published it.
#[test]
fn bridge_preserves_correlation_id_across_task_boundary() {
let _guard = TelemetrySenderTestGuard::acquire();
let (tx, rx) = mpsc::channel();
register_telemetry_sender(tx);
let publishers: Vec<_> = ["A", "B"]
.into_iter()
.map(|correlation_id| {
thread::spawn(move || {
// The context is installed inside the spawned thread, before the
// orchestrator and bridge are created there.
let _context = xybrid_core::event_bus::EventContextGuard::install(
xybrid_core::event_bus::EventContext::default()
.with_correlation_id(correlation_id),
);
let orchestrator = xybrid_core::orchestrator::Orchestrator::new();
let bridge = bridge_orchestrator_events(&orchestrator);
// Stage names encode the publishing thread as "<id>-<idx>" so the
// expected correlation id can be recovered from the event later.
for idx in 0..10 {
orchestrator
.event_bus()
.publish(routing_decided_event(format!("{correlation_id}-{idx}")));
// Yield between publishes to encourage interleaving of the two threads.
thread::yield_now();
}
drop(orchestrator);
bridge.join().expect("bridge should drain cleanly");
})
})
.collect();
for publisher in publishers {
publisher.join().expect("publisher thread should not panic");
}
// Both publisher threads have joined their bridges by now, so a
// non-blocking drain of the channel is used.
let events: Vec<_> = rx
.try_iter()
.filter(|event| {
event.event_type == "RoutingDecided"
&& event
.stage_name
.as_deref()
.is_some_and(|stage| stage.starts_with("A-") || stage.starts_with("B-"))
})
.collect();
assert_eq!(events.len(), 20);
for event in events {
let stage = event.stage_name.as_deref().expect("stage name");
let data: serde_json::Value =
serde_json::from_str(event.data.as_deref().expect("event data")).unwrap();
// The text before the first '-' in the stage name is the publisher's id.
let expected = stage.split('-').next().unwrap();
assert_eq!(
data["correlation_id"].as_str(),
Some(expected),
"event {stage} carried wrong context: {data}"
);
}
}
/// Publishes five routing events, calls `flush` on the bridge, and then checks
/// that all five can be read from the registered sender's channel.
#[test]
fn flush_blocks_until_in_flight_events_delivered() {
let _guard = TelemetrySenderTestGuard::acquire();
let (tx, rx) = mpsc::channel();
register_telemetry_sender(tx);
let orchestrator = xybrid_core::orchestrator::Orchestrator::new();
let bridge = bridge_orchestrator_events(&orchestrator);
for idx in 0..5 {
orchestrator
.event_bus()
.publish(routing_decided_event(format!("flush-{idx}")));
}
bridge.flush().expect("flush should complete");
// Collect until five matching events are seen or the 1 s deadline passes.
// Each receive waits at most 50 ms and panics on timeout or disconnect.
let mut delivered = Vec::new();
let deadline = std::time::Instant::now() + Duration::from_secs(1);
while delivered.len() < 5 && std::time::Instant::now() < deadline {
let event = rx
.recv_timeout(Duration::from_millis(50))
.expect("flush should deliver event before returning");
// Events not published by this test are skipped rather than counted.
if event.event_type == "RoutingDecided"
&& event
.stage_name
.as_deref()
.is_some_and(|stage| stage.starts_with("flush-"))
{
delivered.push(event);
}
}
assert_eq!(delivered.len(), 5);
// The orchestrator is dropped only after the flush assertions, then the bridge is joined.
drop(orchestrator);
bridge.join().expect("bridge should drain cleanly");
}
/// A `TelemetryEvent` survives a JSON round trip with its event type and
/// stage name intact.
#[test]
fn test_telemetry_event_serialization() {
    let original = TelemetryEvent {
        event_type: String::from("StageStart"),
        stage_name: Some(String::from("asr")),
        target: None,
        latency_ms: None,
        error: None,
        data: None,
        timestamp_ms: 1234567890,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    for needle in ["StageStart", "asr"] {
        assert!(encoded.contains(needle));
    }

    let decoded: TelemetryEvent = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.event_type, "StageStart");
    assert_eq!(decoded.stage_name.as_deref(), Some("asr"));
}
/// A sender registered with `register_telemetry_sender` receives events passed
/// to `publish_telemetry_event`.
#[test]
fn test_register_and_publish() {
    let _guard = TelemetrySenderTestGuard::acquire();
    let (tx, rx) = mpsc::channel();
    register_telemetry_sender(tx);
    let event = TelemetryEvent {
        event_type: "TestEvent".to_string(),
        stage_name: Some("test".to_string()),
        target: None,
        latency_ms: None,
        error: None,
        data: None,
        timestamp_ms: 0,
    };
    // The event is not used again, so it is moved instead of cloned.
    publish_telemetry_event(event);
    // `expect` reports the underlying receive error on failure, where
    // `assert!(is_ok())` followed by `unwrap()` checked the same thing twice.
    let received_event = rx
        .recv_timeout(std::time::Duration::from_millis(100))
        .expect("registered sender should receive the published event");
    assert_eq!(received_event.event_type, "TestEvent");
}
/// Pins the default batching, retry and device-detection settings of
/// `TelemetryConfig`.
#[test]
fn test_telemetry_config_defaults() {
    let TelemetryConfig {
        batch_size,
        flush_interval_secs,
        max_retries,
        enable_retry_queue,
        auto_hardware_detection,
        capture_hostname,
        ..
    } = TelemetryConfig::default();

    assert_eq!(batch_size, 10);
    assert_eq!(flush_interval_secs, 5);
    assert_eq!(max_retries, 3);
    assert!(enable_retry_queue);
    assert!(auto_hardware_detection);
    assert!(!capture_hostname);
}
/// A freshly built exporter reports its circuit as closed.
#[test]
fn test_http_exporter_circuit_breaker_initial_state() {
    let exporter = HttpTelemetryExporter::new(
        TelemetryConfig::new("https://example.com", "test-key")
            .with_device("test-device", "test-platform"),
    );
    let open = exporter.is_circuit_open();
    assert!(!open);
}
/// Three recorded failures open the circuit; `reset_circuit` closes it again.
#[test]
fn test_http_exporter_circuit_breaker_reset() {
    let exporter = HttpTelemetryExporter::new(
        TelemetryConfig::new("https://example.com", "test-key")
            .with_device("test-device", "test-platform"),
    );

    let mut failures = 0;
    while failures < 3 {
        exporter.circuit.record_failure();
        failures += 1;
    }
    assert!(exporter.is_circuit_open());

    exporter.reset_circuit();
    assert!(!exporter.is_circuit_open());
}
/// A freshly built exporter has an empty failed queue and a zero drop counter.
#[test]
fn test_http_exporter_failed_queue_initial_empty() {
    let exporter = HttpTelemetryExporter::new(
        TelemetryConfig::new("https://example.com", "test-key")
            .with_device("test-device", "test-platform"),
    );
    let queued = exporter.failed_queue_size();
    assert_eq!(queued, 0);
    let dropped = exporter.dropped_count();
    assert_eq!(dropped, 0);
}
/// Queuing a single failed event stores it and leaves the drop counter at zero.
#[test]
fn test_queue_failed_events() {
    let failed = PlatformEvent {
        session_id: Uuid::new_v4(),
        event_type: String::from("Test"),
        payload: serde_json::json!({}),
        device_id: None,
        device_label: None,
        platform: None,
        app_version: None,
        device: None,
        timestamp: None,
        pipeline_id: None,
        trace_id: None,
        correlation_id: None,
        outcome_category: None,
        abort_reason: None,
        stages: None,
    };
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    let dropped = Arc::new(AtomicU32::new(0));

    queue_failed_events(vec![failed], &queue, &dropped);

    let queued = queue.lock().unwrap().len();
    assert_eq!(queued, 1);
    assert_eq!(dropped.load(Ordering::Relaxed), 0);
}
/// 429 and the 502–504 gateway statuses are retryable; 200 and the listed
/// 4xx client errors are not.
#[test]
fn test_is_retryable_status() {
    for status in [429, 502, 503, 504] {
        assert!(is_retryable_status(status));
    }
    for status in [200, 400, 401, 404] {
        assert!(!is_retryable_status(status));
    }
}
/// Minimal `TelemetryEvent` fixture: only `event_type` is populated, every
/// optional field is `None` and the timestamp is zero.
fn sample_event() -> TelemetryEvent {
    let event_type = String::from("TestEvent");
    TelemetryEvent {
        event_type,
        stage_name: None,
        target: None,
        latency_ms: None,
        error: None,
        data: None,
        timestamp_ms: 0,
    }
}
/// With hardware detection off and no explicit device id, the serialized
/// platform event must contain neither a `device` nor a `device_id` key.
#[test]
fn opt_out_emits_no_device_and_no_auto_id() {
    let mut config =
        TelemetryConfig::new("http://example.invalid", "k").with_auto_hardware_detection(false);

    let profile = resolve_device_profile(&config);
    assert!(profile.is_none(), "strict opt-out must yield None profile");

    // Clear the id unless detection is on or the id was set explicitly.
    if !(config.auto_hardware_detection || config.device_id_explicit) {
        config.device_id = None;
    }

    let source = sample_event();
    let event = convert_to_platform_event(&source, &config, profile.as_ref(), None, None);
    let json = serde_json::to_value(&event).unwrap();

    let device = json.get("device");
    assert!(
        device.is_none(),
        "no `device` key on strict opt-out, got: {json}"
    );
    let device_id = json.get("device_id");
    assert!(
        device_id.is_none(),
        "strict opt-out must omit `device_id` entirely, not emit null. got: {json}"
    );
}
/// Pipeline/trace ids embedded in an event's `data` win over the ids passed at
/// flush time, and the reserved context keys do not appear under `payload["data"]`.
#[test]
fn embedded_context_overrides_flush_context() {
    let config = TelemetryConfig::new("http://example.invalid", "k");
    let (event_pipeline_id, event_trace_id) = (Uuid::new_v4(), Uuid::new_v4());
    let (flush_pipeline_id, flush_trace_id) = (Uuid::new_v4(), Uuid::new_v4());

    let embedded = serde_json::json!({
        CONTEXT_PIPELINE_ID_KEY: event_pipeline_id,
        CONTEXT_TRACE_ID_KEY: event_trace_id,
        "model_id": "deepseek-chat",
        "tokens_in": 100,
        "tokens_out": 10,
    });
    let event = TelemetryEvent {
        event_type: String::from("ModelComplete"),
        stage_name: None,
        target: Some(String::from("cloud")),
        latency_ms: Some(42),
        error: None,
        data: Some(embedded.to_string()),
        timestamp_ms: 0,
    };

    let converted = convert_to_platform_event(
        &event,
        &config,
        None,
        Some(flush_pipeline_id),
        Some(flush_trace_id),
    );

    assert_eq!(converted.pipeline_id, Some(event_pipeline_id));
    assert_eq!(converted.trace_id, Some(event_trace_id));
    assert_eq!(converted.payload["model_id"], "deepseek-chat");

    let nested = &converted.payload["data"];
    assert!(nested.get(CONTEXT_PIPELINE_ID_KEY).is_none());
    assert!(nested.get(CONTEXT_TRACE_ID_KEY).is_none());
}
/// String-valued `tokens_in` / `tokens_out` on an LLM span are parsed into counts.
#[test]
fn extract_llm_token_counts_reads_canonical_keys() {
    let llm_span = serde_json::json!({
        "name": "llm_inference_with_messages",
        "metadata": {
            "tokens_in": "128",
            "tokens_out": "42",
            "tokens_generated": "42"
        }
    });
    let stages = serde_json::json!({
        "spans": [
            { "name": "execute:preprocessing", "metadata": {} },
            llm_span
        ]
    });

    let counts = extract_llm_token_counts(&stages).expect("should find llm span");
    assert_eq!(counts.0, Some(128));
    assert_eq!(counts.1, Some(42));
}
/// `prompt_tokens` / `completion_tokens` are accepted when the canonical keys
/// are missing.
#[test]
fn extract_llm_token_counts_falls_back_to_openai_style_keys() {
    let span = serde_json::json!({
        "name": "llm_inference_streaming",
        "metadata": {
            "prompt_tokens": "16",
            "completion_tokens": "64"
        }
    });
    let stages = serde_json::json!({ "spans": [span] });

    let counts = extract_llm_token_counts(&stages).expect("should find llm span");
    assert_eq!(counts.0, Some(16));
    assert_eq!(counts.1, Some(64));
}
/// A trace containing only a non-LLM span yields no token counts.
#[test]
fn extract_llm_token_counts_returns_none_without_llm_span() {
    let asr_span = serde_json::json!({ "name": "execute:asr.whisper-tiny", "metadata": {} });
    let stages = serde_json::json!({ "spans": [asr_span] });

    let counts = extract_llm_token_counts(&stages);
    assert!(counts.is_none());
}
/// When two LLM spans both carry `prompt_cached_tokens`, the later span's
/// value is the one returned.
#[test]
fn extract_llm_prompt_cached_tokens_picks_last_llm_span_with_value() {
    let earlier = serde_json::json!({
        "name": "llm_inference_streaming",
        "metadata": { "ttft_ms": 120, "prompt_cached_tokens": "0" }
    });
    let later = serde_json::json!({
        "name": "llm_inference_with_messages",
        "metadata": { "tokens_in": 200, "prompt_cached_tokens": "150" }
    });
    let stages = serde_json::json!({ "spans": [earlier, later] });

    let cached = extract_llm_prompt_cached_tokens(&stages);
    assert_eq!(cached, Some(150));
}
/// A JSON-number `prompt_cached_tokens` is read just like a string one.
#[test]
fn extract_llm_prompt_cached_tokens_handles_numeric_value() {
    let span = serde_json::json!({
        "name": "llm_inference_with_messages",
        "metadata": { "prompt_cached_tokens": 96 }
    });
    let stages = serde_json::json!({ "spans": [span] });

    let cached = extract_llm_prompt_cached_tokens(&stages);
    assert_eq!(cached, Some(96));
}
/// An LLM span without `prompt_cached_tokens` yields `None`.
#[test]
fn extract_llm_prompt_cached_tokens_returns_none_when_absent() {
    let span = serde_json::json!({
        "name": "llm_inference_with_messages",
        "metadata": { "tokens_in": 32, "tokens_out": 96 }
    });
    let stages = serde_json::json!({ "spans": [span] });

    let cached = extract_llm_prompt_cached_tokens(&stages);
    assert_eq!(cached, None);
}
/// `prompt_cached_tokens` on a non-LLM span is not picked up.
#[test]
fn extract_llm_prompt_cached_tokens_ignores_non_llm_spans() {
    let asr_span = serde_json::json!({
        "name": "execute:asr.whisper-tiny",
        "metadata": { "prompt_cached_tokens": "999" }
    });
    let stages = serde_json::json!({ "spans": [asr_span] });

    let cached = extract_llm_prompt_cached_tokens(&stages);
    assert_eq!(cached, None);
}
/// Token counts are found on an `inference:` span even when an earlier
/// streaming span carries none.
#[test]
fn extract_llm_token_counts_scans_across_llm_spans() {
    let streaming = serde_json::json!({
        "name": "llm_inference_streaming",
        "metadata": { "ttft_ms": 120 }
    });
    let inference = serde_json::json!({
        "name": "inference:qwen2.5-0.5b",
        "metadata": { "tokens_in": 32, "tokens_out": 96 }
    });
    let stages = serde_json::json!({ "spans": [streaming, inference] });

    let counts = extract_llm_token_counts(&stages).expect("should find llm spans");
    assert_eq!(counts.0, Some(32));
    assert_eq!(counts.1, Some(96));
}
/// With token counts on the first and third spans, the third span's values win.
#[test]
fn extract_llm_token_counts_prefers_last_authoritative_span() {
    let first = serde_json::json!({
        "name": "llm_inference_streaming",
        "metadata": { "tokens_in": 10, "tokens_out": 5 }
    });
    let without_counts = serde_json::json!({
        "name": "llm_inference_streaming",
        "metadata": { "ttft_ms": 50 }
    });
    let last = serde_json::json!({
        "name": "llm_inference_streaming",
        "metadata": { "tokens_in": 128, "tokens_out": 42 }
    });
    let stages = serde_json::json!({ "spans": [first, without_counts, last] });

    let counts = extract_llm_token_counts(&stages).expect("should find llm spans");
    assert_eq!(
        counts.0,
        Some(128),
        "must take last-span tokens_in, not first"
    );
    assert_eq!(
        counts.1,
        Some(42),
        "must take last-span tokens_out, not first"
    );
}
/// Input and output counts may come from different spans; both are reported.
#[test]
fn extract_llm_token_counts_falls_back_across_partial_spans() {
    let input_only = serde_json::json!({
        "name": "llm_inference_streaming",
        "metadata": { "tokens_in": 64 }
    });
    let output_only = serde_json::json!({
        "name": "llm_inference_streaming",
        "metadata": { "tokens_out": 256 }
    });
    let stages = serde_json::json!({ "spans": [input_only, output_only] });

    let counts = extract_llm_token_counts(&stages).expect("should find llm spans");
    assert_eq!(counts.0, Some(64));
    assert_eq!(counts.1, Some(256));
}
/// `backend` and `provider` are read from the `llm_inference` span's metadata.
#[test]
fn extract_llm_inference_string_attr_reads_backend_and_provider() {
    let llm_span = serde_json::json!({
        "name": "llm_inference",
        "metadata": {
            "backend": "cloud",
            "provider": "openai",
            "tokens_in": "120",
            "tokens_out": "32"
        }
    });
    let stages = serde_json::json!({
        "spans": [
            { "name": "execute:gpt-4o-mini", "metadata": {} },
            llm_span
        ]
    });

    let backend = extract_llm_inference_string_attr(&stages, "backend");
    assert_eq!(backend.as_deref(), Some("cloud"));
    let provider = extract_llm_inference_string_attr(&stages, "provider");
    assert_eq!(provider.as_deref(), Some("openai"));
}
/// Attributes that appear only on `execute:` spans are not returned.
#[test]
fn extract_llm_inference_string_attr_skips_non_llm_spans() {
    let asr_span = serde_json::json!({
        "name": "execute:asr-whisper-tiny",
        "metadata": { "backend": "ort" }
    });
    let outer_span = serde_json::json!({
        "name": "execute:gpt-4o-mini",
        "metadata": { "provider": "should-not-win" }
    });
    let stages = serde_json::json!({ "spans": [asr_span, outer_span] });

    for key in ["backend", "provider"] {
        assert!(extract_llm_inference_string_attr(&stages, key).is_none());
    }
}
/// With two `llm_inference` spans, the later span's `provider` is returned.
#[test]
fn extract_llm_inference_string_attr_prefers_last_llm_span() {
    let earlier = serde_json::json!({
        "name": "llm_inference",
        "metadata": { "backend": "cloud", "provider": "anthropic" }
    });
    let later = serde_json::json!({
        "name": "llm_inference",
        "metadata": { "backend": "cloud", "provider": "openai" }
    });
    let stages = serde_json::json!({ "spans": [earlier, later] });

    let provider = extract_llm_inference_string_attr(&stages, "provider");
    assert_eq!(
        provider.as_deref(),
        Some("openai"),
        "must take last-span provider, not first"
    );
}
/// The any-span lookup reads `backend` from an `execute:` span.
#[test]
fn extract_string_attr_from_any_span_reads_outer_execute_span() {
    let execute_span = serde_json::json!({
        "name": "execute:wav2vec2-base-960h",
        "metadata": { "backend": "ort" }
    });
    let stages = serde_json::json!({ "spans": [execute_span] });

    let backend = extract_string_attr_from_any_span(&stages, "backend");
    assert_eq!(backend.as_deref(), Some("ort"));
}
/// When two spans carry `backend`, the any-span lookup returns the later one.
#[test]
fn extract_string_attr_from_any_span_prefers_last() {
    let earlier = serde_json::json!({
        "name": "execute:gpt-4o-mini",
        "metadata": { "backend": "first" }
    });
    let later = serde_json::json!({
        "name": "llm_inference",
        "metadata": { "backend": "second" }
    });
    let stages = serde_json::json!({ "spans": [earlier, later] });

    let backend = extract_string_attr_from_any_span(&stages, "backend");
    assert_eq!(backend.as_deref(), Some("second"));
}
/// Pins the shape of a `ModelDownload` event: the top-level fields and the
/// four keys of its JSON `data` payload.
#[test]
fn build_model_download_event_has_canonical_shape() {
    let event = build_model_download_event("kokoro-82m", 1_234_567, "huggingface", 5_432);

    assert_eq!(event.event_type, "ModelDownload");
    assert_eq!(
        event.latency_ms,
        Some(5_432),
        "latency_ms must mirror duration_ms so the existing latency column lights up without a schema migration"
    );
    assert!(event.error.is_none());
    assert!(event.stage_name.is_none());
    assert!(event.target.is_none());
    assert!(event.timestamp_ms > 0, "timestamp must be populated");

    let raw = event.data.as_deref().expect("data present");
    let data: serde_json::Value = serde_json::from_str(raw).expect("data is valid JSON");
    assert_eq!(data["model_id"].as_str(), Some("kokoro-82m"));
    assert_eq!(data["bytes_downloaded"].as_u64(), Some(1_234_567));
    assert_eq!(data["source"].as_str(), Some("huggingface"));
    assert_eq!(data["duration_ms"].as_u64(), Some(5_432));
}
/// The `source` argument is copied verbatim into the event's data payload.
#[test]
fn build_model_download_event_carries_r2_source_label() {
    let event = build_model_download_event("test-model", 42, "r2", 1);
    let raw = event.data.as_deref().unwrap();
    let data: serde_json::Value = serde_json::from_str(raw).unwrap();
    let source = data["source"].as_str();
    assert_eq!(source, Some("r2"));
}
/// A `task` value on a span's metadata must surface as a top-level `task`
/// key in the platform payload.
#[test]
fn task_field_hoists_to_payload_top_level() {
    xybrid_core::tracing::init_tracing(true);

    let span = serde_json::json!({
        "name": "execute:wav2vec2-base-960h",
        "metadata": { "task": "asr", "backend": "ort" }
    });
    let data = serde_json::json!({ "spans": [span] });
    let event = TelemetryEvent {
        event_type: String::from("ModelComplete"),
        stage_name: Some(String::from("transcribe")),
        target: Some(String::from("local")),
        latency_ms: Some(420),
        error: None,
        data: Some(data.to_string()),
        timestamp_ms: 1_700_000_000_000,
    };
    let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");

    let platform = convert_to_platform_event(&event, &config, None, None, None);

    // Round-trip through a string so the check sees what would be sent.
    let payload_json = serde_json::to_string(&platform.payload).unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&payload_json).unwrap();
    let task = parsed["task"].as_str();
    assert_eq!(
        task,
        Some("asr"),
        "task must be hoisted to payload top level: {}",
        payload_json
    );
}
/// A `quantization` value on a span's metadata must surface as a top-level
/// `quantization` key in the platform payload.
#[test]
fn quantization_field_hoists_to_payload_top_level() {
    xybrid_core::tracing::init_tracing(true);

    let span = serde_json::json!({
        "name": "execute:qwen2.5-0.5b-instruct",
        "metadata": { "quantization": "q4_k_m", "backend": "llamacpp" }
    });
    let data = serde_json::json!({ "spans": [span] });
    let event = TelemetryEvent {
        event_type: String::from("ModelComplete"),
        stage_name: Some(String::from("chat")),
        target: Some(String::from("local")),
        latency_ms: Some(1_200),
        error: None,
        data: Some(data.to_string()),
        timestamp_ms: 1_700_000_000_000,
    };
    let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");

    let platform = convert_to_platform_event(&event, &config, None, None, None);

    let as_value = serde_json::to_value(&platform.payload).unwrap();
    let parsed: serde_json::Value = serde_json::from_value(as_value).unwrap();
    let quantization = parsed["quantization"].as_str();
    assert_eq!(
        quantization,
        Some("q4_k_m"),
        "quantization must be hoisted: {}",
        serde_json::to_string(&parsed).unwrap()
    );
}
/// A `streaming: true` flag in the event data must surface as a top-level
/// boolean in the platform payload.
#[test]
fn streaming_field_hoists_to_payload_top_level() {
    let data = serde_json::json!({
        "model_id": "qwen2.5-0.5b-instruct",
        "version": "1.0",
        "output_type": "Text",
        "streaming": true,
    });
    let event = TelemetryEvent {
        event_type: String::from("ModelComplete"),
        stage_name: Some(String::from("qwen2.5-0.5b-instruct")),
        target: Some(String::from("local")),
        latency_ms: Some(1_200),
        error: None,
        data: Some(data.to_string()),
        timestamp_ms: 1_700_000_000_000,
    };
    let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");

    let platform = convert_to_platform_event(&event, &config, None, None, None);

    let as_value = serde_json::to_value(&platform.payload).unwrap();
    let parsed: serde_json::Value = serde_json::from_value(as_value).unwrap();
    let streaming = parsed["streaming"].as_bool();
    assert_eq!(
        streaming,
        Some(true),
        "streaming must be hoisted to payload top level: {}",
        serde_json::to_string(&parsed).unwrap()
    );
}
/// When the event data has no `streaming` key, the payload must not gain one.
#[test]
fn streaming_field_omitted_when_data_does_not_carry_it() {
    let data = serde_json::json!({
        "model_id": "qwen2.5-0.5b-instruct",
        "version": "1.0",
        "output_type": "Text",
    });
    let event = TelemetryEvent {
        event_type: String::from("ModelComplete"),
        stage_name: Some(String::from("qwen2.5-0.5b-instruct")),
        target: Some(String::from("local")),
        latency_ms: Some(1_200),
        error: None,
        data: Some(data.to_string()),
        timestamp_ms: 1_700_000_000_000,
    };
    let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");

    let platform = convert_to_platform_event(&event, &config, None, None, None);

    let as_value = serde_json::to_value(&platform.payload).unwrap();
    let parsed: serde_json::Value = serde_json::from_value(as_value).unwrap();
    let streaming_absent = parsed.get("streaming").is_none();
    assert!(
        streaming_absent,
        "streaming must be omitted from top-level payload when absent in data: {}",
        serde_json::to_string(&parsed).unwrap()
    );
}
/// When no span metadata carries `task`, the payload must have no `task` key
/// at all.
#[test]
fn task_field_omitted_when_span_does_not_carry_it() {
    xybrid_core::tracing::init_tracing(true);

    let span = serde_json::json!({
        "name": "execute:custom-model",
        "metadata": { "backend": "ort" }
    });
    let data = serde_json::json!({ "spans": [span] });
    let event = TelemetryEvent {
        event_type: String::from("ModelComplete"),
        stage_name: Some(String::from("custom")),
        target: Some(String::from("local")),
        latency_ms: Some(10),
        error: None,
        data: Some(data.to_string()),
        timestamp_ms: 1_700_000_000_000,
    };
    let config = TelemetryConfig::new("https://ingest.example.test", "sk_test_abc");

    let platform = convert_to_platform_event(&event, &config, None, None, None);

    let as_value = serde_json::to_value(&platform.payload).unwrap();
    let parsed: serde_json::Value = serde_json::from_value(as_value).unwrap();
    let task_absent = parsed.get("task").is_none();
    assert!(
        task_absent,
        "task must be absent (not empty string) when bundle didn't declare it: {}",
        serde_json::to_string(&parsed).unwrap()
    );
}
/// Setting a custom device attribute while opted out of hardware detection
/// must still leave the exporter's `device_id` unset.
#[test]
fn opt_out_with_explicit_attribute_suppresses_device_id() {
    let exporter = HttpTelemetryExporter::new(
        TelemetryConfig::new("http://example.invalid", "k")
            .with_auto_hardware_detection(false)
            .with_device_attribute("tailnet", "production"),
    );

    let device_id = &exporter.config.device_id;
    assert!(
        device_id.is_none(),
        "opt-out + explicit attribute must suppress auto-wired device_id, got: {:?}",
        exporter.config.device_id
    );
}
/// A device id supplied through `with_device` is kept by the exporter even
/// when hardware detection is turned off.
#[test]
fn opt_out_with_explicit_with_device_preserves_id() {
    let exporter = HttpTelemetryExporter::new(
        TelemetryConfig::new("http://example.invalid", "k")
            .with_auto_hardware_detection(false)
            .with_device("caller-supplied-id", "linux"),
    );

    let device_id = exporter.config.device_id.as_deref();
    assert_eq!(
        device_id,
        Some("caller-supplied-id"),
        "explicit with_device must survive opt-out clear"
    );
}
/// `with_hardware` switches auto-detection off and the resolved profile
/// contains only the supplied fields; `os` and `arch` stay `None`.
#[test]
fn with_hardware_disables_auto_detection() {
    let supplied = DeviceProfile {
        chip_family: Some("supplied-chip".into()),
        ram_gb: Some(16),
        ..Default::default()
    };
    let config = TelemetryConfig::new("http://example.invalid", "k").with_hardware(supplied);

    let detection_enabled = config.auto_hardware_detection;
    assert!(
        !detection_enabled,
        "with_hardware must disable auto-detection"
    );

    let resolved = resolve_device_profile(&config).expect("profile present");
    assert_eq!(resolved.chip_family.as_deref(), Some("supplied-chip"));
    assert_eq!(resolved.ram_gb, Some(16));
    assert!(resolved.os.is_none(), "os must stay None (opt-out honored)");
    assert!(
        resolved.arch.is_none(),
        "arch must stay None (opt-out honored)"
    );
}
/// With detection off, an explicit custom attribute still yields a profile,
/// one that holds the attribute but no chip family or RAM figure.
#[test]
fn opt_out_with_explicit_attribute_still_emits_device() {
    let config = TelemetryConfig::new("http://example.invalid", "k")
        .with_auto_hardware_detection(false)
        .with_device_attribute("tailnet", "production");

    let profile =
        resolve_device_profile(&config).expect("explicit attribute must surface a profile");

    assert!(profile.chip_family.is_none());
    assert!(profile.ram_gb.is_none());
    let tailnet = profile.custom.get("tailnet").map(String::as_str);
    assert_eq!(tailnet, Some("production"));
}
}