vox-rtc-server 0.1.1

Server-side Rust SDK for controlling Vox-hosted WebRTC sessions
Documentation
use crate::error::{Result, VoxRtcError};
use crate::types::{ChannelState, ConnectionState, EventData};
use pondsocket_client::{
    Channel as PondChannel, ClientError, ClientOptions, ConnectionState as PondConnectionState,
    PondClient,
};
use pondsocket_common::{ChannelEvent, ChannelState as PondChannelState};
use std::time::Duration;
use tokio::sync::{broadcast, watch};

#[derive(Clone)]
pub(crate) struct RawSocketClient {
    client: PondClient,
    params: EventData,
    state_tx: watch::Sender<ConnectionState>,
}

#[derive(Clone)]
pub(crate) struct RawSocketChannel {
    channel: PondChannel,
    state_tx: watch::Sender<ChannelState>,
    message_tx: broadcast::Sender<(String, EventData)>,
}

impl RawSocketClient {
    pub(crate) fn new(endpoint: &str, params: EventData) -> Result<Self> {
        let options = ClientOptions {
            connection_timeout: Duration::from_secs(10),
            ..ClientOptions::default()
        };
        let client = PondClient::with_options(endpoint, Some(params.clone()), options)?;
        let (state_tx, _) = watch::channel(map_connection_state(client.state()));
        Ok(Self {
            client,
            params,
            state_tx,
        })
    }

    pub(crate) fn state(&self) -> ConnectionState {
        map_connection_state(self.client.state())
    }

    pub(crate) fn subscribe_state(&self) -> watch::Receiver<ConnectionState> {
        self.state_tx.subscribe()
    }

    pub(crate) async fn connect(&self) -> Result<()> {
        self.state_tx
            .send_replace(map_connection_state(self.client.state()));
        self.client.connect().await?;
        self.state_tx
            .send_replace(map_connection_state(self.client.state()));
        Ok(())
    }

    pub(crate) async fn disconnect(&self) {
        self.client.disconnect().await;
        self.state_tx
            .send_replace(map_connection_state(self.client.state()));
    }

    pub(crate) async fn create_channel(
        &self,
        name: impl Into<String>,
        params: EventData,
    ) -> RawSocketChannel {
        let channel = self.client.create_channel(name, Some(params)).await;
        RawSocketChannel::new(channel)
    }

    #[allow(dead_code)]
    pub(crate) fn params(&self) -> &EventData {
        &self.params
    }
}

impl RawSocketChannel {
    fn new(channel: PondChannel) -> Self {
        let (state_tx, _) = watch::channel(map_channel_state(channel.state()));
        let (message_tx, _) = broadcast::channel(1024);

        let mut pond_states = channel.subscribe_state();
        let mirror_state_tx = state_tx.clone();
        tokio::spawn(async move {
            loop {
                mirror_state_tx.send_replace(map_channel_state(*pond_states.borrow_and_update()));
                if pond_states.changed().await.is_err() {
                    break;
                }
            }
        });

        let mut pond_events = channel.subscribe_events();
        let mirror_message_tx = message_tx.clone();
        tokio::spawn(async move {
            while let Ok(event) = pond_events.recv().await {
                if let Some((event, payload)) = map_channel_event(event) {
                    let _ = mirror_message_tx.send((event, payload));
                }
            }
        });

        Self {
            channel,
            state_tx,
            message_tx,
        }
    }

    pub(crate) fn name(&self) -> &str {
        self.channel.name()
    }

    pub(crate) fn subscribe_state(&self) -> watch::Receiver<ChannelState> {
        self.state_tx.subscribe()
    }

    pub(crate) fn subscribe_messages(&self) -> broadcast::Receiver<(String, EventData)> {
        self.message_tx.subscribe()
    }

    pub(crate) async fn join(&self) -> Result<()> {
        self.channel.join().await;
        Ok(())
    }

    pub(crate) async fn leave(&self) -> Result<()> {
        self.channel.leave().await;
        Ok(())
    }

    pub(crate) async fn send_message(&self, event: &str, payload: EventData) -> Result<()> {
        self.channel.send_message(event, Some(payload)).await;
        Ok(())
    }
}

fn map_connection_state(state: PondConnectionState) -> ConnectionState {
    match state {
        PondConnectionState::Connecting => ConnectionState::Connecting,
        PondConnectionState::Connected => ConnectionState::Connected,
        PondConnectionState::Disconnected => ConnectionState::Disconnected,
    }
}

fn map_channel_state(state: PondChannelState) -> ChannelState {
    match state {
        PondChannelState::Idle => ChannelState::Idle,
        PondChannelState::Joining => ChannelState::Joining,
        PondChannelState::Joined => ChannelState::Joined,
        PondChannelState::Closed => ChannelState::Closed,
        PondChannelState::Declined => ChannelState::Declined,
        PondChannelState::Stalled => ChannelState::Joining,
    }
}

fn map_channel_event(event: ChannelEvent) -> Option<(String, EventData)> {
    match event {
        ChannelEvent::Message(message) => Some((message.event, message.payload)),
        ChannelEvent::Presence(_) => None,
    }
}

impl From<ClientError> for VoxRtcError {
    fn from(value: ClientError) -> Self {
        match value {
            ClientError::Url(err) => Self::InvalidUrl(err),
            ClientError::Serialization(err) => Self::Json(err),
            ClientError::WebSocket(err) => Self::PondSocketClient(err.to_string()),
            ClientError::NotConnected | ClientError::ChannelClosed => Self::Disconnected,
            other => Self::PondSocketClient(other.to_string()),
        }
    }
}