1use bytes::Buf;
7use futures_util::{SinkExt, StreamExt};
8use tokio::net::TcpStream;
9use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
10use tracing::debug;
11use tungstenite::protocol::Message as WsMessage;
12
13use crate::ZELLO_DEFAULT_URL;
14use crate::error::{Result, ZelloError};
15use crate::message::{Event, IncomingMessage, Message};
16
17#[derive(Debug)]
19pub struct Protocol {
20 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
21 sequence: u32,
22}
23
24impl Protocol {
25 pub async fn connect(url: Option<&str>) -> Result<Self> {
31 let url = url.unwrap_or(ZELLO_DEFAULT_URL);
32 let (ws, _) = connect_async(url)
33 .await
34 .map_err(|e| ZelloError::ConnectionError(e.to_string()))?;
35
36 Ok(Self { ws, sequence: 1 })
37 }
38
39 pub async fn send(&mut self, message: Message) -> Result<()> {
45 let json = serde_json::to_string(&message)?;
46 debug!("Sending message: {json}");
47 self.ws
48 .send(WsMessage::Text(json.into()))
49 .await
50 .map_err(|e| ZelloError::ConnectionError(e.to_string()))?;
51 Ok(())
52 }
53
54 pub async fn send_with_seq(&mut self, mut message: Message) -> Result<u32> {
60 let seq = self.next_seq();
61
62 match &mut message {
64 Message::Logon { seq: s, .. }
65 | Message::SendTextMessage { seq: s, .. }
66 | Message::StartStream { seq: s, .. }
67 | Message::StopStream { seq: s, .. } => *s = seq,
68 }
69
70 self.send(message).await?;
71 Ok(seq)
72 }
73
74 pub async fn receive(&mut self) -> Result<Option<IncomingMessage>> {
80 loop {
81 match self.ws.next().await {
82 Some(Ok(WsMessage::Text(text))) => {
83 debug!("Receiving message: {text}");
84 let message: IncomingMessage = serde_json::from_str(&text)?;
85 debug!("Parsed message: {message:?}");
86 return Ok(Some(message));
87 }
88 Some(Ok(WsMessage::Binary(mut data))) => {
89 let data_length = data.len();
90 let data_type = data.get_u8();
91 let stream_id = data.get_u32();
92 let packet_id = data.get_u32();
93 let audio_data = data.split_to(data.len());
94
95 debug!(
96 "Received binary message of {data_length} bytes, type: {data_type}, \
97 stream_id: {stream_id}, packet_id: {packet_id}, audio_data_len: {}",
98 audio_data.len()
99 );
100
101 let message = IncomingMessage::Event(Event::AudioData {
102 stream_id,
103 packet_id,
104 data: audio_data.to_vec(),
105 });
106 return Ok(Some(message));
107 }
108 Some(Ok(WsMessage::Ping(_) | WsMessage::Pong(_))) => {
109 }
111 Some(Ok(WsMessage::Close(_))) => {
112 return Err(ZelloError::ConnectionError("Connection closed".to_string()));
113 }
114 Some(Ok(WsMessage::Frame(_))) => {
115 return Err(ZelloError::ProtocolError(
116 "Unexpected frame message".to_string(),
117 ));
118 }
119 Some(Err(e)) => {
120 return Err(ZelloError::WebSocketError(Box::new(e)));
121 }
122 None => return Ok(None),
123 }
124 }
125 }
126
127 #[must_use]
129 pub fn next_seq(&mut self) -> u32 {
130 let seq = self.sequence;
131 self.sequence = self.sequence.wrapping_add(1);
132 seq
133 }
134
135 pub async fn close(mut self) -> Result<()> {
141 self.ws
142 .close(None)
143 .await
144 .map_err(|e| ZelloError::ConnectionError(e.to_string()))?;
145 Ok(())
146 }
147
148 pub async fn send_audio_data(&mut self, data: Vec<u8>) -> Result<()> {
154 self.ws
155 .send(WsMessage::Binary(data.into()))
156 .await
157 .map_err(|e| ZelloError::AudioError(e.to_string()))?;
158 Ok(())
159 }
160}