zello_client/
client.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2// SPDX-FileCopyrightText: 2024 John C. Murray
3
4//! Zello client implementation
5
6use crate::error::{Result, ZelloError};
7use crate::handlers::handle_message;
8use crate::message::IncomingMessage;
9use crate::message::Message;
10use crate::message::Response;
11use crate::protocol::Protocol;
12use audiopus::coder::Decoder;
13use crossbeam_channel::Sender;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::Mutex;
17use tokio::time::{Duration, timeout};
18use tracing::{debug, error, info};
19
/// Configuration for Zello client
///
/// The credential fields are `Option`s, but [`ZelloConfig::validate`]
/// currently only accepts a configuration in which username, password and
/// auth token are all present and non-empty.
///
/// NOTE(review): the derived `Debug` output includes the password and the
/// auth token in clear text.
#[derive(Debug, Clone)]
pub struct ZelloConfig {
    /// Username for authentication
    pub username: Option<String>,
    /// Password for authentication
    pub password: Option<String>,
    /// Channel to join; must not be empty
    pub channel: String,
    /// Authentication token.
    ///
    /// NOTE(review): this was described as an alternative to
    /// username/password, but `validate` requires it in addition to them.
    pub auth_token: Option<String>,
}
32
33impl ZelloConfig {
34    /// Create a new configuration with username, password and token
35    #[must_use]
36    pub fn new(username: String, password: String, auth_token: String, channel: String) -> Self {
37        Self {
38            username: Some(username),
39            password: Some(password),
40            channel,
41            auth_token: Some(auth_token),
42        }
43    }
44
45    /// Validate the configuration
46    ///
47    /// #Errors
48    ///
49    /// Returns an error if the configuration is invalid
50    pub fn validate(&self) -> Result<()> {
51        if self.channel.is_empty() {
52            return Err(ZelloError::ConfigError(
53                "Channel cannot be empty".to_string(),
54            ));
55        }
56
57        if !(self.auth_token.is_some() && self.username.is_some() && self.password.is_some()) {
58            return Err(ZelloError::ConfigError(
59                "Must provide auth_token and username/password".to_string(),
60            ));
61        }
62
63        if let (Some(username), Some(password), Some(token)) =
64            (&self.username, &self.password, &self.auth_token)
65            && (username.is_empty() || password.is_empty() || token.is_empty())
66        {
67            return Err(ZelloError::ConfigError(
68                "Username, password, and auth token cannot be empty".to_string(),
69            ));
70        }
71
72        Ok(())
73    }
74}
75
/// Zello client for interacting with the Zello API
///
/// Owns the protocol connection together with the session state that is
/// built up while talking to the server.
#[derive(Debug)]
pub struct ZelloClient {
    /// Connection used to send and receive messages and to hand out
    /// sequence numbers.
    protocol: Protocol,
    /// Configuration passed to `new`, validated there before connecting.
    config: ZelloConfig,
    /// `false` on construction; set to `true` when the logon response
    /// reports success.
    authenticated: bool,
    /// Outbound audio streams started through `start_audio_stream`, keyed
    /// by stream id.
    active_streams: HashMap<u32, StreamInfo>,
    /// Inbound streams registered through `add_inbound_stream`, keyed by
    /// stream id.
    active_inbound_streams: HashMap<u32, StreamInfo>,
    /// Refresh token taken from a successful logon response; empty until
    /// then.
    /// NOTE(review): stored but not read by any code visible in this file.
    refresh_token: String,
}
86
/// Attributes of a Zello stream
///
/// Used as the value type of the client's outbound and inbound stream maps.
#[derive(Debug, Clone, Default)]
pub struct StreamInfo {
    /// Channel the stream belongs to.
    pub channel: String,
    /// Codec name for the stream, stored exactly as supplied by the caller.
    pub codec: String,
    /// Callsign associated with the stream, if any. Streams created by
    /// `start_audio_stream` leave this as `None`.
    pub callsign: Option<String>,
}
94
95/// Zello client for interacting with the Zello API
96impl ZelloClient {
97    /// Create a new Zello client and connect
98    ///
99    /// #Errors
100    ///
101    /// Returns an error if fail to create a new Zello client
102    pub async fn new(config: ZelloConfig) -> Result<Self> {
103        config.validate()?;
104
105        let protocol = Protocol::connect(None).await?;
106
107        let mut client = Self {
108            protocol,
109            config,
110            authenticated: false,
111            active_streams: HashMap::new(),
112            active_inbound_streams: HashMap::new(),
113            refresh_token: String::new(),
114        };
115
116        client.authenticate().await?;
117
118        Ok(client)
119    }
120
121    /// Authenticate with the Zello server
122    ///
123    /// #Errors
124    ///
125    /// Returns an error if fail to authenticate with the Zello server
126    async fn authenticate(&mut self) -> Result<()> {
127        let message = match (
128            &self.config.username,
129            &self.config.password,
130            &self.config.auth_token,
131        ) {
132            (Some(user), Some(password), Some(token)) => Message::logon_password(
133                self.protocol.next_seq(),
134                user.clone(),
135                password.clone(),
136                token.clone(),
137                self.config.channel.clone(),
138            ),
139            (_, _, Some(token)) => Message::logon_token(
140                self.protocol.next_seq(),
141                token.clone(),
142                self.config.channel.clone(),
143            ),
144            _ => {
145                return Err(ZelloError::AuthenticationError(
146                    "Insufficient Authentication credentials provided".to_string(),
147                ));
148            }
149        };
150
151        self.protocol.send(message).await?;
152
153        // Wait for authentication response
154        let response = timeout(Duration::from_secs(10), self.protocol.receive())
155            .await
156            .map_err(|_| ZelloError::Timeout)?;
157
158        debug!("Received response: {response:?}");
159
160        match response? {
161            Some(IncomingMessage::Response(Response::Logon {
162                success: true,
163                refresh_token,
164                ..
165            })) => {
166                self.authenticated = true;
167                self.refresh_token = refresh_token;
168                Ok(())
169            }
170
171            Some(IncomingMessage::Response(Response::Logon {
172                success: false,
173                error,
174                ..
175            })) => Err(ZelloError::AuthenticationError(error.unwrap_or_default())),
176
177            _ => Err(ZelloError::ProtocolError(
178                "Unexpected response to logon".to_string(),
179            )),
180        }
181    }
182
183    /// Run the main message processing loop
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if message receiving fails
188    pub async fn run_message_loop(
189        &mut self,
190        decoder: Arc<Mutex<Decoder>>,
191        pcm_tx: &Sender<Vec<i16>>,
192    ) -> Result<()> {
193        info!("Listening for messages (press Ctrl+C to exit)...");
194
195        loop {
196            match self.receive_message().await {
197                Ok(Some(message)) => {
198                    handle_message(self, message, decoder.clone(), pcm_tx).await;
199                }
200                Ok(None) => {
201                    info!("Connection closed");
202                    break;
203                }
204                Err(e) => {
205                    error!("Error receiving message: {e}");
206                    break;
207                }
208            }
209        }
210
211        Ok(())
212    }
213
214    /// Send a text message to the channel
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if fail to send a text message to the channel
219    pub async fn send_text_message(&mut self, text: &str) -> Result<()> {
220        if !self.authenticated {
221            return Err(ZelloError::NotConnected);
222        }
223
224        let message = Message::send_text(
225            self.protocol.next_seq(),
226            self.config.channel.clone(),
227            text.to_string(),
228        );
229
230        self.protocol.send(message).await?;
231
232        info!(
233            "Sent text message to channel [{}]: {}",
234            self.config.channel, text
235        );
236
237        Ok(())
238    }
239
240    /// Send a text message to the channel
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if fail to send a text message to the channel
245    pub async fn send_text_message_to_callsign(
246        &mut self,
247        text: &str,
248        callsign: &str,
249    ) -> Result<()> {
250        if !self.authenticated {
251            return Err(ZelloError::NotConnected);
252        }
253
254        let message = Message::send_text_for_callsign(
255            self.protocol.next_seq(),
256            self.config.channel.clone(),
257            text.to_string(),
258            callsign.to_string(),
259        );
260
261        self.protocol.send(message).await?;
262
263        info!("Sent text message to callsign [{}]: {}", callsign, text,);
264
265        Ok(())
266    }
267
268    /// Start an audio stream
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if fail to start an audio stream
273    pub async fn start_audio_stream(&mut self, codec: &str, packet_duration: u32) -> Result<u32> {
274        if !self.authenticated {
275            return Err(ZelloError::NotConnected);
276        }
277
278        let seq = self.protocol.next_seq();
279        let message = Message::start_stream(
280            seq,
281            self.config.channel.clone(),
282            codec.to_string(),
283            packet_duration,
284        );
285
286        self.protocol.send(message).await?;
287
288        // Wait for response
289        let response = timeout(Duration::from_secs(5), self.protocol.receive())
290            .await
291            .map_err(|_| ZelloError::Timeout)?;
292
293        match response? {
294            Some(IncomingMessage::Response(Response::Generic { success: true, .. })) => {
295                let stream_id = seq; // Use seq as stream_id for now
296                self.active_streams.insert(
297                    stream_id,
298                    StreamInfo {
299                        channel: self.config.channel.clone(),
300                        codec: codec.to_string(),
301                        ..Default::default()
302                    },
303                );
304                Ok(stream_id)
305            }
306            Some(IncomingMessage::Response(Response::Generic {
307                success: false,
308                error,
309                ..
310            })) => Err(ZelloError::AudioError(
311                error.unwrap_or_else(|| "Failed to start stream".to_string()),
312            )),
313            _ => Err(ZelloError::ProtocolError(
314                "Unexpected response to start_stream".to_string(),
315            )),
316        }
317    }
318
319    /// Send audio data packet
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if fail to send audio data packet
324    pub async fn send_audio_packet(&mut self, stream_id: u32, data: Vec<u8>) -> Result<()> {
325        if !self.active_streams.contains_key(&stream_id) {
326            return Err(ZelloError::AudioError("Invalid stream ID".to_string()));
327        }
328
329        self.protocol.send_audio_data(data).await?;
330        Ok(())
331    }
332
333    /// Stop an audio stream
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if fail to stop an audio stream
338    pub async fn stop_audio_stream(&mut self, stream_id: u32) -> Result<()> {
339        if !self.active_streams.contains_key(&stream_id) {
340            return Err(ZelloError::AudioError("Invalid stream ID".to_string()));
341        }
342
343        let message = Message::stop_stream(self.protocol.next_seq(), stream_id);
344        self.protocol.send(message).await?;
345
346        self.active_streams.remove(&stream_id);
347        Ok(())
348    }
349
    /// Receive the next message
    ///
    /// Awaits the protocol layer's `receive` and returns its result
    /// unchanged. `run_message_loop` treats `Ok(None)` as the connection
    /// having closed.
    ///
    /// # Errors
    ///
    /// Returns whatever error the protocol layer reports while receiving
    pub async fn receive_message(&mut self) -> Result<Option<IncomingMessage>> {
        self.protocol.receive().await
    }
358
    /// Check if client is authenticated
    ///
    /// The flag starts as `false` and becomes `true` when the logon response
    /// reports success; no code visible in this file resets it afterwards.
    pub fn is_authenticated(&self) -> bool {
        self.authenticated
    }
363
364    /// Get the current channel
365    pub fn channel(&self) -> &str {
366        &self.config.channel
367    }
368
    /// Close the connection
    ///
    /// Consumes the client and delegates to the protocol layer's `close`.
    /// This method itself sends no stop message for streams that are still
    /// tracked as active.
    ///
    /// # Errors
    ///
    /// Returns an error if fail to close the connection
    pub async fn close(self) -> Result<()> {
        self.protocol.close().await
    }
377
378    /// Add a new inbound stream to the client
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if fail to add a new inbound stream
383    pub fn add_inbound_stream(
384        &mut self,
385        stream_id: u32,
386        channel: String,
387        codec: String,
388        callsign: Option<String>,
389    ) -> Result<()> {
390        self.active_inbound_streams.insert(
391            stream_id,
392            StreamInfo {
393                channel,
394                codec,
395                callsign,
396            },
397        );
398        Ok(())
399    }
400
    /// Get an inbound stream from the client
    ///
    /// Returns `None` when no inbound stream is registered under
    /// `stream_id`.
    pub fn get_inbound_stream(&self, stream_id: u32) -> Option<&StreamInfo> {
        self.active_inbound_streams.get(&stream_id)
    }
405
    /// Remove an inbound stream from the client
    ///
    /// Removing an id that is not registered is not treated as an error.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`
    pub fn remove_inbound_stream(&mut self, stream_id: u32) -> Result<()> {
        // The removed entry, if any, is discarded.
        self.active_inbound_streams.remove(&stream_id);
        Ok(())
    }
415}
416
/// Owned logon details: username, password, auth token and channel.
///
/// NOTE(review): presumably populated from command line arguments
/// elsewhere; nothing visible in this file constructs or reads it. Confirm.
///
/// NOTE(review): the derived `Debug` output includes `password` and `token`
/// in clear text.
#[derive(Debug)]
pub struct Credentials {
    /// Account username.
    pub username: String,
    /// Account password.
    pub password: String,
    /// Authentication token.
    pub token: String,
    /// Channel name.
    pub channel: String,
}
425
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a configuration from string slices so each case fits on one line.
    fn config(username: &str, password: &str, token: &str, channel: &str) -> ZelloConfig {
        ZelloConfig::new(
            username.to_string(),
            password.to_string(),
            token.to_string(),
            channel.to_string(),
        )
    }

    #[test]
    fn test_config_validation() {
        // Fully populated configuration is accepted.
        assert!(config("user", "pass", "token", "channel").validate().is_ok());

        // An empty channel is rejected.
        assert!(config("user", "pass", "token", "").validate().is_err());

        // Empty username and password are rejected even with a token.
        assert!(config("", "", "token", "channel").validate().is_err());
    }
}
456}