1mod error;
2mod types;
3
4pub use error::{RadioError, Result};
5pub use types::{ClientAction, ConnectionState, RadioConfig, ServerMessage};
6
7use futures_util::stream::SplitSink;
8use futures_util::stream::SplitStream;
9use futures_util::{SinkExt, StreamExt};
10use std::sync::Arc;
11use tokio::net::TcpStream;
12use tokio::sync::{RwLock, mpsc};
13use tokio::time::{Duration, interval};
14use tokio_tungstenite::{
15 MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
16};
17
18type WsWrite = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
19type WsRead = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
20
21pub struct RadioClient {
23 config: RadioConfig,
24 state: Arc<RwLock<ConnectionState>>,
25 tx: mpsc::UnboundedSender<ClientAction>,
26}
27
28impl RadioClient {
29 pub fn new(config: RadioConfig) -> Self {
31 let (tx, _rx) = mpsc::unbounded_channel();
32 Self {
33 config,
34 state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
35 tx,
36 }
37 }
38
39 pub fn with_url(url: impl Into<String>) -> Self {
41 Self::new(RadioConfig::new(url))
42 }
43
44 pub async fn state(&self) -> ConnectionState {
46 *self.state.read().await
47 }
48
49 pub async fn connect(&mut self) -> Result<()> {
51 let state = *self.state.read().await;
52 if state != ConnectionState::Disconnected {
53 return Err(RadioError::AlreadyConnected);
54 }
55
56 *self.state.write().await = ConnectionState::Connecting;
57
58 let (ws_stream, _) = connect_async(&self.config.url).await?;
59 let (write, read) = ws_stream.split();
60
61 let (tx, rx) = mpsc::unbounded_channel();
62 self.tx = tx.clone();
63
64 let state_clone = Arc::clone(&self.state);
65 let config_clone = self.config.clone();
66
67 tokio::spawn(Self::message_loop(
69 write,
70 read,
71 rx,
72 state_clone,
73 config_clone,
74 tx.clone(),
75 ));
76
77 *self.state.write().await = ConnectionState::Connected;
78 Ok(())
79 }
80
81 pub async fn disconnect(&self) -> Result<()> {
83 *self.state.write().await = ConnectionState::Disconnected;
84 Ok(())
85 }
86
87 pub async fn tune(&self, frequencies: Vec<String>) -> Result<()> {
89 self.validate_frequencies(&frequencies)?;
90 self.tx
91 .send(ClientAction::Tune { frequencies })
92 .map_err(|_| RadioError::NotConnected)?;
93 Ok(())
94 }
95
96 pub async fn untune(&self, frequencies: Vec<String>) -> Result<()> {
98 self.validate_frequencies(&frequencies)?;
99 self.tx
100 .send(ClientAction::Untune { frequencies })
101 .map_err(|_| RadioError::NotConnected)?;
102 Ok(())
103 }
104
105 pub async fn broadcast(&self, frequency: &str, event: serde_json::Value) -> Result<()> {
107 self.validate_frequencies(&[frequency.to_string()])?;
108 self.tx
109 .send(ClientAction::Broadcast {
110 frequency: frequency.to_string(),
111 event,
112 })
113 .map_err(|_| RadioError::NotConnected)?;
114 Ok(())
115 }
116
117 pub fn validate_frequencies(&self, frequencies: &[String]) -> Result<()> {
119 for freq in frequencies {
120 if let Ok(f) = freq.parse::<f64>() {
121 if !(40.0..=108.0).contains(&f) {
122 return Err(RadioError::InvalidFrequency(freq.clone()));
123 }
124 } else {
125 return Err(RadioError::InvalidFrequency(freq.clone()));
126 }
127 }
128 Ok(())
129 }
130
131 async fn message_loop(
133 mut write: WsWrite,
134 mut read: WsRead,
135 mut rx: mpsc::UnboundedReceiver<ClientAction>,
136 state: Arc<RwLock<ConnectionState>>,
137 config: RadioConfig,
138 tx: mpsc::UnboundedSender<ClientAction>,
139 ) {
140 let mut heartbeat = interval(Duration::from_millis(config.heartbeat_interval_ms));
141 let mut attempt: u32 = 0;
142
143 loop {
144 let broke_cleanly = loop {
146 tokio::select! {
147 Some(action) = rx.recv() => {
148 let json = serde_json::to_string(&action).unwrap();
149 let msg = Message::text(json);
150 if write.send(msg).await.is_err() {
151 eprintln!("[Radio] Send error");
152 break false;
153 }
154 }
155
156 Some(msg) = read.next() => {
157 match msg {
158 Ok(Message::Text(text)) => {
159 attempt = 0;
161 if let Ok(server_msg) = serde_json::from_str::<ServerMessage>(&text) {
162 Self::handle_server_message(server_msg);
163 }
164 }
165 Ok(Message::Close(_)) => {
166 println!("[Radio] Connection closed by server");
167 break true;
168 }
169 Err(_) => {
170 eprintln!("[Radio] Read error");
171 break false;
172 }
173 _ => {}
174 }
175 }
176
177 _ = heartbeat.tick() => {
178 let _ = tx.send(ClientAction::Ping);
179 }
180 }
181 };
182
183 if !config.auto_reconnect {
185 break;
186 }
187 if broke_cleanly {
188 break;
190 }
191 if config.max_reconnect_attempts > 0 && attempt >= config.max_reconnect_attempts {
192 eprintln!(
193 "[Radio] Max reconnect attempts reached ({})",
194 config.max_reconnect_attempts
195 );
196 break;
197 }
198
199 attempt += 1;
201 let delay = std::cmp::min(
202 config.reconnect_delay_ms * 2u64.saturating_pow(attempt - 1),
203 config.max_reconnect_delay_ms,
204 );
205 eprintln!(
206 "[Radio] Reconnecting in {}ms (attempt {}/{})...",
207 delay,
208 attempt,
209 if config.max_reconnect_attempts == 0 {
210 "∞".to_string()
211 } else {
212 config.max_reconnect_attempts.to_string()
213 }
214 );
215
216 *state.write().await = ConnectionState::Reconnecting;
217 tokio::time::sleep(Duration::from_millis(delay)).await;
218
219 match connect_async(&config.url).await {
221 Ok((ws_stream, _)) => {
222 let (new_write, new_read) = ws_stream.split();
223 write = new_write;
224 read = new_read;
225 *state.write().await = ConnectionState::Connected;
226 heartbeat = interval(Duration::from_millis(config.heartbeat_interval_ms));
227 eprintln!("[Radio] Reconnected successfully");
228 }
229 Err(e) => {
230 eprintln!("[Radio] Reconnect failed: {}", e);
231 continue;
232 }
233 }
234 }
235
236 *state.write().await = ConnectionState::Disconnected;
237 }
238
239 fn handle_server_message(msg: ServerMessage) {
241 match msg {
242 ServerMessage::Connected {
243 client_id, message, ..
244 } => {
245 println!("[Radio] ✅ {} (Client ID: {})", message, client_id);
246 }
247 ServerMessage::Tuned {
248 frequencies,
249 message,
250 } => {
251 println!("[Radio] ✅ {} - {:?}", message, frequencies);
252 }
253 ServerMessage::Broadcast {
254 frequency,
255 event,
256 timestamp,
257 } => {
258 println!(
259 "[Radio] 📻 Broadcast on {} FM at {}: {:?}",
260 frequency, timestamp, event
261 );
262 }
263 ServerMessage::Pong => {
264 println!("[Radio] 💓 Heartbeat");
265 }
266 ServerMessage::Error { message } => {
267 eprintln!("[Radio] ❌ Error: {}", message);
268 }
269 }
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276
277 #[test]
278 fn test_frequency_validation() {
279 let config = RadioConfig::new("wss://example.com");
280 let client = RadioClient::new(config);
281
282 assert!(client.validate_frequencies(&["91.0".to_string()]).is_ok());
284 assert!(
285 client
286 .validate_frequencies(&["40.0".to_string(), "108.0".to_string()])
287 .is_ok()
288 );
289
290 assert!(client.validate_frequencies(&["39.9".to_string()]).is_err());
292 assert!(client.validate_frequencies(&["108.1".to_string()]).is_err());
293 assert!(
294 client
295 .validate_frequencies(&["invalid".to_string()])
296 .is_err()
297 );
298 }
299
300 #[test]
301 fn test_initial_state() {
302 let client = RadioClient::with_url("wss://example.com");
303 assert_eq!(client.config.url, "wss://example.com");
306 }
307
308 #[test]
309 fn test_grok_preset() {
310 let config = RadioConfig::grok();
311 assert_eq!(config.url, "wss://faf-beacon.wolfejam2020.workers.dev/radio");
312 assert!(config.auto_reconnect);
313 assert_eq!(config.max_reconnect_attempts, 5);
314 }
315
316 #[tokio::test]
317 async fn test_broadcast_invalid_frequency() {
318 let client = RadioClient::new(RadioConfig::grok());
319 let result = client
320 .broadcast("999.0", serde_json::json!({"type": "test"}))
321 .await;
322 assert!(result.is_err());
323 assert!(matches!(
324 result.unwrap_err(),
325 RadioError::InvalidFrequency(_)
326 ));
327 }
328
329 #[tokio::test]
330 async fn test_broadcast_when_disconnected() {
331 let client = RadioClient::new(RadioConfig::grok());
332 let result = client
333 .broadcast("91.0", serde_json::json!({"type": "test"}))
334 .await;
335 assert!(result.is_err());
336 assert!(matches!(result.unwrap_err(), RadioError::NotConnected));
337 }
338}