use log::{debug, warn};
use protobuf::Message;
use serde_json::{json, Value};
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::{Rc, Weak};
use videocall_diagnostics::{subscribe, DiagEvent, MetricValue};
use videocall_types::protos::health_packet::{
HealthPacket as PbHealthPacket, NetEqNetwork as PbNetEqNetwork,
NetEqOperationCounters as PbNetEqOperationCounters, NetEqStats as PbNetEqStats,
PeerStats as PbPeerStats, VideoStats as PbVideoStats,
};
use videocall_types::protos::packet_wrapper::packet_wrapper::PacketType;
use videocall_types::protos::packet_wrapper::PacketWrapper;
use wasm_bindgen_futures::spawn_local;
use web_time::{SystemTime, UNIX_EPOCH};
use yew::prelude::Callback;
/// Health state tracked for one remote peer.
///
/// Records are stored in the reporter's map keyed by the same id that is
/// kept in `peer_id`.
#[derive(Debug, Clone)]
pub struct PeerHealthData {
/// Identifier of the peer this record describes.
pub peer_id: String,
/// Most recent NetEQ statistics JSON stored via `update_audio_stats`;
/// `None` until the first update.
pub last_neteq_stats: Option<Value>,
/// Most recent video statistics JSON stored via `update_video_stats`;
/// `None` until the first update.
pub last_video_stats: Option<Value>,
/// Audio health flag: set by `update_audio_stats`, cleared by
/// `mark_audio_timeout`.
pub can_listen: bool,
/// Video health flag: set by `update_video_stats`, cleared by
/// `mark_video_timeout`.
pub can_see: bool,
/// Milliseconds since the Unix epoch of the last stats update
/// (`0` for a freshly created record).
pub last_update_ms: u64,
}
impl PeerHealthData {
pub fn new(peer_id: String) -> Self {
Self {
peer_id,
last_neteq_stats: None,
last_video_stats: None,
can_listen: false,
can_see: false,
last_update_ms: 0,
}
}
pub fn update_audio_stats(&mut self, neteq_stats: Value) {
self.last_neteq_stats = Some(neteq_stats);
self.can_listen = true; self.last_update_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
}
pub fn update_video_stats(&mut self, video_stats: Value) {
self.last_video_stats = Some(video_stats);
self.can_see = true; self.last_update_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
}
pub fn mark_audio_timeout(&mut self) {
self.can_listen = false;
}
pub fn mark_video_timeout(&mut self) {
self.can_see = false;
}
}
/// Collects per-peer health information from diagnostics events and
/// periodically emits it as health packets through a callback.
///
/// The shared fields are `Rc<RefCell<..>>` because the spawned tasks hold
/// `Weak` references to them and stop once the reporter has been dropped.
#[derive(Debug)]
pub struct HealthReporter {
// Session identifier copied into every health packet.
session_id: String,
// `meeting_id` is empty until `set_meeting_id` is called; `reporting_peer`
// is written into the packet and also used as the wrapper's `email`.
meeting_id: String, reporting_peer: String,
// Per-peer health records, keyed by peer id.
peer_health_data: Rc<RefCell<HashMap<String, PeerHealthData>>>,
// Sink for built packets; reporting refuses to start while this is `None`.
send_packet_callback: Option<Callback<PacketWrapper>>,
// Delay between two health reports, in milliseconds.
health_interval_ms: u64,
// Local audio/video enabled flags, set through the setters or by
// "sender" diagnostics events.
reporting_audio_enabled: Rc<RefCell<bool>>,
reporting_video_enabled: Rc<RefCell<bool>>,
// Active server details taken from "connection_manager" diagnostics
// events; `None` until such an event has been seen.
active_server_url: Rc<RefCell<Option<String>>>,
active_server_type: Rc<RefCell<Option<String>>>,
active_server_rtt_ms: Rc<RefCell<Option<f64>>>,
}
impl HealthReporter {
pub fn new(session_id: String, reporting_peer: String, health_interval_ms: u64) -> Self {
Self {
session_id,
meeting_id: "".to_string(), reporting_peer,
peer_health_data: Rc::new(RefCell::new(HashMap::new())),
send_packet_callback: None,
health_interval_ms,
reporting_audio_enabled: Rc::new(RefCell::new(false)),
reporting_video_enabled: Rc::new(RefCell::new(false)),
active_server_url: Rc::new(RefCell::new(None)),
active_server_type: Rc::new(RefCell::new(None)),
active_server_rtt_ms: Rc::new(RefCell::new(None)),
}
}
/// Sets the meeting id written into health packets.
///
/// `start_health_reporting` clones the meeting id when it is called, so a
/// reporting task that is already running keeps the value it started with.
pub fn set_meeting_id(&mut self, meeting_id: String) {
self.meeting_id = meeting_id;
}
/// Records whether the local peer currently has audio enabled.
///
/// Best effort: if the flag is borrowed elsewhere at this moment the
/// update is silently skipped.
pub fn set_reporting_audio_enabled(&self, enabled: bool) {
    let mut flag = match self.reporting_audio_enabled.try_borrow_mut() {
        Ok(guard) => guard,
        Err(_) => return,
    };
    *flag = enabled;
}
/// Records whether the local peer currently has video enabled.
///
/// Best effort: if the flag is borrowed elsewhere at this moment the
/// update is silently skipped.
pub fn set_reporting_video_enabled(&self, enabled: bool) {
    let mut flag = match self.reporting_video_enabled.try_borrow_mut() {
        Ok(guard) => guard,
        Err(_) => return,
    };
    *flag = enabled;
}
/// Installs the callback used to emit health packets.
///
/// Must be set before `start_health_reporting`, which refuses to start
/// without it and clones the callback into its task when called.
pub fn set_send_packet_callback(&mut self, callback: Callback<PacketWrapper>) {
self.send_packet_callback = Some(callback);
}
/// Sets the delay between health reports, in milliseconds.
///
/// `start_health_reporting` copies the interval when it is called, so a
/// reporting task that is already running keeps its original interval.
pub fn set_health_interval(&mut self, interval_ms: u64) {
self.health_interval_ms = interval_ms;
}
/// Spawns a local task that consumes diagnostics events and folds them into
/// the reporter's state.
///
/// "sender" events update the local audio/video enabled flags,
/// "connection_manager" events update the active-server details, and every
/// event is then handed to `process_diagnostics_event`. The task only holds
/// `Weak` references; it stops when the receiver returns an error or when an
/// event arrives after the reporter has been dropped.
pub fn start_diagnostics_subscription(&self) {
    let weak_health = Rc::downgrade(&self.peer_health_data);
    let weak_audio = Rc::downgrade(&self.reporting_audio_enabled);
    let weak_video = Rc::downgrade(&self.reporting_video_enabled);
    let weak_url = Rc::downgrade(&self.active_server_url);
    let weak_type = Rc::downgrade(&self.active_server_type);
    let weak_rtt = Rc::downgrade(&self.active_server_rtt_ms);

    spawn_local(async move {
        debug!("Started health diagnostics subscription");
        let mut receiver = subscribe();

        while let Ok(event) = receiver.recv().await {
            // The health map going away means the reporter itself is gone.
            let health = match weak_health.upgrade() {
                Some(rc) => rc,
                None => {
                    debug!("HealthReporter dropped, stopping diagnostics subscription");
                    break;
                }
            };

            if event.subsystem == "sender" {
                // Local media state as announced by the sender.
                if let (Some(ae), Some(ve)) = (weak_audio.upgrade(), weak_video.upgrade()) {
                    for m in &event.metrics {
                        match (m.name, &m.value) {
                            ("sender_audio_enabled", MetricValue::U64(v)) => {
                                *ae.borrow_mut() = *v > 0;
                            }
                            ("sender_video_enabled", MetricValue::U64(v)) => {
                                *ve.borrow_mut() = *v > 0;
                            }
                            _ => {}
                        }
                    }
                }
            } else if event.subsystem == "connection_manager" {
                // Details of the server the client is currently using.
                if let (Some(url_rc), Some(typ_rc), Some(rtt_rc)) =
                    (weak_url.upgrade(), weak_type.upgrade(), weak_rtt.upgrade())
                {
                    for m in &event.metrics {
                        match (m.name, &m.value) {
                            ("active_server_url", MetricValue::Text(v)) => {
                                *url_rc.borrow_mut() = Some(v.clone());
                            }
                            ("active_server_type", MetricValue::Text(v)) => {
                                *typ_rc.borrow_mut() = Some(v.clone());
                            }
                            ("active_server_rtt", MetricValue::F64(v)) => {
                                *rtt_rc.borrow_mut() = Some(*v);
                            }
                            _ => {}
                        }
                    }
                }
            }

            Self::process_diagnostics_event(event, &health);
        }
    });
}
/// Applies one diagnostics event to the per-peer health map.
///
/// The peers involved are taken from the `from_peer` / `to_peer` text
/// metrics; whichever is missing is filled from a `stream_id` of the exact
/// form `"<from>-><to>"`, and finally defaults to `"unknown"`. The record of
/// the *target* peer is then updated according to `event.subsystem`:
///
/// * `"neteq"` – stores `stats_json` and derives `can_listen` from
///   `audio_buffer_ms`.
/// * `"decoder"` – derives `can_see` from `fps` and sets the matching flag
///   for `media_type`.
/// * `"peer_status"` – copies `audio_enabled` / `video_enabled` into
///   `can_listen` / `can_see`.
/// * `"video_decoder"` / `"video"` – merges the metrics into the stored
///   video stats.
/// * `"sender"` – only logged.
///
/// Other subsystems are ignored. If the map is already borrowed the event is
/// dropped silently (best effort).
fn process_diagnostics_event(
    event: DiagEvent,
    peer_health_data: &Rc<RefCell<HashMap<String, PeerHealthData>>>,
) {
    // Returns the record for `peer`, creating an empty one on first sight.
    fn peer_entry<'a>(
        map: &'a mut HashMap<String, PeerHealthData>,
        peer: &str,
    ) -> &'a mut PeerHealthData {
        map.entry(peer.to_string())
            .or_insert_with(|| PeerHealthData::new(peer.to_string()))
    }

    let mut reporting_peer: Option<String> = None;
    let mut target_peer: Option<String> = None;
    for metric in &event.metrics {
        match (metric.name, &metric.value) {
            ("from_peer", MetricValue::Text(s)) => reporting_peer = Some(s.clone()),
            ("to_peer", MetricValue::Text(s)) => target_peer = Some(s.clone()),
            _ => {}
        }
    }

    // Fall back to the stream id, which must split into exactly two parts.
    if reporting_peer.is_none() || target_peer.is_none() {
        if let Some(sid) = &event.stream_id {
            let mut parts = sid.split("->");
            if let (Some(from), Some(to), None) = (parts.next(), parts.next(), parts.next()) {
                reporting_peer.get_or_insert_with(|| from.to_string());
                target_peer.get_or_insert_with(|| to.to_string());
            }
        }
    }

    // NOTE(review): events without any peer information are filed under the
    // literal key "unknown" and end up in health packets — confirm intended.
    let reporting_peer = reporting_peer.unwrap_or_else(|| "unknown".to_string());
    let target_peer = target_peer.unwrap_or_else(|| "unknown".to_string());

    if event.subsystem == "neteq" {
        if let Ok(mut health_map) = peer_health_data.try_borrow_mut() {
            let peer_data = peer_entry(&mut health_map, &target_peer);
            for metric in &event.metrics {
                match (metric.name, &metric.value) {
                    ("stats_json", MetricValue::Text(json_str)) => {
                        // Unparseable JSON is ignored.
                        if let Ok(neteq_json) = serde_json::from_str::<Value>(json_str) {
                            // Also marks the peer as audible.
                            peer_data.update_audio_stats(neteq_json);
                            debug!(
                                "Updated NetEQ stats for peer: {target_peer} (from {reporting_peer})"
                            );
                        }
                    }
                    ("audio_buffer_ms", MetricValue::U64(buffer_ms)) => {
                        peer_data.can_listen = *buffer_ms > 0;
                        debug!(
                            "Updated audio health (buffer: {buffer_ms}ms) for peer: {target_peer} (from {reporting_peer})"
                        );
                    }
                    ("packets_awaiting_decode", MetricValue::U64(packets)) => {
                        debug!(
                            "Updated packets awaiting decode: {packets} for peer: {target_peer} (from {reporting_peer})"
                        );
                    }
                    _ => {}
                }
            }
        }
    } else if event.subsystem == "decoder" {
        if let Ok(mut health_map) = peer_health_data.try_borrow_mut() {
            let peer_data = peer_entry(&mut health_map, &target_peer);
            for metric in &event.metrics {
                match (metric.name, &metric.value) {
                    ("fps", MetricValue::F64(fps)) => {
                        peer_data.can_see = *fps > 0.0;
                        debug!(
                            "Updated video health (FPS: {fps:.2}) for peer: {target_peer} (from {reporting_peer})"
                        );
                    }
                    ("media_type", MetricValue::Text(media_type)) => {
                        if media_type.contains("AUDIO") {
                            peer_data.can_listen = true;
                        } else if media_type.contains("VIDEO") || media_type.contains("SCREEN") {
                            peer_data.can_see = true;
                        }
                        debug!(
                            "Updated media health ({media_type}) for peer: {target_peer} (from {reporting_peer})"
                        );
                    }
                    _ => {}
                }
            }
        }
    } else if event.subsystem == "sender" {
        debug!(
            "Received sender event for peer: {} at {}",
            target_peer, event.ts_ms
        );
    } else if event.subsystem == "peer_status" {
        if let Ok(mut health_map) = peer_health_data.try_borrow_mut() {
            let peer_data = peer_entry(&mut health_map, &target_peer);
            for metric in &event.metrics {
                match (metric.name, &metric.value) {
                    ("audio_enabled", MetricValue::U64(v)) => peer_data.can_listen = *v > 0,
                    ("video_enabled", MetricValue::U64(v)) => peer_data.can_see = *v > 0,
                    _ => {}
                }
            }
        }
    } else if event.subsystem == "video_decoder" || event.subsystem == "video" {
        if let Ok(mut health_map) = peer_health_data.try_borrow_mut() {
            let peer_data = peer_entry(&mut health_map, &target_peer);

            // Start from the previously stored object so metrics missing from
            // this event keep their last known value.
            let mut video_stats = match &peer_data.last_video_stats {
                Some(Value::Object(map)) => Value::Object(map.clone()),
                _ => json!({}),
            };
            video_stats["timestamp_ms"] = json!(event.ts_ms);

            // Visibility verdict derived from `fps_received`, if reported.
            let mut visible_by_fps: Option<bool> = None;
            for metric in &event.metrics {
                match (metric.name, &metric.value) {
                    ("fps_received", MetricValue::F64(fps)) => {
                        video_stats["fps_received"] = json!(fps);
                        visible_by_fps = Some(*fps > 0.0);
                    }
                    ("frames_buffered" | "packets_buffered", MetricValue::U64(v)) => {
                        video_stats["frames_buffered"] = json!(v);
                    }
                    ("frames_buffered" | "packets_buffered", MetricValue::F64(v)) => {
                        video_stats["frames_buffered"] = json!(v);
                    }
                    ("frames_decoded", MetricValue::U64(frames)) => {
                        video_stats["frames_decoded"] = json!(frames);
                    }
                    ("bitrate_kbps", MetricValue::U64(bitrate)) => {
                        video_stats["bitrate_kbps"] = json!(bitrate);
                    }
                    _ => {}
                }
            }

            peer_data.update_video_stats(video_stats);
            // `update_video_stats` unconditionally marks the peer as visible,
            // so the fps verdict has to be applied afterwards; otherwise a
            // report of 0 fps could never clear `can_see`.
            if let Some(visible) = visible_by_fps {
                peer_data.can_see = visible;
            }
            debug!("Updated video health for peer: {target_peer}");
        }
    }
}
/// Spawns a local task that emits a health packet through the send callback
/// once per reporting interval.
///
/// Logs a warning and does nothing when no send callback has been set. The
/// session id, meeting id, reporting peer, callback and interval are
/// captured when this method is called; later setter calls do not affect a
/// task that is already running. The task only holds `Weak` references to
/// the shared state and stops at the first tick after the reporter has been
/// dropped. A tick is skipped when the health map is currently borrowed
/// mutably or when there is nothing to report.
pub fn start_health_reporting(&self) {
    let send_callback = match &self.send_packet_callback {
        Some(callback) => callback.clone(),
        None => {
            warn!("Cannot start health reporting: no send packet callback set");
            return;
        }
    };

    let peer_health_data = Rc::downgrade(&self.peer_health_data);
    let session_id = self.session_id.clone();
    let meeting_id = self.meeting_id.clone();
    let reporting_peer = self.reporting_peer.clone();
    let interval_ms = self.health_interval_ms;
    // The timer takes a `u32`; clamp instead of truncating so an oversized
    // interval cannot wrap around to a tiny (or zero) delay.
    let timeout_ms = interval_ms.min(u64::from(u32::MAX)) as u32;
    let audio_enabled = Rc::downgrade(&self.reporting_audio_enabled);
    let video_enabled = Rc::downgrade(&self.reporting_video_enabled);
    let active_server_url = Rc::downgrade(&self.active_server_url);
    let active_server_type = Rc::downgrade(&self.active_server_type);
    let active_server_rtt_ms = Rc::downgrade(&self.active_server_rtt_ms);

    spawn_local(async move {
        debug!("Started health reporting with interval: {interval_ms}ms");
        loop {
            gloo_timers::future::TimeoutFuture::new(timeout_ms).await;

            let health_rc = match peer_health_data.upgrade() {
                Some(rc) => rc,
                None => {
                    debug!("HealthReporter dropped, stopping health reporting");
                    break;
                }
            };

            // Build the packet inside its own scope so the borrow of the
            // health map is released before the callback runs; code reached
            // synchronously from the callback (e.g. `remove_peer`) can then
            // still borrow the map mutably.
            let packet = {
                let health_map = match health_rc.try_borrow() {
                    Ok(map) => map,
                    Err(_) => continue,
                };
                let self_audio_enabled = audio_enabled
                    .upgrade()
                    .and_then(|ae| ae.try_borrow().ok().map(|v| *v))
                    .unwrap_or(false);
                let self_video_enabled = video_enabled
                    .upgrade()
                    .and_then(|ve| ve.try_borrow().ok().map(|v| *v))
                    .unwrap_or(false);
                let active_url = active_server_url
                    .upgrade()
                    .and_then(|rc| rc.try_borrow().ok().and_then(|v| v.clone()));
                let active_type = active_server_type
                    .upgrade()
                    .and_then(|rc| rc.try_borrow().ok().and_then(|v| v.clone()));
                let active_rtt = active_server_rtt_ms
                    .upgrade()
                    .and_then(|rc| rc.try_borrow().ok().and_then(|v| *v));

                let built = Self::create_health_packet(
                    &session_id,
                    &meeting_id,
                    &reporting_peer,
                    &health_map,
                    self_audio_enabled,
                    self_video_enabled,
                    active_url,
                    active_type,
                    active_rtt,
                );
                built
            };

            if let Some(packet) = packet {
                send_callback.emit(packet);
                debug!("Sent health packet for session: {session_id}");
            }
        }
    });
}
#[allow(clippy::too_many_arguments)]
fn create_health_packet(
session_id: &str,
meeting_id: &str,
reporting_peer: &str,
health_map: &HashMap<String, PeerHealthData>,
self_audio_enabled: bool,
self_video_enabled: bool,
active_server_url: Option<String>,
active_server_type: Option<String>,
active_server_rtt_ms: Option<f64>,
) -> Option<PacketWrapper> {
if health_map.is_empty() {
return None;
}
let mut pb = PbHealthPacket::new();
pb.session_id = session_id.to_string();
pb.meeting_id = meeting_id.to_string();
pb.reporting_peer = reporting_peer.to_string();
pb.timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
pb.reporting_audio_enabled = self_audio_enabled;
pb.reporting_video_enabled = self_video_enabled;
if let Some(url) = active_server_url {
pb.active_server_url = url;
}
if let Some(typ) = active_server_type {
pb.active_server_type = typ;
}
if let Some(rtt) = active_server_rtt_ms {
pb.active_server_rtt_ms = rtt;
}
for (peer_id, health_data) in health_map.iter() {
let mut ps = PbPeerStats::new();
ps.can_listen = health_data.can_listen;
ps.can_see = health_data.can_see;
ps.audio_enabled = health_data.can_listen;
ps.video_enabled = health_data.can_see;
if let Some(neteq) = &health_data.last_neteq_stats {
let mut ns = PbNetEqStats::new();
if let Some(v) = neteq.get("current_buffer_size_ms").and_then(|v| v.as_f64()) {
ns.current_buffer_size_ms = v;
}
if let Some(v) = neteq
.get("packets_awaiting_decode")
.and_then(|v| v.as_f64())
{
ns.packets_awaiting_decode = v;
}
if let Some(v) = neteq.get("packets_per_sec").and_then(|v| v.as_f64()) {
ns.packets_per_sec = v;
}
if let Some(network) = neteq.get("network") {
let mut nn = PbNetEqNetwork::new();
if let Some(counters) = network.get("operation_counters") {
let mut oc = PbNetEqOperationCounters::new();
if let Some(v) = counters.get("normal_per_sec").and_then(|v| v.as_f64()) {
oc.normal_per_sec = v;
}
if let Some(v) = counters.get("expand_per_sec").and_then(|v| v.as_f64()) {
oc.expand_per_sec = v;
}
if let Some(v) = counters.get("accelerate_per_sec").and_then(|v| v.as_f64())
{
oc.accelerate_per_sec = v;
}
if let Some(v) = counters
.get("fast_accelerate_per_sec")
.and_then(|v| v.as_f64())
{
oc.fast_accelerate_per_sec = v;
}
if let Some(v) = counters
.get("preemptive_expand_per_sec")
.and_then(|v| v.as_f64())
{
oc.preemptive_expand_per_sec = v;
}
if let Some(v) = counters.get("merge_per_sec").and_then(|v| v.as_f64()) {
oc.merge_per_sec = v;
}
if let Some(v) = counters
.get("comfort_noise_per_sec")
.and_then(|v| v.as_f64())
{
oc.comfort_noise_per_sec = v;
}
if let Some(v) = counters.get("dtmf_per_sec").and_then(|v| v.as_f64()) {
oc.dtmf_per_sec = v;
}
if let Some(v) = counters.get("undefined_per_sec").and_then(|v| v.as_f64())
{
oc.undefined_per_sec = v;
}
nn.operation_counters = ::protobuf::MessageField::some(oc);
}
ns.network = ::protobuf::MessageField::some(nn);
}
ps.neteq_stats = ::protobuf::MessageField::some(ns);
}
if let Some(video) = &health_data.last_video_stats {
let mut vs = PbVideoStats::new();
if let Some(v) = video.get("fps_received").and_then(|v| v.as_f64()) {
vs.fps_received = v;
}
if let Some(v) = video.get("frames_buffered").and_then(|v| v.as_f64()) {
vs.frames_buffered = v;
}
if let Some(v) = video.get("frames_decoded").and_then(|v| v.as_u64()) {
vs.frames_decoded = v;
}
if let Some(v) = video.get("bitrate_kbps").and_then(|v| v.as_u64()) {
vs.bitrate_kbps = v;
}
ps.video_stats = ::protobuf::MessageField::some(vs);
}
pb.peer_stats.insert(peer_id.clone(), ps);
}
let bytes = pb.write_to_bytes().unwrap_or_default();
Some(PacketWrapper {
packet_type: PacketType::HEALTH.into(),
email: reporting_peer.to_string(),
data: bytes,
..Default::default()
})
}
/// Stops tracking `peer_id`.
///
/// Best effort: nothing happens when the health map is currently borrowed.
/// The debug line is logged whether or not the peer was actually tracked.
pub fn remove_peer(&self, peer_id: &str) {
    let mut peers = match self.peer_health_data.try_borrow_mut() {
        Ok(guard) => guard,
        Err(_) => return,
    };
    peers.remove(peer_id);
    debug!("Removed peer from health tracking: {peer_id}");
}
/// Returns a JSON object mapping each tracked peer id to its `can_listen`,
/// `can_see` and `last_update_ms` values.
///
/// Returns `None` when the health map is currently borrowed mutably.
pub fn get_health_summary(&self) -> Option<Value> {
    let peers = self.peer_health_data.try_borrow().ok()?;

    let mut summary = serde_json::Map::new();
    for (peer_id, health_data) in peers.iter() {
        let entry = json!({
            "can_listen": health_data.can_listen,
            "can_see": health_data.can_see,
            "last_update_ms": health_data.last_update_ms
        });
        summary.insert(peer_id.clone(), entry);
    }

    Some(Value::Object(summary))
}
}