use crate::diagnostics;
use crate::events::{FleetEvent, FleetEventBuffer};
use crate::machine_info::MachineInfo;
use crate::metrics;
use crate::otlp::OtlpMetricBridge;
use crate::panic_handler;
use crate::session;
use bevy::diagnostic::DiagnosticsStore;
use bevy::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// One batch of telemetry handed to the OTLP exporter.
///
/// A payload is assembled by `publish_telemetry` each time the publish
/// interval elapses. Field order is left untouched because the struct is
/// (de)serialized through serde.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TelemetryPayload {
/// Application identifier, copied from the fleet configuration's `app_id`.
pub app_id: String,
/// Wall-clock time at which the payload was built, in whole seconds since
/// the UNIX epoch.
pub timestamp: u64,
/// Session identifier, copied from the session info resource.
pub session_id: String,
/// Session start time, copied from the session info resource.
pub session_start_time: u64,
/// Clone of the session statistics taken right after they were updated
/// with the item counts of this payload.
pub session_stats: session::SessionStats,
/// Clone of the `MachineInfo` resource, or `None` when that resource is
/// not present in the world.
pub machine_info: Option<MachineInfo>,
/// Metrics produced by `metrics::extract_metrics_from_diagnostics`.
pub metrics: Vec<crate::metrics::FleetMetric>,
/// Events taken out of the `FleetEventBuffer` for this payload.
pub events: Vec<FleetEvent>,
/// Diagnostics produced by `diagnostics::extract_diagnostics`.
pub diagnostics: Vec<crate::diagnostics::FleetDiagnostic>,
/// Panic records returned by `panic_handler::get_panics`.
pub panics: Vec<crate::panic_handler::PanicInfo>,
}
/// Lock-free counters describing the publisher's activity.
///
/// The counters are shared (behind an `Arc`) between the code that exports
/// payloads and the systems that read them for backoff decisions and
/// periodic logging. All of them start at zero.
#[derive(Debug, Default)]
pub struct PublisherStats {
    /// Number of payloads whose export succeeded.
    pub total_sent: AtomicUsize,
    /// Number of payloads whose export returned an error.
    pub total_failed: AtomicUsize,
    /// Number of publish attempts abandoned because the queue was full.
    pub total_dropped: AtomicUsize,
    /// Export failures since the last success; reset to zero on success.
    pub consecutive_failures: AtomicUsize,
}
/// Resource tracking when telemetry was last published and how strongly the
/// publish interval is currently being stretched after repeated failures.
#[derive(Resource)]
pub struct PublishTimer {
/// Instant of the most recent publish attempt (it is also refreshed when
/// the attempt ends up dropping the payload).
last_publish: std::time::Instant,
/// Factor applied to the base publish interval; `1` means no backoff.
backoff_multiplier: u32,
/// Highest backoff stage that has already been applied, `0` when healthy.
failure_stage: u32,
}
impl Default for PublishTimer {
    /// Builds a timer that treats "now" as the last publish time and has no
    /// backoff applied.
    fn default() -> Self {
        let created_at = std::time::Instant::now();
        Self {
            failure_stage: 0,
            backoff_multiplier: 1,
            last_publish: created_at,
        }
    }
}
impl PublishTimer {
    /// Returns the effective publish interval: `base_interval` scaled by the
    /// current backoff multiplier, saturating instead of overflowing.
    fn get_next_interval(&self, base_interval: u64) -> u64 {
        let factor = u64::from(self.backoff_multiplier);
        base_interval.saturating_mul(factor)
    }

    /// Clears any backoff, logging only when a backoff was actually active.
    fn record_success(&mut self) {
        let was_backing_off = self.backoff_multiplier > 1;
        if was_backing_off {
            info!("Resetting publish backoff");
        }
        self.failure_stage = 0;
        self.backoff_multiplier = 1;
    }

