use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::wire::webrtc::{SignalingClient, WebRtcCoordinator};
use tokio_util::sync::CancellationToken;
/// How long `run_network_event_reconciler` keeps collecting follow-up events after the
/// first event of a batch before it acts on the batch.
const NETWORK_EVENT_SETTLE_WINDOW: Duration = Duration::from_millis(400);
/// Timeout handed to `SignalingClient::probe_alive` when checking an already-connected
/// signaling WebSocket.
const SIGNALING_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
/// A network state notification fed into the event reconciler.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NetworkEvent {
/// Connectivity is available; `select_network_recovery_action` maps this to `Restore`.
Available,
/// Connectivity was lost; `select_network_recovery_action` maps this to `Offline`.
Lost,
/// The network type changed; treated like `Available` when selecting the recovery action.
/// `DefaultNetworkEventProcessor` only logs the two flags.
TypeChanged { is_wifi: bool, is_cellular: bool },
/// Explicit request to tear down and rebuild connections; never debounced and takes
/// precedence over every other event in the same batch.
CleanupConnections,
}
/// The single action chosen for a whole batch of `NetworkEvent`s.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkRecoveryAction {
/// Nothing to do (the batch contained no events).
Noop,
/// Treat the network as lost.
Offline,
/// Treat the network as available and restore signaling / WebRTC.
Restore,
/// Run the full connection cleanup, selected when a `CleanupConnections` event is present.
CleanupConnectionsCompat,
}
/// Outcome reported back for one submitted `NetworkEvent`.
#[derive(Debug, Clone)]
pub struct NetworkEventResult {
/// The event this result is reported for.
pub event: NetworkEvent,
/// `true` when the associated action completed without error.
pub success: bool,
/// Error message when `success` is `false`, otherwise `None`.
pub error: Option<String>,
/// Elapsed time in milliseconds; `process_network_event_batch` stores the duration of
/// the whole batch action here for every event of the batch.
pub duration_ms: u64,
}
impl NetworkEventResult {
pub fn success(event: NetworkEvent, duration_ms: u64) -> Self {
Self {
event,
success: true,
error: None,
duration_ms,
}
}
pub fn failure(event: NetworkEvent, error: String, duration_ms: u64) -> Self {
Self {
event,
success: false,
error: Some(error),
duration_ms,
}
}
}
/// Handler for network state changes.
///
/// Implementors provide one handler per event kind; `process_network_recovery_action`
/// dispatches an already-selected batch action to those handlers and may be overridden.
#[async_trait::async_trait]
pub trait NetworkEventProcessor: Send + Sync {
/// Handles a "network available" notification.
async fn process_network_available(&self) -> Result<(), String>;
/// Handles a "network lost" notification.
async fn process_network_lost(&self) -> Result<(), String>;
/// Handles a change of network type; the flags are passed through from
/// `NetworkEvent::TypeChanged`.
async fn process_network_type_changed(
&self,
is_wifi: bool,
is_cellular: bool,
) -> Result<(), String>;
/// Handles an explicit connection cleanup request.
async fn cleanup_connections(&self) -> Result<(), String>;
/// Executes a batch-level recovery action.
///
/// The default implementation maps `Noop` to `Ok(())`, `Offline` to
/// `process_network_lost`, `Restore` to `process_network_available` and
/// `CleanupConnectionsCompat` to `cleanup_connections`.
async fn process_network_recovery_action(
&self,
action: NetworkRecoveryAction,
) -> Result<(), String> {
match action {
NetworkRecoveryAction::Noop => Ok(()),
NetworkRecoveryAction::Offline => self.process_network_lost().await,
NetworkRecoveryAction::Restore => self.process_network_available().await,
NetworkRecoveryAction::CleanupConnectionsCompat => self.cleanup_connections().await,
}
}
}
/// Debounce settings for `DefaultNetworkEventProcessor`.
#[derive(Debug, Clone)]
pub struct DebounceConfig {
/// Minimum spacing between two accepted events of the same kind; a repeat arriving
/// sooner than this after the last accepted one is debounced.
pub window: Duration,
}
impl Default for DebounceConfig {
    /// Uses a two-second debounce window.
    fn default() -> Self {
        let window = Duration::from_secs(2);
        Self { window }
    }
}
/// Per-event-kind timestamps used for debouncing; `None` until the first event of that
/// kind has been accepted.
#[derive(Debug)]
struct DebounceState {
// Time the last `Available` event was accepted.
last_available: tokio::sync::Mutex<Option<Instant>>,
// Time the last `Lost` event was accepted.
last_lost: tokio::sync::Mutex<Option<Instant>>,
// Time the last `TypeChanged` event was accepted.
last_type_changed: tokio::sync::Mutex<Option<Instant>>,
}
impl DebounceState {
    /// Creates a state in which no event kind has been seen yet.
    fn new() -> Self {
        // Every slot starts out empty.
        let empty_slot = || tokio::sync::Mutex::new(None);
        Self {
            last_available: empty_slot(),
            last_lost: empty_slot(),
            last_type_changed: empty_slot(),
        }
    }
}
/// Shared bookkeeping for signaling (re)connect attempts.
#[derive(Debug)]
struct SignalingRecoveryState {
// Held for the whole duration of a connect / probe / rebuild sequence so that only one
// such sequence runs at a time.
connect_lock: tokio::sync::Mutex<()>,
// Set to the current time after each successful `connect_once`.
last_successful_connect: tokio::sync::Mutex<Option<Instant>>,
}
impl SignalingRecoveryState {
    /// Creates a state with an unlocked connect lock and no recorded successful connect.
    fn new() -> Self {
        let connect_lock = tokio::sync::Mutex::new(());
        let last_successful_connect = tokio::sync::Mutex::new(None);
        Self {
            connect_lock,
            last_successful_connect,
        }
    }
}
/// `NetworkEventProcessor` implementation that drives a signaling client and, when
/// present, a WebRTC coordinator.
pub struct DefaultNetworkEventProcessor {
// Signaling connection that is connected, probed, disconnected and rebuilt.
signaling_client: Arc<dyn SignalingClient>,
// Optional coordinator; all WebRTC recovery steps are skipped when this is `None`.
webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
// Debounce window applied per event kind.
debounce_config: DebounceConfig,
// Last-accepted timestamps per event kind.
debounce_state: Arc<DebounceState>,
// Connect lock and last-successful-connect timestamp.
recovery_state: Arc<SignalingRecoveryState>,
}
impl DefaultNetworkEventProcessor {
/// Creates a processor that uses `DebounceConfig::default()`.
pub fn new(
    signaling_client: Arc<dyn SignalingClient>,
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
) -> Self {
    let default_debounce = DebounceConfig::default();
    Self::new_with_debounce(signaling_client, webrtc_coordinator, default_debounce)
}
/// Creates a processor with an explicit debounce configuration and fresh
/// debounce / recovery state.
pub fn new_with_debounce(
    signaling_client: Arc<dyn SignalingClient>,
    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
    debounce_config: DebounceConfig,
) -> Self {
    let debounce_state = Arc::new(DebounceState::new());
    let recovery_state = Arc::new(SignalingRecoveryState::new());
    Self {
        signaling_client,
        webrtc_coordinator,
        debounce_config,
        debounce_state,
        recovery_state,
    }
}
async fn should_process_event(&self, event: &NetworkEvent) -> bool {
let now = Instant::now();
match event {
NetworkEvent::Available => {
let mut last = self.debounce_state.last_available.lock().await;
if let Some(last_time) = *last {
if now.duration_since(last_time) < self.debounce_config.window {
tracing::debug!(
"⏸️ Debouncing Network Available event (last event was {:?} ago)",
now.duration_since(last_time)
);
return false;
}
}
*last = Some(now);
true
}
NetworkEvent::Lost => {
let mut last = self.debounce_state.last_lost.lock().await;
if let Some(last_time) = *last {
if now.duration_since(last_time) < self.debounce_config.window {
tracing::debug!(
"⏸️ Debouncing Network Lost event (last event was {:?} ago)",
now.duration_since(last_time)
);
return false;
}
}
*last = Some(now);
true
}
NetworkEvent::TypeChanged { .. } => {
let mut last = self.debounce_state.last_type_changed.lock().await;
if let Some(last_time) = *last {
if now.duration_since(last_time) < self.debounce_config.window {
tracing::debug!(
"⏸️ Debouncing Network TypeChanged event (last event was {:?} ago)",
now.duration_since(last_time)
);
return false;
}
}
*last = Some(now);
true
}
NetworkEvent::CleanupConnections => {
tracing::debug!(
"🧹 CleanupConnections event - no debouncing (always execute immediately)"
);
true
}
}
}
/// Connects the signaling client unless it already reports being connected.
///
/// The whole sequence runs under `recovery_state.connect_lock`. On a successful connect
/// the current time is stored in `last_successful_connect`.
///
/// # Errors
/// Returns `"WebSocket connect failed: …"` when `connect_once` fails.
async fn ensure_signaling_connected_once(&self, reason: &str) -> Result<(), String> {
    let _connect_guard = self.recovery_state.connect_lock.lock().await;

    if self.signaling_client.is_connected() {
        tracing::debug!(
            reason = reason,
            "Signaling already connected, skipping connect"
        );
        return Ok(());
    }

    // Scoped so the timestamp lock is released before anything else is awaited.
    let connected_recently = {
        let last_connect = self.recovery_state.last_successful_connect.lock().await;
        match *last_connect {
            Some(at) => at.elapsed() < Duration::from_millis(1500),
            None => false,
        }
    };
    // Only taken if the client starts reporting "connected" between the check above and
    // this one; the connection state is re-read only when a recent connect was recorded.
    if connected_recently && self.signaling_client.is_connected() {
        tracing::debug!(
            reason = reason,
            "Signaling recently connected, reusing connection"
        );
        return Ok(());
    }

    tracing::info!(reason = reason, "🔄 Connecting signaling");
    if let Err(e) = self.signaling_client.connect_once().await {
        let err_msg = format!("WebSocket connect failed: {}", e);
        tracing::error!("❌ {}", err_msg);
        return Err(err_msg);
    }

    let connected_at = Instant::now();
    *self.recovery_state.last_successful_connect.lock().await = Some(connected_at);
    tracing::info!(reason = reason, "✅ Signaling connected");
    Ok(())
}
/// Makes sure the signaling connection is usable, rebuilding it if necessary.
///
/// Runs under `recovery_state.connect_lock`:
/// * not connected → connect;
/// * connected → probe with `SIGNALING_PROBE_TIMEOUT`; on probe failure disconnect
///   (best effort, failures are only logged) and connect again.
///
/// Every successful connect stores the current time in `last_successful_connect`.
///
/// # Errors
/// Returns `"WebSocket connect failed: …"` or `"WebSocket rebuild failed: …"` when the
/// respective `connect_once` call fails.
async fn ensure_signaling_healthy_once(&self, reason: &str) -> Result<(), String> {
    let _connect_guard = self.recovery_state.connect_lock.lock().await;

    // Case 1: nothing to probe, just connect.
    if !self.signaling_client.is_connected() {
        tracing::info!(reason = reason, "🔄 Connecting signaling");
        if let Err(e) = self.signaling_client.connect_once().await {
            let err_msg = format!("WebSocket connect failed: {}", e);
            tracing::error!("❌ {}", err_msg);
            return Err(err_msg);
        }
        *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
        tracing::info!(reason = reason, "✅ Signaling connected");
        return Ok(());
    }

    // Case 2: connected according to the client; verify with a probe.
    tracing::debug!(
        reason = reason,
        timeout_ms = SIGNALING_PROBE_TIMEOUT.as_millis() as u64,
        "🔎 Probing existing signaling WebSocket"
    );
    let probe_outcome = self
        .signaling_client
        .probe_alive(SIGNALING_PROBE_TIMEOUT)
        .await;
    let probe_error = match probe_outcome {
        Ok(()) => {
            tracing::debug!(
                reason = reason,
                "✅ Signaling probe succeeded; keeping existing WebSocket"
            );
            return Ok(());
        }
        Err(e) => e,
    };

    // Case 3: the probe failed, so tear down and rebuild.
    tracing::warn!(
        reason = reason,
        "⚠️ Signaling probe failed; rebuilding WebSocket: {}",
        probe_error
    );
    if let Err(disconnect_err) = self.signaling_client.disconnect().await {
        tracing::warn!(
            reason = reason,
            "⚠️ Failed to disconnect unhealthy signaling before rebuild: {}",
            disconnect_err
        );
    }
    tracing::info!(reason = reason, "🔄 Rebuilding signaling: connecting");
    if let Err(connect_err) = self.signaling_client.connect_once().await {
        let err_msg = format!("WebSocket rebuild failed: {}", connect_err);
        tracing::error!("❌ {}", err_msg);
        return Err(err_msg);
    }
    *self.recovery_state.last_successful_connect.lock().await = Some(Instant::now());
    tracing::info!(reason = reason, "✅ Signaling rebuilt");
    Ok(())
}
/// Restores signaling first and then restarts WebRTC connections that are in recovery.
///
/// Order of operations: mark peers as recovering (when a coordinator exists), ensure
/// signaling is healthy, then ask the coordinator to restart the recovering connections.
///
/// # Errors
/// Propagates the error from `ensure_signaling_healthy_once`; in that case the restart
/// step is not reached.
async fn restore_signaling_and_webrtc(&self, reason: &str) -> Result<(), String> {
    let recovery_targets = match self.webrtc_coordinator.as_ref() {
        Some(coordinator) => coordinator.begin_network_recovery(reason).await,
        None => Vec::new(),
    };

    self.ensure_signaling_healthy_once(reason).await?;

    if let Some(coordinator) = self.webrtc_coordinator.clone() {
        // An empty target list means this call marked no peers; the restart is issued anyway.
        if recovery_targets.is_empty() {
            tracing::info!("♻️ Resuming ICE restart for peers already in network recovery");
        } else {
            tracing::info!("♻️ Triggering ICE restart for recovering connections...");
        }
        coordinator.restart_network_recovery_connections().await;
    }
    Ok(())
}
/// Handles loss of connectivity without any debouncing.
///
/// Marks WebRTC peers as recovering and clears pending ICE restarts (when a coordinator
/// exists), then disconnects signaling if it is connected. The disconnect is best effort:
/// a failure is logged as a warning and the function still returns `Ok(())`.
async fn process_offline(&self) -> Result<(), String> {
    tracing::info!("📱 Processing: Network offline");
    if let Some(ref coordinator) = self.webrtc_coordinator {
        coordinator.begin_network_recovery("NetworkLost").await;
        tracing::info!("🧹 Clearing pending ICE restart attempts...");
        coordinator.clear_pending_restarts().await;
    }
    if self.signaling_client.is_connected() {
        tracing::info!("🔌 Disconnecting WebSocket...");
        // Going offline must not fail because the socket is already broken, but the
        // error is still worth surfacing in the logs instead of being dropped.
        if let Err(e) = self.signaling_client.disconnect().await {
            tracing::warn!(
                "⚠️ Failed to disconnect WebSocket while going offline: {}",
                e
            );
        }
    }
    Ok(())
}
}
#[async_trait::async_trait]
impl NetworkEventProcessor for DefaultNetworkEventProcessor {
/// Handles a "network available" event.
///
/// A debounced event is skipped only while signaling is still connected; if signaling
/// is down, the restore runs even inside the debounce window.
async fn process_network_available(&self) -> Result<(), String> {
    // Always evaluated: accepting the event also records its timestamp.
    let debounced = !self.should_process_event(&NetworkEvent::Available).await;
    if debounced && self.signaling_client.is_connected() {
        return Ok(());
    }

    tracing::info!("📱 Processing: Network available");
    self.restore_signaling_and_webrtc("NetworkAvailable").await
}
/// Handles a "network lost" event; debounced repeats are ignored and reported as success.
async fn process_network_lost(&self) -> Result<(), String> {
    if self.should_process_event(&NetworkEvent::Lost).await {
        self.process_offline().await
    } else {
        Ok(())
    }
}
/// Handles a network type change.
///
/// A debounced event is skipped only while signaling is still connected. The two flags
/// are logged and do not otherwise influence the restore.
async fn process_network_type_changed(
    &self,
    is_wifi: bool,
    is_cellular: bool,
) -> Result<(), String> {
    let event = NetworkEvent::TypeChanged {
        is_wifi,
        is_cellular,
    };
    // Always evaluated: accepting the event also records its timestamp.
    let debounced = !self.should_process_event(&event).await;
    if debounced && self.signaling_client.is_connected() {
        return Ok(());
    }

    tracing::info!(
        "📱 Processing: Network type changed (WiFi={}, Cellular={})",
        is_wifi,
        is_cellular
    );
    self.restore_signaling_and_webrtc("NetworkTypeChanged")
        .await
}
async fn cleanup_connections(&self) -> Result<(), String> {
let _cleanup_guard = self
.webrtc_coordinator
.as_ref()
.map(|coordinator| coordinator.cleanup_guard());
tracing::info!("🧹 Manually cleaning up all connections...");
if let Some(ref coordinator) = self.webrtc_coordinator {
tracing::info!("♻️ Clearing pending ICE restart attempts...");
coordinator.clear_pending_restarts().await;
tracing::info!("🔻 Closing all WebRTC peer connections...");
if let Err(e) = coordinator.close_all_peers().await {
let err_msg = format!("Failed to close all peers: {}", e);
tracing::warn!("⚠️ {}", err_msg);
} else {
tracing::info!("✅ All WebRTC peer connections closed");
}
}
if self.signaling_client.is_connected() {
tracing::info!("🔌 Disconnecting WebSocket...");
match self.signaling_client.disconnect().await {
Ok(_) => {
tracing::info!("✅ WebSocket disconnected successfully");
}
Err(e) => {
let err_msg = format!("Failed to disconnect WebSocket: {}", e);
tracing::warn!("⚠️ {}", err_msg);
}
}
}
tracing::info!("✅ Connection cleanup completed");
tracing::info!("🔌 Re-establishing signaling connection...");
self.ensure_signaling_connected_once("CompatCleanupConnections")
.await?;
tracing::info!("✅ Connection cleanup and reconnect completed");
Ok(())
}
/// Executes a batch-level recovery action.
///
/// `Offline` and `Restore` call `process_offline` / `restore_signaling_and_webrtc`
/// directly, so the per-event debounce in `should_process_event` is not consulted here.
async fn process_network_recovery_action(
    &self,
    action: NetworkRecoveryAction,
) -> Result<(), String> {
    match action {
        NetworkRecoveryAction::Restore => {
            self.restore_signaling_and_webrtc("NetworkEventBatch").await
        }
        NetworkRecoveryAction::Offline => self.process_offline().await,
        NetworkRecoveryAction::CleanupConnectionsCompat => self.cleanup_connections().await,
        NetworkRecoveryAction::Noop => Ok(()),
    }
}
}
/// Collapses a batch of events into the single action to execute.
///
/// * Any `CleanupConnections` event wins: the result is `CleanupConnectionsCompat`.
/// * Otherwise the last state event decides: `Lost` gives `Offline`, while `Available`
///   and `TypeChanged` give `Restore`.
/// * An empty batch gives `Noop`.
pub fn select_network_recovery_action(events: &[NetworkEvent]) -> NetworkRecoveryAction {
    let cleanup_requested = events
        .iter()
        .any(|event| matches!(event, NetworkEvent::CleanupConnections));
    if cleanup_requested {
        return NetworkRecoveryAction::CleanupConnectionsCompat;
    }

    // Walk backwards: the most recent state event determines the outcome.
    events
        .iter()
        .rev()
        .find_map(|event| match event {
            NetworkEvent::Lost => Some(NetworkRecoveryAction::Offline),
            NetworkEvent::Available | NetworkEvent::TypeChanged { .. } => {
                Some(NetworkRecoveryAction::Restore)
            }
            NetworkEvent::CleanupConnections => None,
        })
        .unwrap_or(NetworkRecoveryAction::Noop)
}
/// Executes one recovery action for a whole batch and reports it once per event.
///
/// The action is chosen with `select_network_recovery_action`. Every event in the batch
/// receives the same outcome (success, or a copy of the error message) and the same
/// `duration_ms`, which is the time spent in the single action. An empty batch returns
/// an empty vector without calling the processor.
pub async fn process_network_event_batch(
    events: Vec<NetworkEvent>,
    processor: Arc<dyn NetworkEventProcessor>,
) -> Vec<NetworkEventResult> {
    if events.is_empty() {
        return Vec::new();
    }

    let action = select_network_recovery_action(&events);
    let started_at = Instant::now();
    let outcome = processor.process_network_recovery_action(action).await;
    let duration_ms = started_at.elapsed().as_millis() as u64;

    match outcome {
        Ok(()) => events
            .into_iter()
            .map(|event| NetworkEventResult::success(event, duration_ms))
            .collect(),
        Err(message) => events
            .into_iter()
            .map(|event| NetworkEventResult::failure(event, message.clone(), duration_ms))
            .collect(),
    }
}
pub async fn run_network_event_reconciler(
mut event_rx: tokio::sync::mpsc::Receiver<NetworkEvent>,
result_tx: tokio::sync::mpsc::Sender<NetworkEventResult>,
processor: Arc<dyn NetworkEventProcessor>,
shutdown_token: CancellationToken,
) {
tracing::info!("🔄 Network event reconciler started");
loop {
tokio::select! {
Some(first_event) = event_rx.recv() => {
let mut events = vec![first_event];
let settle = tokio::time::sleep(NETWORK_EVENT_SETTLE_WINDOW);
tokio::pin!(settle);
loop {
tokio::select! {
Some(next_event) = event_rx.recv() => {
events.push(next_event);
}
_ = &mut settle => {
break;
}
_ = shutdown_token.cancelled() => {
tracing::info!("🛑 Network event reconciler shutting down");
return;
}
else => {
break;
}
}
}
while let Ok(next_event) = event_rx.try_recv() {
events.push(next_event);
}
let action = select_network_recovery_action(&events);
tracing::info!(
event_count = events.len(),
action = ?action,
settle_window_ms = NETWORK_EVENT_SETTLE_WINDOW.as_millis() as u64,
"📱 Processing settled network event batch"
);
let results = process_network_event_batch(events, processor.clone()).await;
for result in results {
if let Err(e) = result_tx.send(result).await {
tracing::warn!("Failed to send event result: {}", e);
}
}
}
_ = shutdown_token.cancelled() => {
tracing::info!("🛑 Network event reconciler shutting down");
break;
}
else => break,
}
}
}
/// Cloneable front end for submitting network events and awaiting their results.
pub struct NetworkEventHandle {
// Sender side of the event channel.
event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,
// Receiver side of the result channel, shared by all clones behind an async mutex.
result_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<NetworkEventResult>>>,
}
impl NetworkEventHandle {
    /// Wraps the sending half of the event channel and the receiving half of the
    /// result channel.
    pub fn new(
        event_tx: tokio::sync::mpsc::Sender<NetworkEvent>,
        result_rx: tokio::sync::mpsc::Receiver<NetworkEventResult>,
    ) -> Self {
        let result_rx = Arc::new(tokio::sync::Mutex::new(result_rx));
        Self {
            event_tx,
            result_rx,
        }
    }

