Skip to main content

faf_radio_rust/
lib.rs

1mod error;
2mod types;
3
4pub use error::{RadioError, Result};
5pub use types::{ClientAction, ConnectionState, RadioConfig, ServerMessage};
6
7use futures_util::stream::SplitSink;
8use futures_util::stream::SplitStream;
9use futures_util::{SinkExt, StreamExt};
10use std::sync::Arc;
11use tokio::net::TcpStream;
12use tokio::sync::{RwLock, mpsc};
13use tokio::time::{Duration, interval};
14use tokio_tungstenite::{
15    MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
16};
17
18type WsWrite = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
19type WsRead = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
20
21/// Radio Protocol WebSocket client
22pub struct RadioClient {
23    config: RadioConfig,
24    state: Arc<RwLock<ConnectionState>>,
25    tx: mpsc::UnboundedSender<ClientAction>,
26}
27
28impl RadioClient {
29    /// Create a new Radio client
30    pub fn new(config: RadioConfig) -> Self {
31        let (tx, _rx) = mpsc::unbounded_channel();
32        Self {
33            config,
34            state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
35            tx,
36        }
37    }
38
39    /// Create a new Radio client with URL
40    pub fn with_url(url: impl Into<String>) -> Self {
41        Self::new(RadioConfig::new(url))
42    }
43
44    /// Get current connection state
45    pub async fn state(&self) -> ConnectionState {
46        *self.state.read().await
47    }
48
49    /// Connect to the Radio Protocol server
50    pub async fn connect(&mut self) -> Result<()> {
51        let state = *self.state.read().await;
52        if state != ConnectionState::Disconnected {
53            return Err(RadioError::AlreadyConnected);
54        }
55
56        *self.state.write().await = ConnectionState::Connecting;
57
58        let (ws_stream, _) = connect_async(&self.config.url).await?;
59        let (write, read) = ws_stream.split();
60
61        let (tx, rx) = mpsc::unbounded_channel();
62        self.tx = tx.clone();
63
64        let state_clone = Arc::clone(&self.state);
65        let config_clone = self.config.clone();
66
67        // Spawn message handler task
68        tokio::spawn(Self::message_loop(
69            write,
70            read,
71            rx,
72            state_clone,
73            config_clone,
74            tx.clone(),
75        ));
76
77        *self.state.write().await = ConnectionState::Connected;
78        Ok(())
79    }
80
81    /// Disconnect from the server
82    pub async fn disconnect(&self) -> Result<()> {
83        *self.state.write().await = ConnectionState::Disconnected;
84        Ok(())
85    }
86
87    /// Tune to frequencies
88    pub async fn tune(&self, frequencies: Vec<String>) -> Result<()> {
89        self.validate_frequencies(&frequencies)?;
90        self.tx
91            .send(ClientAction::Tune { frequencies })
92            .map_err(|_| RadioError::NotConnected)?;
93        Ok(())
94    }
95
96    /// Untune from frequencies
97    pub async fn untune(&self, frequencies: Vec<String>) -> Result<()> {
98        self.validate_frequencies(&frequencies)?;
99        self.tx
100            .send(ClientAction::Untune { frequencies })
101            .map_err(|_| RadioError::NotConnected)?;
102        Ok(())
103    }
104
105    /// Broadcast an event on a frequency
106    pub async fn broadcast(&self, frequency: &str, event: serde_json::Value) -> Result<()> {
107        self.validate_frequencies(&[frequency.to_string()])?;
108        self.tx
109            .send(ClientAction::Broadcast {
110                frequency: frequency.to_string(),
111                event,
112            })
113            .map_err(|_| RadioError::NotConnected)?;
114        Ok(())
115    }
116
117    /// Validate frequency range (40.0-108.0 FM)
118    pub fn validate_frequencies(&self, frequencies: &[String]) -> Result<()> {
119        for freq in frequencies {
120            if let Ok(f) = freq.parse::<f64>() {
121                if !(40.0..=108.0).contains(&f) {
122                    return Err(RadioError::InvalidFrequency(freq.clone()));
123                }
124            } else {
125                return Err(RadioError::InvalidFrequency(freq.clone()));
126            }
127        }
128        Ok(())
129    }
130
131    /// Main message loop with auto-reconnect
132    async fn message_loop(
133        mut write: WsWrite,
134        mut read: WsRead,
135        mut rx: mpsc::UnboundedReceiver<ClientAction>,
136        state: Arc<RwLock<ConnectionState>>,
137        config: RadioConfig,
138        tx: mpsc::UnboundedSender<ClientAction>,
139    ) {
140        let mut heartbeat = interval(Duration::from_millis(config.heartbeat_interval_ms));
141        let mut attempt: u32 = 0;
142
143        loop {
144            // Inner select loop — runs until connection breaks
145            let broke_cleanly = loop {
146                tokio::select! {
147                    Some(action) = rx.recv() => {
148                        let json = serde_json::to_string(&action).unwrap();
149                        let msg = Message::text(json);
150                        if write.send(msg).await.is_err() {
151                            eprintln!("[Radio] Send error");
152                            break false;
153                        }
154                    }
155
156                    Some(msg) = read.next() => {
157                        match msg {
158                            Ok(Message::Text(text)) => {
159                                // Connection is alive — reset attempt counter
160                                attempt = 0;
161                                if let Ok(server_msg) = serde_json::from_str::<ServerMessage>(&text) {
162                                    Self::handle_server_message(server_msg);
163                                }
164                            }
165                            Ok(Message::Close(_)) => {
166                                println!("[Radio] Connection closed by server");
167                                break true;
168                            }
169                            Err(_) => {
170                                eprintln!("[Radio] Read error");
171                                break false;
172                            }
173                            _ => {}
174                        }
175                    }
176
177                    _ = heartbeat.tick() => {
178                        let _ = tx.send(ClientAction::Ping);
179                    }
180                }
181            };
182
183            // Should we reconnect?
184            if !config.auto_reconnect {
185                break;
186            }
187            if broke_cleanly {
188                // Server sent Close frame — intentional disconnect, don't retry
189                break;
190            }
191            if config.max_reconnect_attempts > 0 && attempt >= config.max_reconnect_attempts {
192                eprintln!(
193                    "[Radio] Max reconnect attempts reached ({})",
194                    config.max_reconnect_attempts
195                );
196                break;
197            }
198
199            // Exponential backoff
200            attempt += 1;
201            let delay = std::cmp::min(
202                config.reconnect_delay_ms * 2u64.saturating_pow(attempt - 1),
203                config.max_reconnect_delay_ms,
204            );
205            eprintln!(
206                "[Radio] Reconnecting in {}ms (attempt {}/{})...",
207                delay,
208                attempt,
209                if config.max_reconnect_attempts == 0 {
210                    "∞".to_string()
211                } else {
212                    config.max_reconnect_attempts.to_string()
213                }
214            );
215
216            *state.write().await = ConnectionState::Reconnecting;
217            tokio::time::sleep(Duration::from_millis(delay)).await;
218
219            // Attempt reconnection
220            match connect_async(&config.url).await {
221                Ok((ws_stream, _)) => {
222                    let (new_write, new_read) = ws_stream.split();
223                    write = new_write;
224                    read = new_read;
225                    *state.write().await = ConnectionState::Connected;
226                    heartbeat = interval(Duration::from_millis(config.heartbeat_interval_ms));
227                    eprintln!("[Radio] Reconnected successfully");
228                }
229                Err(e) => {
230                    eprintln!("[Radio] Reconnect failed: {}", e);
231                    continue;
232                }
233            }
234        }
235
236        *state.write().await = ConnectionState::Disconnected;
237    }
238
239    /// Handle server messages
240    fn handle_server_message(msg: ServerMessage) {
241        match msg {
242            ServerMessage::Connected {
243                client_id, message, ..
244            } => {
245                println!("[Radio] ✅ {} (Client ID: {})", message, client_id);
246            }
247            ServerMessage::Tuned {
248                frequencies,
249                message,
250            } => {
251                println!("[Radio] ✅ {} - {:?}", message, frequencies);
252            }
253            ServerMessage::Broadcast {
254                frequency,
255                event,
256                timestamp,
257            } => {
258                println!(
259                    "[Radio] 📻 Broadcast on {} FM at {}: {:?}",
260                    frequency, timestamp, event
261                );
262            }
263            ServerMessage::Pong => {
264                println!("[Radio] 💓 Heartbeat");
265            }
266            ServerMessage::Error { message } => {
267                eprintln!("[Radio] ❌ Error: {}", message);
268            }
269        }
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// Boundary, out-of-range and unparsable inputs for `validate_frequencies`.
    #[test]
    fn test_frequency_validation() {
        let config = RadioConfig::new("wss://example.com");
        let client = RadioClient::new(config);

        // Valid frequencies (range bounds are inclusive)
        assert!(client.validate_frequencies(&["91.0".to_string()]).is_ok());
        assert!(
            client
                .validate_frequencies(&["40.0".to_string(), "108.0".to_string()])
                .is_ok()
        );
        // Nothing to validate is not an error
        assert!(client.validate_frequencies(&[]).is_ok());

        // Invalid frequencies
        assert!(client.validate_frequencies(&["39.9".to_string()]).is_err());
        assert!(client.validate_frequencies(&["108.1".to_string()]).is_err());
        assert!(
            client
                .validate_frequencies(&["invalid".to_string()])
                .is_err()
        );
        // These parse as f64 but are not usable frequencies
        assert!(client.validate_frequencies(&["NaN".to_string()]).is_err());
        assert!(client.validate_frequencies(&["inf".to_string()]).is_err());
        // A single bad entry rejects the whole batch
        assert!(
            client
                .validate_frequencies(&["91.0".to_string(), "999.0".to_string()])
                .is_err()
        );
    }

    /// A freshly built client keeps its URL and starts out disconnected.
    #[tokio::test]
    async fn test_initial_state() {
        let client = RadioClient::with_url("wss://example.com");
        assert_eq!(client.config.url, "wss://example.com");
        assert!(client.state().await == ConnectionState::Disconnected);
    }

    /// The `grok` preset points at the hosted endpoint with bounded auto-reconnect.
    #[test]
    fn test_grok_preset() {
        let config = RadioConfig::grok();
        assert_eq!(config.url, "wss://faf-beacon.wolfejam2020.workers.dev/radio");
        assert!(config.auto_reconnect);
        assert_eq!(config.max_reconnect_attempts, 5);
    }

    /// Validation runs before the send, so a bad frequency wins over "not connected".
    #[tokio::test]
    async fn test_broadcast_invalid_frequency() {
        let client = RadioClient::new(RadioConfig::grok());
        let result = client
            .broadcast("999.0", serde_json::json!({"type": "test"}))
            .await;
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            RadioError::InvalidFrequency(_)
        ));
    }

    /// Before `connect`, the action channel has no receiver and sends are rejected.
    #[tokio::test]
    async fn test_broadcast_when_disconnected() {
        let client = RadioClient::new(RadioConfig::grok());
        let result = client
            .broadcast("91.0", serde_json::json!({"type": "test"}))
            .await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), RadioError::NotConnected));
    }
}