    /// Escalates the backoff according to the number of consecutive failures.
    ///
    /// Nothing happens below three failures. From there every further group
    /// of three failures forms a new stage (3..=5 is stage 1, 6..=8 is stage
    /// 2, and so on); each stage doubles the multiplier, capped at 16x. A
    /// stage that has already been applied is never applied twice.
    fn apply_failure_backoff(&mut self, consecutive_failures: u32) {
        // `checked_sub` yields `None` exactly when fewer than 3 failures occurred.
        let Some(failures_past_threshold) = consecutive_failures.checked_sub(3) else {
            return;
        };

        let stage = failures_past_threshold / 3 + 1;
        if self.failure_stage >= stage {
            return;
        }
        self.failure_stage = stage;

        let desired_multiplier = (1u32 << stage.min(4)).min(16);
        if desired_multiplier <= self.backoff_multiplier {
            return;
        }

        self.backoff_multiplier = desired_multiplier;
        warn!(
            "Increasing publish backoff to {}x ({} consecutive failures)",
            self.backoff_multiplier, consecutive_failures
        );
    }
}
#[cfg(not(target_arch = "wasm32"))]
mod native {
use super::*;
use crate::config::FleetConfig;
use tokio::sync::mpsc;
/// Native handle used to queue telemetry payloads for the background
/// publisher thread started by `setup_publisher`.
#[derive(Resource, Clone)]
pub struct TelemetrySender {
/// Producer side of the bounded channel drained by the publisher thread.
pub(crate) sender: mpsc::Sender<TelemetryPayload>,
/// Number of payloads currently being exported by the publisher thread.
pub(crate) in_flight_counter: Arc<AtomicUsize>,
/// Counters shared with the publisher thread.
pub(crate) stats: Arc<PublisherStats>,
/// Handle to the OTLP bridge; not read anywhere in this module.
/// NOTE(review): presumably kept so the bridge lives as long as the
/// resource does - confirm.
pub(crate) _otlp_bridge: Arc<OtlpMetricBridge>,
}
impl TelemetrySender {
    /// Queues `payload` for the publisher thread, blocking the calling
    /// thread until the channel accepts it.
    ///
    /// # Errors
    ///
    /// Returns the channel's send error rendered as a string when the
    /// payload could not be handed over.
    // Not called from within this module; the lint is silenced so the method
    // stays available alongside the wasm `send_now`.
    #[allow(dead_code)]
    pub fn send_now(&self, payload: TelemetryPayload) -> Result<(), String> {
        match self.sender.blocking_send(payload) {
            Ok(()) => Ok(()),
            Err(send_err) => Err(send_err.to_string()),
        }
    }
}
pub fn setup_publisher(mut commands: Commands, config: Res<FleetConfig>) {
let otlp_bridge = match OtlpMetricBridge::new(&config) {
Ok(bridge) => Arc::new(bridge),
Err(err) => {
error!("Failed to initialise OTLP exporter: {err:?}");
return;
}
};
let (sender, mut receiver) = mpsc::channel::<TelemetryPayload>(config.max_queue_size);
let in_flight_counter = Arc::new(AtomicUsize::new(0));
let stats = Arc::new(PublisherStats::default());
let in_flight_clone = in_flight_counter.clone();
let stats_clone = stats.clone();
let exporter = otlp_bridge.clone();
std::thread::spawn(move || {
while let Some(payload) = receiver.blocking_recv() {
in_flight_clone.fetch_add(1, Ordering::Relaxed);
let result = exporter.export(&payload);
in_flight_clone.fetch_sub(1, Ordering::Relaxed);
match result {
Ok(_) => {
stats_clone.total_sent.fetch_add(1, Ordering::Relaxed);
stats_clone.consecutive_failures.store(0, Ordering::Relaxed);
debug!("Successfully published telemetry via OTLP");
}
Err(err) => {
stats_clone.total_failed.fetch_add(1, Ordering::Relaxed);
stats_clone
.consecutive_failures
.fetch_add(1, Ordering::Relaxed);
warn!("Failed to publish telemetry via OTLP: {err:?}");
}
}
}
});
commands.insert_resource(TelemetrySender {
sender,
in_flight_counter,
stats,
_otlp_bridge: otlp_bridge,
});
commands.insert_resource(PublishTimer::default());
}
pub fn publish_telemetry(
config: Res<FleetConfig>,
mut timer: ResMut<PublishTimer>,
sender: Option<Res<TelemetrySender>>,
machine_info: Option<Res<MachineInfo>>,
mut session_info: ResMut<session::SessionInfo>,
diagnostics_store: Res<DiagnosticsStore>,
mut event_buffer: ResMut<FleetEventBuffer>,
) {
let Some(sender_res) = sender else {
warn!("TelemetrySender not found");
return;
};
let consecutive_failures = sender_res
.stats
.consecutive_failures
.load(Ordering::Relaxed) as u32;
if consecutive_failures == 0 {
timer.record_success();
} else {
timer.apply_failure_backoff(consecutive_failures);
}
let interval = timer.get_next_interval(config.publish_interval_secs);
let elapsed = timer.last_publish.elapsed().as_secs();
if elapsed < interval {
return;
}
timer.last_publish = std::time::Instant::now();
if sender_res.sender.capacity() == 0 {
let dropped = sender_res
.stats
.total_dropped
.fetch_add(1, Ordering::Relaxed);
warn!(
"Telemetry queue full, dropping payload (total dropped: {})",
dropped + 1
);
return;
}
let metrics = metrics::extract_metrics_from_diagnostics(&diagnostics_store);
let events_to_send = event_buffer.take();
let diagnostics = diagnostics::extract_diagnostics(&diagnostics_store);
let panics = panic_handler::get_panics();
session_info.update_stats(
metrics.len(),
events_to_send.len(),
diagnostics.len(),
panics.len(),
);
let payload = TelemetryPayload {
app_id: config.app_id.clone(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
session_id: session_info.session_id.clone(),
session_start_time: session_info.session_start_time,
session_stats: session_info.stats.clone(),
machine_info: machine_info.map(|info| (*info).clone()),
metrics,
events: events_to_send,
diagnostics,
panics,
};
match sender_res.sender.try_send(payload) {
Ok(_) => {
debug!("Telemetry queued for publishing");
}
Err(mpsc::error::TrySendError::Full(_)) => {
let dropped = sender_res
.stats
.total_dropped
.fetch_add(1, Ordering::Relaxed);
warn!(
"Telemetry queue full, dropping payload (total dropped: {})",
dropped + 1
);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
error!("Telemetry channel closed");
}
}
}
/// System that writes a summary of the publisher counters to the log at most
/// once every 300 seconds.
///
/// The first run only records the current time; nothing is logged until 300
/// seconds have passed since then. Does nothing when no [`TelemetrySender`]
/// resource exists.
pub fn log_publisher_stats(
    sender: Option<Res<TelemetrySender>>,
    mut last_log: Local<Option<std::time::Instant>>,
) {
    let Some(sender) = sender else {
        return;
    };

    // Seed the timestamp on first use so the first report is also delayed.
    let previous_log = *last_log.get_or_insert_with(std::time::Instant::now);
    if previous_log.elapsed() < std::time::Duration::from_secs(300) {
        return;
    }
    *last_log = Some(std::time::Instant::now());

    let counters = &sender.stats;
    let (sent, failed, dropped) = (
        counters.total_sent.load(Ordering::Relaxed),
        counters.total_failed.load(Ordering::Relaxed),
        counters.total_dropped.load(Ordering::Relaxed),
    );
    let in_flight = sender.in_flight_counter.load(Ordering::Relaxed);

    info!(
        "Fleet telemetry stats - Sent: {}, Failed: {}, Dropped: {}, In-flight: {}",
        sent, failed, dropped, in_flight
    );
}
}
#[cfg(target_arch = "wasm32")]
mod web {
use super::*;
use crate::config::FleetConfig;
/// wasm32 handle that exports telemetry payloads directly through the OTLP
/// bridge; no background thread or queue is involved.
#[derive(Resource, Clone)]
pub struct TelemetrySender {
/// Bridge used by `send_now` to export each payload.
exporter: Arc<OtlpMetricBridge>,
/// Success/failure counters updated by `send_now`.
pub(crate) stats: Arc<PublisherStats>,
/// Read by `log_publisher_stats`; never modified in this module, so it
/// stays at its initial value of zero.
pub(crate) in_flight_counter: Arc<AtomicUsize>,
}
impl TelemetrySender {
    /// Exports `payload` through the OTLP bridge and records the outcome in
    /// the shared counters.
    ///
    /// # Errors
    ///
    /// Returns the exporter's error rendered as a string; the failure is
    /// also counted in `total_failed` and `consecutive_failures`.
    pub fn send_now(&self, payload: TelemetryPayload) -> Result<(), String> {
        let counters = &self.stats;

        if let Err(err) = self.exporter.export(&payload) {
            counters.total_failed.fetch_add(1, Ordering::Relaxed);
            counters.consecutive_failures.fetch_add(1, Ordering::Relaxed);
            return Err(err.to_string());
        }

        counters.total_sent.fetch_add(1, Ordering::Relaxed);
        // A success ends any running failure streak.
        counters.consecutive_failures.store(0, Ordering::Relaxed);
        Ok(())
    }
}
/// Startup system that creates the OTLP bridge and inserts the wasm32
/// [`TelemetrySender`] together with a fresh [`PublishTimer`].
///
/// When the bridge cannot be created the error is logged and no resource is
/// inserted.
pub fn setup_publisher(mut commands: Commands, config: Res<FleetConfig>) {
    let bridge = match OtlpMetricBridge::new(&config) {
        Ok(bridge) => bridge,
        Err(err) => {
            error!("Failed to initialise OTLP exporter: {err:?}");
            return;
        }
    };

    let telemetry_sender = TelemetrySender {
        exporter: Arc::new(bridge),
        stats: Arc::new(PublisherStats::default()),
        in_flight_counter: Arc::new(AtomicUsize::new(0)),
    };

    commands.insert_resource(telemetry_sender);
    commands.insert_resource(PublishTimer::default());
}
pub fn publish_telemetry(
config: Res<FleetConfig>,
mut timer: ResMut<PublishTimer>,
sender: Option<Res<TelemetrySender>>,
machine_info: Option<Res<MachineInfo>>,
mut session_info: ResMut<session::SessionInfo>,
diagnostics_store: Res<DiagnosticsStore>,
mut event_buffer: ResMut<FleetEventBuffer>,
) {
let Some(sender_res) = sender else {
warn!("TelemetrySender not found");
return;
};
let consecutive_failures = sender_res
.stats
.consecutive_failures
.load(Ordering::Relaxed) as u32;
if consecutive_failures == 0 {
timer.record_success();
} else {
timer.apply_failure_backoff(consecutive_failures);
}
let interval = timer.get_next_interval(config.publish_interval_secs);
let elapsed = timer.last_publish.elapsed().as_secs();
if elapsed < interval {
return;
}
timer.last_publish = std::time::Instant::now();
let metrics = metrics::extract_metrics_from_diagnostics(&diagnostics_store);
let events_to_send = event_buffer.take();
let diagnostics = diagnostics::extract_diagnostics(&diagnostics_store);
let panics = panic_handler::get_panics();
session_info.update_stats(
metrics.len(),
events_to_send.len(),
diagnostics.len(),
panics.len(),
);
let payload = TelemetryPayload {
app_id: config.app_id.clone(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
session_id: session_info.session_id.clone(),
session_start_time: session_info.session_start_time,
session_stats: session_info.stats.clone(),
machine_info: machine_info.map(|info| (*info).clone()),
metrics,
events: events_to_send,
diagnostics,
panics,
};
if let Err(err) = sender_res.send_now(payload) {
warn!("Failed to publish telemetry via Fetch: {err}");
} else {
debug!("Telemetry dispatched via Fetch");
}
}
/// System that writes a summary of the publisher counters to the log at most
/// once every 300 seconds.
///
/// The first run only records the current time; nothing is logged until 300
/// seconds have passed since then. Does nothing when no [`TelemetrySender`]
/// resource exists.
pub fn log_publisher_stats(
    sender: Option<Res<TelemetrySender>>,
    mut last_log: Local<Option<std::time::Instant>>,
) {
    let Some(sender) = sender else {
        return;
    };

    // Seed the timestamp on first use so the first report is also delayed.
    let previous_log = *last_log.get_or_insert_with(std::time::Instant::now);
    let report_period = std::time::Duration::from_secs(300);
    if previous_log.elapsed() < report_period {
        return;
    }
    *last_log = Some(std::time::Instant::now());

    let counters = &sender.stats;
    let sent = counters.total_sent.load(Ordering::Relaxed);
    let failed = counters.total_failed.load(Ordering::Relaxed);
    let dropped = counters.total_dropped.load(Ordering::Relaxed);
    let in_flight = sender.in_flight_counter.load(Ordering::Relaxed);

    info!(
        "Fleet telemetry stats - Sent: {}, Failed: {}, Dropped: {}, In-flight: {}",
        sent, failed, dropped, in_flight
    );
}
}
#[cfg(not(target_arch = "wasm32"))]
pub use native::{TelemetrySender, log_publisher_stats, publish_telemetry, setup_publisher};
#[cfg(target_arch = "wasm32")]
pub use web::{TelemetrySender, log_publisher_stats, publish_telemetry, setup_publisher};