mecha10-video 0.1.25

WebRTC video streaming for Mecha10 - camera frame capture and broadcasting
Documentation
/// WebRTC Signaling Relay Client
///
/// Connects to a remote signaling relay service instead of running a local server.
/// Used when robots are behind NAT/firewalls and need to connect outbound.
use anyhow::{Context, Result};
use futures_util::{SinkExt, StreamExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{error, info, warn};

use crate::webrtc::{SignalingMessage, WebRTCConnection, WebRTCServer};

/// Configuration for signaling relay client
pub struct SignalingRelayConfig {
    /// Relay WebSocket URL (e.g., wss://api.mecha.industries/webrtc-relay)
    pub url: String,
    /// Robot ID for authentication
    pub robot_id: String,
    /// API key for authentication
    pub api_key: String,
    /// Reconnect interval in milliseconds
    pub reconnect_interval_ms: u64,
}

/// Start signaling relay client
///
/// Connects outbound to a relay service and handles signaling through it.
/// The relay forwards messages between this robot and connected dashboards.
pub async fn start_signaling_relay_client(
    config: SignalingRelayConfig,
    webrtc_server: Arc<WebRTCServer>,
) -> Result<()> {
    info!(
        "🌐 Starting WebRTC signaling relay client for robot {}",
        config.robot_id
    );
    info!("📡 Connecting to relay: {}", config.url);

    loop {
        match connect_to_relay(&config, Arc::clone(&webrtc_server)).await {
            Ok(()) => {
                info!("Relay connection closed normally");
            }
            Err(e) => {
                error!("Relay connection error: {}", e);
            }
        }

        // Reconnect after delay
        let delay = Duration::from_millis(config.reconnect_interval_ms);
        info!("Reconnecting to signaling relay in {:?}...", delay);
        tokio::time::sleep(delay).await;
    }
}

/// Connect to relay and handle signaling
async fn connect_to_relay(config: &SignalingRelayConfig, webrtc_server: Arc<WebRTCServer>) -> Result<()> {
    // Convert http(s):// URLs to ws(s):// for WebSocket connections
    let ws_url = if config.url.starts_with("http://") {
        config.url.replacen("http://", "ws://", 1)
    } else if config.url.starts_with("https://") {
        config.url.replacen("https://", "wss://", 1)
    } else {
        config.url.clone()
    };

    // Build connection URL with auth params
    let url = format!(
        "{}?type=robot&robot_id={}&api_key={}",
        ws_url, config.robot_id, config.api_key
    );

    info!("Connecting to signaling relay...");

    let (ws_stream, _) = connect_async(&url).await.context("Failed to connect to relay")?;
    let (mut write, mut read) = ws_stream.split();

    info!("✅ Connected to signaling relay");

    // Current active WebRTC connection (one dashboard at a time)
    let current_connection: Arc<Mutex<Option<Arc<WebRTCConnection>>>> = Arc::new(Mutex::new(None));

    // Handle signaling messages from relay (from dashboard)
    while let Some(msg) = read.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                match handle_relay_message(&text, &webrtc_server, &current_connection, &mut write).await {
                    Ok(()) => {}
                    Err(e) => {
                        error!("Error handling relay message: {}", e);
                    }
                }
            }
            Ok(Message::Close(_)) => {
                info!("Relay closed connection");
                break;
            }
            Ok(Message::Ping(data)) => {
                if let Err(e) = write.send(Message::Pong(data)).await {
                    error!("Failed to send pong: {}", e);
                    break;
                }
            }
            Err(e) => {
                error!("Relay WebSocket error: {}", e);
                break;
            }
            _ => {}
        }
    }

    // Cleanup current connection
    if let Some(conn) = current_connection.lock().await.take() {
        info!("🧹 Cleaning up WebRTC connection");
        if let Err(e) = conn.peer_connection().close().await {
            error!("Error closing peer connection: {}", e);
        }
    }

    Ok(())
}

