use super::connection_manager::{ConnectionManager, ConnectionManagerOptions, ConnectionState};
use crate::crypto::aes::Aes128State;
use anyhow::{anyhow, Result};
use gloo::timers::callback::Interval;
use log::{debug, info, warn};
use std::cell::RefCell;
use std::rc::{Rc, Weak};
use videocall_types::protos::packet_wrapper::PacketWrapper;
/// Owns a shared [`ConnectionManager`] together with the periodic timers
/// that drive it.
///
/// Field order matters: Rust drops fields in declaration order, so the
/// manager handle is released before the timer handles.
#[derive(Debug)]
pub struct ConnectionController {
    /// Shared, interior-mutable handle to the manager; the timer callbacks
    /// only hold `Weak` references derived from it.
    manager: Rc<RefCell<ConnectionManager>>,
    /// Interval handles, never read after construction. They are stored only
    /// so the intervals stay alive as long as the controller does (gloo
    /// intervals are cancelled when their handle is dropped).
    _timers: Vec<Interval>,
}
impl ConnectionController {
pub fn new(options: ConnectionManagerOptions, aes: Rc<Aes128State>) -> Result<Self> {
info!("Creating ConnectionController with timer management");
let manager = Rc::new(RefCell::new(ConnectionManager::new(
options.clone(),
aes.clone(),
)?));
manager
.borrow_mut()
.set_manager_ref(Rc::downgrade(&manager));
manager.borrow_mut().initialize()?;
let timers = Self::start_timers(Rc::downgrade(&manager));
info!("ConnectionController created with all timers started");
Ok(Self {
manager,
_timers: timers,
})
}
fn start_timers(mgr_weak: Weak<RefCell<ConnectionManager>>) -> Vec<Interval> {
let mut timers = Vec::new();
let mgr_ref = mgr_weak.clone();
timers.push(Interval::new(1000, move || {
if let Some(mgr) = mgr_ref.upgrade() {
if let Ok(mut mgr) = mgr.try_borrow_mut() {
mgr.trigger_diagnostics_report();
if matches!(
mgr.get_connection_state(),
ConnectionState::Connected { .. }
) {
if let Err(e) = mgr.send_rtt_probes() {
debug!("Failed to send 1Hz RTT probe post-election: {e}");
}
if mgr.check_rtt_degradation() {
if let Err(e) = mgr.start_reelection() {
log::error!("Failed to start re-election: {e}");
}
}
}
} else {
warn!("1Hz diagnostics timer: skipped — ConnectionManager already borrowed");
}
}
}));
let mgr_ref = mgr_weak.clone();
timers.push(Interval::new(200, move || {
if let Some(mgr) = mgr_ref.upgrade() {
if let Ok(mut mgr) = mgr.try_borrow_mut() {
if matches!(mgr.get_connection_state(), ConnectionState::Testing { .. }) {
if let Err(e) = mgr.send_rtt_probes() {
debug!("Failed to send RTT probes during election: {e}");
}
}
} else {
warn!("200ms RTT probe timer: skipped — ConnectionManager already borrowed");
}
}
}));
let mgr_ref = mgr_weak.clone();
timers.push(Interval::new(100, move || {
if let Some(mgr) = mgr_ref.upgrade() {
if let Ok(mut mgr) = mgr.try_borrow_mut() {
mgr.check_and_complete_election();
} else {
warn!(
"100ms election check timer: skipped — ConnectionManager already borrowed"
);
}
}
}));
info!("All ConnectionController timers started");
timers
}
pub fn send_packet(&self, packet: PacketWrapper) -> Result<()> {
let mgr = self
.manager
.try_borrow()
.map_err(|_| anyhow!("Failed to borrow ConnectionManager"))?;
mgr.send_packet(packet)
}
#[allow(dead_code)]
pub fn send_packet_datagram(&self, packet: PacketWrapper) -> Result<()> {
let mgr = self
.manager
.try_borrow()
.map_err(|_| anyhow!("Failed to borrow ConnectionManager"))?;
mgr.send_packet_datagram(packet)
}
pub fn set_video_enabled(&self, enabled: bool) -> Result<()> {
let mgr = self
.manager
.try_borrow()
.map_err(|_| anyhow!("Failed to borrow ConnectionManager"))?;
mgr.set_video_enabled(enabled)
}
pub fn set_audio_enabled(&self, enabled: bool) -> Result<()> {
let mgr = self
.manager
.try_borrow()
.map_err(|_| anyhow!("Failed to borrow ConnectionManager"))?;
mgr.set_audio_enabled(enabled)
}
pub fn set_screen_enabled(&self, enabled: bool) -> Result<()> {
let mgr = self
.manager
.try_borrow()
.map_err(|_| anyhow!("Failed to borrow ConnectionManager"))?;
mgr.set_screen_enabled(enabled)
}
pub fn set_speaking(&self, speaking: bool) {
if let Ok(mgr) = self.manager.try_borrow() {
mgr.set_speaking(speaking);
}
}
pub fn set_own_session_id(&self, session_id: u64) -> Result<()> {
let mgr = self
.manager
.try_borrow()
.map_err(|_| anyhow!("Failed to borrow ConnectionManager"))?;
mgr.set_own_session_id(session_id);
Ok(())
}
pub fn is_connected(&self) -> bool {
if let Ok(mgr) = self.manager.try_borrow() {
mgr.is_connected()
} else {
false
}
}
pub fn disconnect(&self) -> anyhow::Result<()> {
let mut mgr = self
.manager
.try_borrow_mut()
.map_err(|_| anyhow!("Failed to borrow ConnectionManager"))?;
mgr.disconnect()
}
pub fn get_connection_state(&self) -> ConnectionState {
if let Ok(mgr) = self.manager.try_borrow() {
mgr.get_connection_state()
} else {
ConnectionState::Failed {
error: "Failed to borrow ConnectionManager".to_string(),
last_known_server: None,
}
}
}
pub fn get_rtt_measurements_clone(
&self,
) -> std::collections::HashMap<String, super::connection_manager::ServerRttMeasurement> {
if let Ok(mgr) = self.manager.try_borrow() {
mgr.get_rtt_measurements().clone()
} else {
std::collections::HashMap::new()
}
}
}
impl Drop for ConnectionController {
fn drop(&mut self) {
info!("Dropping ConnectionController and cleaning up timers");
if let Ok(mut mgr) = self.manager.try_borrow_mut() {
mgr.disconnect().ok();
}
}
}
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use videocall_types::Callback;

    /// Test helper that records every `ConnectionState` passed to the
    /// callback it hands out.
    #[derive(Debug, Clone)]
    struct StateCapture {
        states: Arc<Mutex<Vec<ConnectionState>>>,
    }

    impl StateCapture {
        /// Creates a capture with no recorded states.
        fn new() -> Self {
            let states = Arc::new(Mutex::new(Vec::new()));
            Self { states }
        }

        /// Returns a callback that appends each received state to the
        /// shared list.
        fn callback(&self) -> Callback<ConnectionState> {
            let sink = self.states.clone();
            Callback::from(move |state: ConnectionState| {
                sink.lock().unwrap().push(state);
            })
        }

        /// Returns a copy of all states recorded so far.
        fn get_states(&self) -> Vec<ConnectionState> {
            let recorded = self.states.lock().unwrap();
            recorded.clone()
        }

        /// Returns a copy of the most recently recorded state, if any.
        fn last_state(&self) -> Option<ConnectionState> {
            let recorded = self.states.lock().unwrap();
            recorded.last().cloned()
        }
    }

    /// Builds manager options pointing at local test endpoints, with state
    /// changes routed into `state_capture` and the other callbacks as no-ops.
    fn create_test_options(state_capture: &StateCapture) -> ConnectionManagerOptions {
        let on_state_changed = state_capture.callback();
        ConnectionManagerOptions {
            websocket_urls: vec!["ws://localhost:8080".to_string()],
            webtransport_urls: vec!["https://localhost:8443".to_string()],
            userid: "test_user".to_string(),
            on_inbound_media: Callback::from(|_| {}),
            on_state_changed,
            peer_monitor: Callback::from(|_| {}),
            election_period_ms: 1000,
        }
    }

    /// Builds an AES state from fixed 16-byte key and IV vectors.
    fn create_test_aes() -> Rc<Aes128State> {
        let state = Aes128State::from_vecs(vec![1u8; 16], vec![2u8; 16], true);
        Rc::new(state)
    }

    /// Constructing a controller must not panic; the result is not inspected.
    #[test]
    fn test_connection_controller_creation() {
        let capture = StateCapture::new();
        let _controller =
            ConnectionController::new(create_test_options(&capture), create_test_aes());
    }

    /// The read-only accessors must be callable on a freshly built
    /// controller; nothing is checked when construction fails.
    #[test]
    fn test_connection_controller_delegation() {
        let capture = StateCapture::new();
        let options = create_test_options(&capture);
        let aes = create_test_aes();

        let controller = match ConnectionController::new(options, aes) {
            Ok(controller) => controller,
            Err(_) => return,
        };

        let _is_connected = controller.is_connected();
        let _state = controller.get_connection_state();
        let _measurements = controller.get_rtt_measurements_clone();
    }
}