use anyhow::{Context, Result};
use futures_util::{SinkExt, StreamExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{error, info, warn};
use crate::webrtc::{SignalingMessage, WebRTCConnection, WebRTCServer};
/// Settings for the outbound connection to the WebRTC signaling relay.
///
/// `Debug` is implemented by hand so that `api_key` is never printed when the
/// configuration is logged with `{:?}`.
#[derive(Clone)]
pub struct SignalingRelayConfig {
    /// Relay endpoint. `http://` / `https://` prefixes are rewritten to
    /// `ws://` / `wss://` when connecting; any other value is used unchanged.
    pub url: String,
    /// Identifier sent to the relay as the `robot_id` query parameter.
    pub robot_id: String,
    /// Credential sent to the relay as the `api_key` query parameter.
    pub api_key: String,
    /// Pause, in milliseconds, between the end of one relay session and the
    /// next connection attempt.
    pub reconnect_interval_ms: u64,
}

impl std::fmt::Debug for SignalingRelayConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SignalingRelayConfig")
            .field("url", &self.url)
            .field("robot_id", &self.robot_id)
            // Never expose the credential in diagnostics.
            .field("api_key", &"<redacted>")
            .field("reconnect_interval_ms", &self.reconnect_interval_ms)
            .finish()
    }
}
/// Runs the signaling relay client, reconnecting after every session.
///
/// Each iteration runs one relay session via `connect_to_relay`, logs how it
/// ended, sleeps for `config.reconnect_interval_ms` milliseconds and then
/// tries again. The loop has no exit, so this future only finishes if it is
/// dropped or cancelled by the caller.
pub async fn start_signaling_relay_client(
    config: SignalingRelayConfig,
    webrtc_server: Arc<WebRTCServer>,
) -> Result<()> {
    info!(
        "🌐 Starting WebRTC signaling relay client for robot {}",
        config.robot_id
    );
    info!("📡 Connecting to relay: {}", config.url);

    // The pause between sessions never changes, so compute it once.
    let retry_delay = Duration::from_millis(config.reconnect_interval_ms);

    loop {
        let session = connect_to_relay(&config, Arc::clone(&webrtc_server)).await;
        if let Err(e) = session {
            error!("Relay connection error: {}", e);
        } else {
            info!("Relay connection closed normally");
        }

        info!("Reconnecting to signaling relay in {:?}...", retry_delay);
        tokio::time::sleep(retry_delay).await;
    }
}
/// Percent-encodes `value` so it can be embedded as a URL query parameter value.
///
/// Bytes in the RFC 3986 "unreserved" set (`A-Z a-z 0-9 - . _ ~`) are kept
/// as-is; every other byte of the UTF-8 encoding is written as `%XX`.
fn encode_relay_query_value(value: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(value.len());
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                encoded.push(char::from(byte));
            }
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }
    }
    encoded
}

