Skip to main content

vox_rtc_server/
socket.rs

1use crate::error::{Result, VoxRtcError};
2use crate::types::{ChannelState, ConnectionState, EventData};
3use pondsocket_client::{
4    Channel as PondChannel, ClientError, ClientOptions, ConnectionState as PondConnectionState,
5    PondClient,
6};
7use pondsocket_common::{ChannelEvent, ChannelState as PondChannelState};
8use std::time::Duration;
9use tokio::sync::{broadcast, watch};
10
11#[derive(Clone)]
12pub(crate) struct RawSocketClient {
13    client: PondClient,
14    params: EventData,
15    state_tx: watch::Sender<ConnectionState>,
16}
17
18#[derive(Clone)]
19pub(crate) struct RawSocketChannel {
20    channel: PondChannel,
21    state_tx: watch::Sender<ChannelState>,
22    message_tx: broadcast::Sender<(String, EventData)>,
23}
24
25impl RawSocketClient {
26    pub(crate) fn new(endpoint: &str, params: EventData) -> Result<Self> {
27        let options = ClientOptions {
28            connection_timeout: Duration::from_secs(10),
29            ..ClientOptions::default()
30        };
31        let client = PondClient::with_options(endpoint, Some(params.clone()), options)?;
32        let (state_tx, _) = watch::channel(map_connection_state(client.state()));
33        Ok(Self {
34            client,
35            params,
36            state_tx,
37        })
38    }
39
40    pub(crate) fn state(&self) -> ConnectionState {
41        map_connection_state(self.client.state())
42    }
43
44    pub(crate) fn subscribe_state(&self) -> watch::Receiver<ConnectionState> {
45        self.state_tx.subscribe()
46    }
47
48    pub(crate) async fn connect(&self) -> Result<()> {
49        self.state_tx
50            .send_replace(map_connection_state(self.client.state()));
51        self.client.connect().await?;
52        self.state_tx
53            .send_replace(map_connection_state(self.client.state()));
54        Ok(())
55    }
56
57    pub(crate) async fn disconnect(&self) {
58        self.client.disconnect().await;
59        self.state_tx
60            .send_replace(map_connection_state(self.client.state()));
61    }
62
63    pub(crate) async fn create_channel(
64        &self,
65        name: impl Into<String>,
66        params: EventData,
67    ) -> RawSocketChannel {
68        let channel = self.client.create_channel(name, Some(params)).await;
69        RawSocketChannel::new(channel)
70    }
71
72    #[allow(dead_code)]
73    pub(crate) fn params(&self) -> &EventData {
74        &self.params
75    }
76}
77
78impl RawSocketChannel {
79    fn new(channel: PondChannel) -> Self {
80        let (state_tx, _) = watch::channel(map_channel_state(channel.state()));
81        let (message_tx, _) = broadcast::channel(1024);
82
83        let mut pond_states = channel.subscribe_state();
84        let mirror_state_tx = state_tx.clone();
85        tokio::spawn(async move {
86            loop {
87                mirror_state_tx.send_replace(map_channel_state(*pond_states.borrow_and_update()));
88                if pond_states.changed().await.is_err() {
89                    break;
90                }
91            }
92        });
93
94        let mut pond_events = channel.subscribe_events();
95        let mirror_message_tx = message_tx.clone();
96        tokio::spawn(async move {
97            while let Ok(event) = pond_events.recv().await {
98                if let Some((event, payload)) = map_channel_event(event) {
99                    let _ = mirror_message_tx.send((event, payload));
100                }
101            }
102        });
103
104        Self {
105            channel,
106            state_tx,
107            message_tx,
108        }
109    }
110
111    pub(crate) fn name(&self) -> &str {
112        self.channel.name()
113    }
114
115    pub(crate) fn subscribe_state(&self) -> watch::Receiver<ChannelState> {
116        self.state_tx.subscribe()
117    }
118
119    pub(crate) fn subscribe_messages(&self) -> broadcast::Receiver<(String, EventData)> {
120        self.message_tx.subscribe()
121    }
122
123    pub(crate) async fn join(&self) -> Result<()> {
124        self.channel.join().await;
125        Ok(())
126    }
127
128    pub(crate) async fn leave(&self) -> Result<()> {
129        self.channel.leave().await;
130        Ok(())
131    }
132
133    pub(crate) async fn send_message(&self, event: &str, payload: EventData) -> Result<()> {
134        self.channel.send_message(event, Some(payload)).await;
135        Ok(())
136    }
137}
138
139fn map_connection_state(state: PondConnectionState) -> ConnectionState {
140    match state {
141        PondConnectionState::Connecting => ConnectionState::Connecting,
142        PondConnectionState::Connected => ConnectionState::Connected,
143        PondConnectionState::Disconnected => ConnectionState::Disconnected,
144    }
145}
146
147fn map_channel_state(state: PondChannelState) -> ChannelState {
148    match state {
149        PondChannelState::Idle => ChannelState::Idle,
150        PondChannelState::Joining => ChannelState::Joining,
151        PondChannelState::Joined => ChannelState::Joined,
152        PondChannelState::Closed => ChannelState::Closed,
153        PondChannelState::Declined => ChannelState::Declined,
154        PondChannelState::Stalled => ChannelState::Joining,
155    }
156}
157
158fn map_channel_event(event: ChannelEvent) -> Option<(String, EventData)> {
159    match event {
160        ChannelEvent::Message(message) => Some((message.event, message.payload)),
161        ChannelEvent::Presence(_) => None,
162    }
163}
164
165impl From<ClientError> for VoxRtcError {
166    fn from(value: ClientError) -> Self {
167        match value {
168            ClientError::Url(err) => Self::InvalidUrl(err),
169            ClientError::Serialization(err) => Self::Json(err),
170            ClientError::WebSocket(err) => Self::PondSocketClient(err.to_string()),
171            ClientError::NotConnected | ClientError::ChannelClosed => Self::Disconnected,
172            other => Self::PondSocketClient(other.to_string()),
173        }
174    }
175}