1use crate::error::{Result, ZelloError};
7use crate::handlers::handle_message;
8use crate::message::IncomingMessage;
9use crate::message::Message;
10use crate::message::Response;
11use crate::protocol::Protocol;
12use audiopus::coder::Decoder;
13use crossbeam_channel::Sender;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::Mutex;
17use tokio::time::{Duration, timeout};
18use tracing::{debug, error, info};
19
/// Settings used to log on to a single Zello channel.
///
/// `Debug` is implemented by hand rather than derived so that the password
/// and auth token never end up in logs or panic messages.
#[derive(Clone)]
pub struct ZelloConfig {
    /// Account username sent with a password logon.
    pub username: Option<String>,
    /// Account password sent with a password logon (redacted in `Debug` output).
    pub password: Option<String>,
    /// Name of the channel to join; must not be empty.
    pub channel: String,
    /// Token sent with the logon request (redacted in `Debug` output).
    pub auth_token: Option<String>,
}

impl std::fmt::Debug for ZelloConfig {
    /// Formats the config like the derived implementation would, but replaces
    /// the contents of `password` and `auth_token` with a placeholder. Whether
    /// each secret is present (`Some`/`None`) is still visible.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ZelloConfig")
            .field("username", &self.username)
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .field("channel", &self.channel)
            .field("auth_token", &self.auth_token.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}
32
33impl ZelloConfig {
34 #[must_use]
36 pub fn new(username: String, password: String, auth_token: String, channel: String) -> Self {
37 Self {
38 username: Some(username),
39 password: Some(password),
40 channel,
41 auth_token: Some(auth_token),
42 }
43 }
44
45 pub fn validate(&self) -> Result<()> {
51 if self.channel.is_empty() {
52 return Err(ZelloError::ConfigError(
53 "Channel cannot be empty".to_string(),
54 ));
55 }
56
57 if !(self.auth_token.is_some() && self.username.is_some() && self.password.is_some()) {
58 return Err(ZelloError::ConfigError(
59 "Must provide auth_token and username/password".to_string(),
60 ));
61 }
62
63 if let (Some(username), Some(password), Some(token)) =
64 (&self.username, &self.password, &self.auth_token)
65 && (username.is_empty() || password.is_empty() || token.is_empty())
66 {
67 return Err(ZelloError::ConfigError(
68 "Username, password, and auth token cannot be empty".to_string(),
69 ));
70 }
71
72 Ok(())
73 }
74}
75
76#[derive(Debug)]
78pub struct ZelloClient {
79 protocol: Protocol,
80 config: ZelloConfig,
81 authenticated: bool,
82 active_streams: HashMap<u32, StreamInfo>,
83 active_inbound_streams: HashMap<u32, StreamInfo>,
84 refresh_token: String,
85}
86
/// Metadata remembered for one active audio stream.
///
/// The default value has empty `channel` and `codec` strings and no callsign.
#[derive(Clone, Debug, Default)]
pub struct StreamInfo {
    /// Channel the stream belongs to.
    pub channel: String,
    /// Codec name associated with the stream.
    pub codec: String,
    /// Callsign attached to the stream, when one is known.
    pub callsign: Option<String>,
}
94
95impl ZelloClient {
97 pub async fn new(config: ZelloConfig) -> Result<Self> {
103 config.validate()?;
104
105 let protocol = Protocol::connect(None).await?;
106
107 let mut client = Self {
108 protocol,
109 config,
110 authenticated: false,
111 active_streams: HashMap::new(),
112 active_inbound_streams: HashMap::new(),
113 refresh_token: String::new(),
114 };
115
116 client.authenticate().await?;
117
118 Ok(client)
119 }
120
121 async fn authenticate(&mut self) -> Result<()> {
127 let message = match (
128 &self.config.username,
129 &self.config.password,
130 &self.config.auth_token,
131 ) {
132 (Some(user), Some(password), Some(token)) => Message::logon_password(
133 self.protocol.next_seq(),
134 user.clone(),
135 password.clone(),
136 token.clone(),
137 self.config.channel.clone(),
138 ),
139 (_, _, Some(token)) => Message::logon_token(
140 self.protocol.next_seq(),
141 token.clone(),
142 self.config.channel.clone(),
143 ),
144 _ => {
145 return Err(ZelloError::AuthenticationError(
146 "Insufficient Authentication credentials provided".to_string(),
147 ));
148 }
149 };
150
151 self.protocol.send(message).await?;
152
153 let response = timeout(Duration::from_secs(10), self.protocol.receive())
155 .await
156 .map_err(|_| ZelloError::Timeout)?;
157
158 debug!("Received response: {response:?}");
159
160 match response? {
161 Some(IncomingMessage::Response(Response::Logon {
162 success: true,
163 refresh_token,
164 ..
165 })) => {
166 self.authenticated = true;
167 self.refresh_token = refresh_token;
168 Ok(())
169 }
170
171 Some(IncomingMessage::Response(Response::Logon {
172 success: false,
173 error,
174 ..
175 })) => Err(ZelloError::AuthenticationError(error.unwrap_or_default())),
176
177 _ => Err(ZelloError::ProtocolError(
178 "Unexpected response to logon".to_string(),
179 )),
180 }
181 }
182
183 pub async fn run_message_loop(
189 &mut self,
190 decoder: Arc<Mutex<Decoder>>,
191 pcm_tx: &Sender<Vec<i16>>,
192 ) -> Result<()> {
193 info!("Listening for messages (press Ctrl+C to exit)...");
194
195 loop {
196 match self.receive_message().await {
197 Ok(Some(message)) => {
198 handle_message(self, message, decoder.clone(), pcm_tx).await;
199 }
200 Ok(None) => {
201 info!("Connection closed");
202 break;
203 }
204 Err(e) => {
205 error!("Error receiving message: {e}");
206 break;
207 }
208 }
209 }
210
211 Ok(())
212 }
213
214 pub async fn send_text_message(&mut self, text: &str) -> Result<()> {
220 if !self.authenticated {
221 return Err(ZelloError::NotConnected);
222 }
223
224 let message = Message::send_text(
225 self.protocol.next_seq(),
226 self.config.channel.clone(),
227 text.to_string(),
228 );
229
230 self.protocol.send(message).await?;
231
232 info!(
233 "Sent text message to channel [{}]: {}",
234 self.config.channel, text
235 );
236
237 Ok(())
238 }
239
240 pub async fn send_text_message_to_callsign(
246 &mut self,
247 text: &str,
248 callsign: &str,
249 ) -> Result<()> {
250 if !self.authenticated {
251 return Err(ZelloError::NotConnected);
252 }
253
254 let message = Message::send_text_for_callsign(
255 self.protocol.next_seq(),
256 self.config.channel.clone(),
257 text.to_string(),
258 callsign.to_string(),
259 );
260
261 self.protocol.send(message).await?;
262
263 info!("Sent text message to callsign [{}]: {}", callsign, text,);
264
265 Ok(())
266 }
267
268 pub async fn start_audio_stream(&mut self, codec: &str, packet_duration: u32) -> Result<u32> {
274 if !self.authenticated {
275 return Err(ZelloError::NotConnected);
276 }
277
278 let seq = self.protocol.next_seq();
279 let message = Message::start_stream(
280 seq,
281 self.config.channel.clone(),
282 codec.to_string(),
283 packet_duration,
284 );
285
286 self.protocol.send(message).await?;
287
288 let response = timeout(Duration::from_secs(5), self.protocol.receive())
290 .await
291 .map_err(|_| ZelloError::Timeout)?;
292
293 match response? {
294 Some(IncomingMessage::Response(Response::Generic { success: true, .. })) => {
295 let stream_id = seq; self.active_streams.insert(
297 stream_id,
298 StreamInfo {
299 channel: self.config.channel.clone(),
300 codec: codec.to_string(),
301 ..Default::default()
302 },
303 );
304 Ok(stream_id)
305 }
306 Some(IncomingMessage::Response(Response::Generic {
307 success: false,
308 error,
309 ..
310 })) => Err(ZelloError::AudioError(
311 error.unwrap_or_else(|| "Failed to start stream".to_string()),
312 )),
313 _ => Err(ZelloError::ProtocolError(
314 "Unexpected response to start_stream".to_string(),
315 )),
316 }
317 }
318
319 pub async fn send_audio_packet(&mut self, stream_id: u32, data: Vec<u8>) -> Result<()> {
325 if !self.active_streams.contains_key(&stream_id) {
326 return Err(ZelloError::AudioError("Invalid stream ID".to_string()));
327 }
328
329 self.protocol.send_audio_data(data).await?;
330 Ok(())
331 }
332
333 pub async fn stop_audio_stream(&mut self, stream_id: u32) -> Result<()> {
339 if !self.active_streams.contains_key(&stream_id) {
340 return Err(ZelloError::AudioError("Invalid stream ID".to_string()));
341 }
342
343 let message = Message::stop_stream(self.protocol.next_seq(), stream_id);
344 self.protocol.send(message).await?;
345
346 self.active_streams.remove(&stream_id);
347 Ok(())
348 }
349
350 pub async fn receive_message(&mut self) -> Result<Option<IncomingMessage>> {
356 self.protocol.receive().await
357 }
358
359 pub fn is_authenticated(&self) -> bool {
361 self.authenticated
362 }
363
364 pub fn channel(&self) -> &str {
366 &self.config.channel
367 }
368
369 pub async fn close(self) -> Result<()> {
375 self.protocol.close().await
376 }
377
378 pub fn add_inbound_stream(
384 &mut self,
385 stream_id: u32,
386 channel: String,
387 codec: String,
388 callsign: Option<String>,
389 ) -> Result<()> {
390 self.active_inbound_streams.insert(
391 stream_id,
392 StreamInfo {
393 channel,
394 codec,
395 callsign,
396 },
397 );
398 Ok(())
399 }
400
401 pub fn get_inbound_stream(&self, stream_id: u32) -> Option<&StreamInfo> {
403 self.active_inbound_streams.get(&stream_id)
404 }
405
406 pub fn remove_inbound_stream(&mut self, stream_id: u32) -> Result<()> {
412 self.active_inbound_streams.remove(&stream_id);
413 Ok(())
414 }
415}
416
/// A complete set of logon credentials together with the target channel.
///
/// `Debug` is implemented by hand rather than derived so that the password
/// and token never end up in logs or panic messages.
pub struct Credentials {
    /// Account username.
    pub username: String,
    /// Account password (redacted in `Debug` output).
    pub password: String,
    /// Authentication token (redacted in `Debug` output).
    pub token: String,
    /// Channel name.
    pub channel: String,
}

impl std::fmt::Debug for Credentials {
    /// Formats the struct like the derived implementation would, but prints a
    /// placeholder in place of `password` and `token`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Credentials")
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .field("token", &"<redacted>")
            .field("channel", &self.channel)
            .finish()
    }
}
425
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ZelloConfig` from string slices to keep the cases short.
    fn config(username: &str, password: &str, token: &str, channel: &str) -> ZelloConfig {
        ZelloConfig::new(
            username.to_string(),
            password.to_string(),
            token.to_string(),
            channel.to_string(),
        )
    }

    #[test]
    fn test_config_validation() {
        // Fully populated configuration is accepted.
        assert!(config("user", "pass", "token", "channel").validate().is_ok());

        // An empty channel name is rejected.
        assert!(config("user", "pass", "token", "").validate().is_err());

        // Blank username and password are rejected even with a token.
        assert!(config("", "", "token", "channel").validate().is_err());
    }
}