/// Runs a single WebSocket session against the signaling relay.
///
/// Builds the relay URL from `config`, connects, and then processes incoming
/// frames until the relay closes the socket, a WebSocket error occurs, or a
/// pong cannot be sent. Text frames are passed to `handle_relay_message`;
/// errors from that handler are logged and do not end the session. When the
/// session ends, any WebRTC connection created during it is closed.
///
/// # Errors
///
/// Returns an error only if the initial WebSocket connection fails. Every
/// failure after that is logged and the function returns `Ok(())`.
async fn connect_to_relay(config: &SignalingRelayConfig, webrtc_server: Arc<WebRTCServer>) -> Result<()> {
    // The relay speaks WebSocket; accept HTTP(S)-style URLs by swapping the scheme.
    let ws_url = if let Some(rest) = config.url.strip_prefix("http://") {
        format!("ws://{}", rest)
    } else if let Some(rest) = config.url.strip_prefix("https://") {
        format!("wss://{}", rest)
    } else {
        config.url.clone()
    };

    // Extend an existing query string instead of starting a second one.
    let separator = if ws_url.contains('?') { '&' } else { '?' };
    // Identifier and key are percent-encoded so characters such as `+`, `&`,
    // `#`, `=` or spaces cannot corrupt the query string.
    // The full URL carries the API key; keep it out of log output.
    let url = format!(
        "{}{}type=robot&robot_id={}&api_key={}",
        ws_url,
        separator,
        encode_relay_query_value(&config.robot_id),
        encode_relay_query_value(&config.api_key)
    );

    info!("Connecting to signaling relay...");
    let (ws_stream, _) = connect_async(&url).await.context("Failed to connect to relay")?;
    let (mut write, mut read) = ws_stream.split();
    info!("✅ Connected to signaling relay");

    // At most one WebRTC connection is tracked per relay session.
    let current_connection: Arc<Mutex<Option<Arc<WebRTCConnection>>>> = Arc::new(Mutex::new(None));

    while let Some(msg) = read.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                if let Err(e) =
                    handle_relay_message(&text, &webrtc_server, &current_connection, &mut write).await
                {
                    error!("Error handling relay message: {}", e);
                }
            }
            Ok(Message::Close(_)) => {
                info!("Relay closed connection");
                break;
            }
            Ok(Message::Ping(data)) => {
                if let Err(e) = write.send(Message::Pong(data)).await {
                    error!("Failed to send pong: {}", e);
                    break;
                }
            }
            Err(e) => {
                error!("Relay WebSocket error: {}", e);
                break;
            }
            // Remaining frame kinds are ignored.
            _ => {}
        }
    }

    // Take the connection out first so the lock is not held while closing it.
    let leftover = current_connection.lock().await.take();
    if let Some(conn) = leftover {
        info!("🧹 Cleaning up WebRTC connection");
        if let Err(e) = conn.peer_connection().close().await {
            error!("Error closing peer connection: {}", e);
        }
    }
    Ok(())
}
async fn handle_relay_message(
text: &str,
webrtc_server: &Arc<WebRTCServer>,
current_connection: &Arc<Mutex<Option<Arc<WebRTCConnection>>>>,
write: &mut futures_util::stream::SplitSink<
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
Message,
>,
) -> Result<()> {
let msg: SignalingMessage = match serde_json::from_str(text) {
Ok(m) => m,
Err(_) => {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(text) {
if json.get("type").and_then(|v| v.as_str()) == Some("welcome") {
info!("✅ Received welcome from relay");
return Ok(());
}
if json.get("type").and_then(|v| v.as_str()) == Some("error") {
let message = json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
error!("❌ Relay error: {}", message);
return Ok(());
}
}
warn!("Unknown message from relay: {}", text);
return Ok(());
}
};
match msg {
SignalingMessage::Ready => {
info!("📨 Received 'ready' signal from dashboard - creating offer");
if let Some(old_conn) = current_connection.lock().await.take() {
warn!("Closing existing WebRTC connection for new dashboard");
let _ = old_conn.peer_connection().close().await;
}
let connection = match webrtc_server.create_connection().await {
Ok(conn) => Arc::new(conn),
Err(e) => {
error!("Failed to create WebRTC connection: {}", e);
let error_msg =
serde_json::json!({"type": "error", "message": format!("Failed to create connection: {}", e)});
write.send(Message::Text(error_msg.to_string())).await?;
return Ok(());
}
};
let conn_for_streaming = Arc::clone(&connection);
tokio::spawn(async move {
if let Err(e) = conn_for_streaming.run_streaming_loop().await {
error!("Streaming loop error: {}", e);
}
});
*current_connection.lock().await = Some(Arc::clone(&connection));
match connection.create_offer().await {
Ok(offer_sdp) => {
let offer_msg = SignalingMessage::Offer { sdp: offer_sdp };
let json = serde_json::to_string(&offer_msg)?;
write.send(Message::Text(json)).await?;
info!("✅ Sent SDP offer to relay");
}
Err(e) => {
error!("Failed to create SDP offer: {}", e);
let error_msg =
serde_json::json!({"type": "error", "message": format!("Failed to create offer: {}", e)});
write.send(Message::Text(error_msg.to_string())).await?;
}
}
}
SignalingMessage::Answer { sdp } => {
info!("📨 Received SDP answer from dashboard");
if let Some(connection) = current_connection.lock().await.as_ref() {
match connection.handle_answer(sdp).await {
Ok(()) => {
info!("✅ WebRTC connection established");
}
Err(e) => {
error!("Failed to handle SDP answer: {}", e);
}
}
} else {
warn!("Received answer but no active connection");
}
}
SignalingMessage::IceCandidate {
candidate,
sdp_mid,
sdp_mline_index,
} => {
if let Some(connection) = current_connection.lock().await.as_ref() {
if let Err(e) = connection.add_ice_candidate(candidate, sdp_mid, sdp_mline_index).await {
error!("Failed to add ICE candidate: {}", e);
}
}
}
SignalingMessage::Offer { .. } => {
warn!("Received unexpected offer from dashboard (ignored)");
}
}
Ok(())
}