zello_client/
protocol.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2// SPDX-FileCopyrightText: 2024 John C. Murray
3
4//! Zello protocol implementation
5
6use bytes::Buf;
7use futures_util::{SinkExt, StreamExt};
8use tokio::net::TcpStream;
9use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
10use tracing::debug;
11use tungstenite::protocol::Message as WsMessage;
12
13use crate::ZELLO_DEFAULT_URL;
14use crate::error::{Result, ZelloError};
15use crate::message::{Event, IncomingMessage, Message};
16
17/// Zello protocol handler
18#[derive(Debug)]
19pub struct Protocol {
20    ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
21    sequence: u32,
22}
23
24impl Protocol {
25    /// Connect to Zello server
26    ///
27    /// # Errors
28    ///
29    /// Returns an error if the WebSocket connection fails
30    pub async fn connect(url: Option<&str>) -> Result<Self> {
31        let url = url.unwrap_or(ZELLO_DEFAULT_URL);
32        let (ws, _) = connect_async(url)
33            .await
34            .map_err(|e| ZelloError::ConnectionError(e.to_string()))?;
35
36        Ok(Self { ws, sequence: 1 })
37    }
38
39    /// Send a message
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if serialization or sending fails
44    pub async fn send(&mut self, message: Message) -> Result<()> {
45        let json = serde_json::to_string(&message)?;
46        debug!("Sending message: {json}");
47        self.ws
48            .send(WsMessage::Text(json.into()))
49            .await
50            .map_err(|e| ZelloError::ConnectionError(e.to_string()))?;
51        Ok(())
52    }
53
54    /// Send a message and return its sequence number
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if sending fails
59    pub async fn send_with_seq(&mut self, mut message: Message) -> Result<u32> {
60        let seq = self.next_seq();
61
62        // Set sequence number based on message type
63        match &mut message {
64            Message::Logon { seq: s, .. }
65            | Message::SendTextMessage { seq: s, .. }
66            | Message::StartStream { seq: s, .. }
67            | Message::StopStream { seq: s, .. } => *s = seq,
68        }
69
70        self.send(message).await?;
71        Ok(seq)
72    }
73
74    /// Receive the next message
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the connection fails or message parsing fails
79    pub async fn receive(&mut self) -> Result<Option<IncomingMessage>> {
80        loop {
81            match self.ws.next().await {
82                Some(Ok(WsMessage::Text(text))) => {
83                    debug!("Receiving message: {text}");
84                    let message: IncomingMessage = serde_json::from_str(&text)?;
85                    debug!("Parsed message: {message:?}");
86                    return Ok(Some(message));
87                }
88                Some(Ok(WsMessage::Binary(mut data))) => {
89                    let data_length = data.len();
90                    let data_type = data.get_u8();
91                    let stream_id = data.get_u32();
92                    let packet_id = data.get_u32();
93                    let audio_data = data.split_to(data.len());
94
95                    debug!(
96                        "Received binary message of {data_length} bytes, type: {data_type}, \
97                         stream_id: {stream_id}, packet_id: {packet_id}, audio_data_len: {}",
98                        audio_data.len()
99                    );
100
101                    let message = IncomingMessage::Event(Event::AudioData {
102                        stream_id,
103                        packet_id,
104                        data: audio_data.to_vec(),
105                    });
106                    return Ok(Some(message));
107                }
108                Some(Ok(WsMessage::Ping(_) | WsMessage::Pong(_))) => {
109                    // Continue loop for ping/pong messages
110                }
111                Some(Ok(WsMessage::Close(_))) => {
112                    return Err(ZelloError::ConnectionError("Connection closed".to_string()));
113                }
114                Some(Ok(WsMessage::Frame(_))) => {
115                    return Err(ZelloError::ProtocolError(
116                        "Unexpected frame message".to_string(),
117                    ));
118                }
119                Some(Err(e)) => {
120                    return Err(ZelloError::WebSocketError(Box::new(e)));
121                }
122                None => return Ok(None),
123            }
124        }
125    }
126
127    /// Get the next sequence number
128    #[must_use]
129    pub fn next_seq(&mut self) -> u32 {
130        let seq = self.sequence;
131        self.sequence = self.sequence.wrapping_add(1);
132        seq
133    }
134
135    /// Close the connection
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if closing the WebSocket fails
140    pub async fn close(mut self) -> Result<()> {
141        self.ws
142            .close(None)
143            .await
144            .map_err(|e| ZelloError::ConnectionError(e.to_string()))?;
145        Ok(())
146    }
147
148    /// Send raw audio data
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if sending fails
153    pub async fn send_audio_data(&mut self, data: Vec<u8>) -> Result<()> {
154        self.ws
155            .send(WsMessage::Binary(data.into()))
156            .await
157            .map_err(|e| ZelloError::AudioError(e.to_string()))?;
158        Ok(())
159    }
160}