    /// Submits `NetworkEvent::Available` and waits for a result.
    pub async fn handle_network_available(&self) -> Result<NetworkEventResult, String> {
        let event = NetworkEvent::Available;
        self.send_event_and_await_result(event).await
    }

    /// Submits `NetworkEvent::Lost` and waits for a result.
    pub async fn handle_network_lost(&self) -> Result<NetworkEventResult, String> {
        let event = NetworkEvent::Lost;
        self.send_event_and_await_result(event).await
    }

    /// Submits `NetworkEvent::TypeChanged` with the given flags and waits for a result.
    pub async fn handle_network_type_changed(
        &self,
        is_wifi: bool,
        is_cellular: bool,
    ) -> Result<NetworkEventResult, String> {
        let event = NetworkEvent::TypeChanged {
            is_wifi,
            is_cellular,
        };
        self.send_event_and_await_result(event).await
    }

    /// Submits `NetworkEvent::CleanupConnections` and waits for a result.
    pub async fn cleanup_connections(&self) -> Result<NetworkEventResult, String> {
        let event = NetworkEvent::CleanupConnections;
        self.send_event_and_await_result(event).await
    }

    /// Sends `event` and returns the next result read from the shared result receiver.
    ///
    /// NOTE(review): the result is not matched against `event`; it is simply the next
    /// item on a receiver shared by all clones. With concurrent callers the returned
    /// result may carry another caller's event — confirm callers are serialized or that
    /// this is acceptable.
    ///
    /// # Errors
    /// Returns a message when the event cannot be sent or when the result channel is
    /// closed.
    async fn send_event_and_await_result(
        &self,
        event: NetworkEvent,
    ) -> Result<NetworkEventResult, String> {
        if let Err(e) = self.event_tx.send(event).await {
            return Err(format!("Failed to send network event: {}", e));
        }

        let mut results = self.result_rx.lock().await;
        match results.recv().await {
            Some(result) => Ok(result),
            None => Err("Failed to receive network event result".to_string()),
        }
    }
}
impl Clone for NetworkEventHandle {
fn clone(&self) -> Self {
Self {
event_tx: self.event_tx.clone(),
result_rx: self.result_rx.clone(),
}
}
}