1use crate::message;
2use crate::Header;
3use anyhow::bail;
4use anyhow::Context;
5use anyhow::Result;
6use log::{debug, trace};
7use tokio::io::AsyncReadExt;
8
9pub(crate) async fn read_message<S>(s: &mut S) -> Result<message::Msg>
11where
12 S: AsyncReadExt + Unpin + Send,
13{
14 let mut pkt_buf: Vec<u8>;
15 pkt_buf = vec![0; 512];
16
17 trace!("The size of buffer is: {}", pkt_buf.len());
18 let n = s
19 .read(&mut pkt_buf)
20 .await
21 .context("Error while reading data")?;
22 if n == 0 {
23 bail!("Error while reading data from the socket");
24 }
25 debug!("Incoming pkt: {:?}", pkt_buf[..8].to_vec().clone());
26 let header: Header = Header::try_from(&pkt_buf[..8])?;
27 debug!("{:?}", header);
28
29 let topic: String = String::from_utf8(pkt_buf[8..(8 + header.topic_length).into()].to_vec())
30 .context("Error while parsing the topic string")?;
31 let message_position: usize = ((8 + header.topic_length) as u16 + header.message_length).into();
32
33 if 504 - u16::from(header.topic_length) < header.message_length {
34 let bytes_remaining = header.message_length - (504 - u16::from(header.topic_length));
35 trace!("The message is bigger, reading the remaining chunk");
36 trace!("{} bytes remaining", bytes_remaining);
37
38 let mut buf: Vec<u8> = Vec::with_capacity(bytes_remaining.into());
39 trace!("Reading next bytes");
40
41 let n = s.read_buf(&mut buf).await?;
42 if n == 0 {
43 bail!("Error while reading the data from socket");
44 }
45
46 pkt_buf.extend(buf);
47 }
48 Ok(message::Msg {
49 header: header.clone(),
50 topic,
51 message: pkt_buf[(8 + header.topic_length).into()..message_position].to_vec(),
52 channel: None,
53 client_id: None,
54 })
55}