/// Handle a signaling message received from the relay
async fn handle_relay_message(
    text: &str,
    webrtc_server: &Arc<WebRTCServer>,
    current_connection: &Arc<Mutex<Option<Arc<WebRTCConnection>>>>,
    write: &mut futures_util::stream::SplitSink<
        tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
        Message,
    >,
) -> Result<()> {
    // Try to parse as signaling message
    let msg: SignalingMessage = match serde_json::from_str(text) {
        Ok(m) => m,
        Err(_) => {
            // Check for welcome/error messages
            if let Ok(json) = serde_json::from_str::<serde_json::Value>(text) {
                if json.get("type").and_then(|v| v.as_str()) == Some("welcome") {
                    info!("✅ Received welcome from relay");
                    return Ok(());
                }
                if json.get("type").and_then(|v| v.as_str()) == Some("error") {
                    let message = json.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
                    error!("❌ Relay error: {}", message);
                    return Ok(());
                }
            }
            warn!("Unknown message from relay: {}", text);
            return Ok(());
        }
    };

    match msg {
        SignalingMessage::Ready => {
            info!("📨 Received 'ready' signal from dashboard - creating offer");

            // Close existing connection if any
            if let Some(old_conn) = current_connection.lock().await.take() {
                warn!("Closing existing WebRTC connection for new dashboard");
                let _ = old_conn.peer_connection().close().await;
            }

            // Create new WebRTC connection
            let connection = match webrtc_server.create_connection().await {
                Ok(conn) => Arc::new(conn),
                Err(e) => {
                    error!("Failed to create WebRTC connection: {}", e);
                    let error_msg =
                        serde_json::json!({"type": "error", "message": format!("Failed to create connection: {}", e)});
                    write.send(Message::Text(error_msg.to_string())).await?;
                    return Ok(());
                }
            };

            // Start streaming loop for this connection
            let conn_for_streaming = Arc::clone(&connection);
            tokio::spawn(async move {
                if let Err(e) = conn_for_streaming.run_streaming_loop().await {
                    error!("Streaming loop error: {}", e);
                }
            });

            // Store connection
            *current_connection.lock().await = Some(Arc::clone(&connection));

            // Create and send SDP offer
            match connection.create_offer().await {
                Ok(offer_sdp) => {
                    let offer_msg = SignalingMessage::Offer { sdp: offer_sdp };
                    let json = serde_json::to_string(&offer_msg)?;
                    write.send(Message::Text(json)).await?;
                    info!("✅ Sent SDP offer to relay");
                }
                Err(e) => {
                    error!("Failed to create SDP offer: {}", e);
                    let error_msg =
                        serde_json::json!({"type": "error", "message": format!("Failed to create offer: {}", e)});
                    write.send(Message::Text(error_msg.to_string())).await?;
                }
            }
        }
        SignalingMessage::Answer { sdp } => {
            info!("📨 Received SDP answer from dashboard");

            if let Some(connection) = current_connection.lock().await.as_ref() {
                match connection.handle_answer(sdp).await {
                    Ok(()) => {
                        info!("✅ WebRTC connection established");
                    }
                    Err(e) => {
                        error!("Failed to handle SDP answer: {}", e);
                    }
                }
            } else {
                warn!("Received answer but no active connection");
            }
        }
        SignalingMessage::IceCandidate {
            candidate,
            sdp_mid,
            sdp_mline_index,
        } => {
            if let Some(connection) = current_connection.lock().await.as_ref() {
                if let Err(e) = connection.add_ice_candidate(candidate, sdp_mid, sdp_mline_index).await {
                    error!("Failed to add ICE candidate: {}", e);
                }
            }
        }
        SignalingMessage::Offer { .. } => {
            // We don't accept offers from dashboard - we create offers
            warn!("Received unexpected offer from dashboard (ignored)");
        }
    }

    Ok(())
}