1use crate::error::{Result, VoxRtcError};
2use crate::types::{ChannelState, ConnectionState, EventData};
3use pondsocket_client::{
4 Channel as PondChannel, ClientError, ClientOptions, ConnectionState as PondConnectionState,
5 PondClient,
6};
7use pondsocket_common::{ChannelEvent, ChannelState as PondChannelState};
8use std::time::Duration;
9use tokio::sync::{broadcast, watch};
10
11#[derive(Clone)]
12pub(crate) struct RawSocketClient {
13 client: PondClient,
14 params: EventData,
15 state_tx: watch::Sender<ConnectionState>,
16}
17
18#[derive(Clone)]
19pub(crate) struct RawSocketChannel {
20 channel: PondChannel,
21 state_tx: watch::Sender<ChannelState>,
22 message_tx: broadcast::Sender<(String, EventData)>,
23}
24
25impl RawSocketClient {
26 pub(crate) fn new(endpoint: &str, params: EventData) -> Result<Self> {
27 let options = ClientOptions {
28 connection_timeout: Duration::from_secs(10),
29 ..ClientOptions::default()
30 };
31 let client = PondClient::with_options(endpoint, Some(params.clone()), options)?;
32 let (state_tx, _) = watch::channel(map_connection_state(client.state()));
33 Ok(Self {
34 client,
35 params,
36 state_tx,
37 })
38 }
39
40 pub(crate) fn state(&self) -> ConnectionState {
41 map_connection_state(self.client.state())
42 }
43
44 pub(crate) fn subscribe_state(&self) -> watch::Receiver<ConnectionState> {
45 self.state_tx.subscribe()
46 }
47
48 pub(crate) async fn connect(&self) -> Result<()> {
49 self.state_tx
50 .send_replace(map_connection_state(self.client.state()));
51 self.client.connect().await?;
52 self.state_tx
53 .send_replace(map_connection_state(self.client.state()));
54 Ok(())
55 }
56
57 pub(crate) async fn disconnect(&self) {
58 self.client.disconnect().await;
59 self.state_tx
60 .send_replace(map_connection_state(self.client.state()));
61 }
62
63 pub(crate) async fn create_channel(
64 &self,
65 name: impl Into<String>,
66 params: EventData,
67 ) -> RawSocketChannel {
68 let channel = self.client.create_channel(name, Some(params)).await;
69 RawSocketChannel::new(channel)
70 }
71
72 #[allow(dead_code)]
73 pub(crate) fn params(&self) -> &EventData {
74 &self.params
75 }
76}
77
78impl RawSocketChannel {
79 fn new(channel: PondChannel) -> Self {
80 let (state_tx, _) = watch::channel(map_channel_state(channel.state()));
81 let (message_tx, _) = broadcast::channel(1024);
82
83 let mut pond_states = channel.subscribe_state();
84 let mirror_state_tx = state_tx.clone();
85 tokio::spawn(async move {
86 loop {
87 mirror_state_tx.send_replace(map_channel_state(*pond_states.borrow_and_update()));
88 if pond_states.changed().await.is_err() {
89 break;
90 }
91 }
92 });
93
94 let mut pond_events = channel.subscribe_events();
95 let mirror_message_tx = message_tx.clone();
96 tokio::spawn(async move {
97 while let Ok(event) = pond_events.recv().await {
98 if let Some((event, payload)) = map_channel_event(event) {
99 let _ = mirror_message_tx.send((event, payload));
100 }
101 }
102 });
103
104 Self {
105 channel,
106 state_tx,
107 message_tx,
108 }
109 }
110
111 pub(crate) fn name(&self) -> &str {
112 self.channel.name()
113 }
114
115 pub(crate) fn subscribe_state(&self) -> watch::Receiver<ChannelState> {
116 self.state_tx.subscribe()
117 }
118
119 pub(crate) fn subscribe_messages(&self) -> broadcast::Receiver<(String, EventData)> {
120 self.message_tx.subscribe()
121 }
122
123 pub(crate) async fn join(&self) -> Result<()> {
124 self.channel.join().await;
125 Ok(())
126 }
127
128 pub(crate) async fn leave(&self) -> Result<()> {
129 self.channel.leave().await;
130 Ok(())
131 }
132
133 pub(crate) async fn send_message(&self, event: &str, payload: EventData) -> Result<()> {
134 self.channel.send_message(event, Some(payload)).await;
135 Ok(())
136 }
137}
138
139fn map_connection_state(state: PondConnectionState) -> ConnectionState {
140 match state {
141 PondConnectionState::Connecting => ConnectionState::Connecting,
142 PondConnectionState::Connected => ConnectionState::Connected,
143 PondConnectionState::Disconnected => ConnectionState::Disconnected,
144 }
145}
146
147fn map_channel_state(state: PondChannelState) -> ChannelState {
148 match state {
149 PondChannelState::Idle => ChannelState::Idle,
150 PondChannelState::Joining => ChannelState::Joining,
151 PondChannelState::Joined => ChannelState::Joined,
152 PondChannelState::Closed => ChannelState::Closed,
153 PondChannelState::Declined => ChannelState::Declined,
154 PondChannelState::Stalled => ChannelState::Joining,
155 }
156}
157
158fn map_channel_event(event: ChannelEvent) -> Option<(String, EventData)> {
159 match event {
160 ChannelEvent::Message(message) => Some((message.event, message.payload)),
161 ChannelEvent::Presence(_) => None,
162 }
163}
164
165impl From<ClientError> for VoxRtcError {
166 fn from(value: ClientError) -> Self {
167 match value {
168 ClientError::Url(err) => Self::InvalidUrl(err),
169 ClientError::Serialization(err) => Self::Json(err),
170 ClientError::WebSocket(err) => Self::PondSocketClient(err.to_string()),
171 ClientError::NotConnected | ClientError::ChannelClosed => Self::Disconnected,
172 other => Self::PondSocketClient(other.to_string()),
173 }
174 }
175}