simple_pub_sub/
stream.rs

1use crate::message;
2use crate::Header;
3use anyhow::bail;
4use anyhow::Context;
5use anyhow::Result;
6use log::{debug, trace};
7use tokio::io::AsyncReadExt;
8
9/// reads a data from a `TcpStream` and returns a `Msg`.
10pub(crate) async fn read_message<S>(s: &mut S) -> Result<message::Msg>
11where
12    S: AsyncReadExt + Unpin + Send,
13{
14    let mut pkt_buf: Vec<u8>;
15    pkt_buf = vec![0; 512];
16
17    trace!("The size of buffer is: {}", pkt_buf.len());
18    let n = s
19        .read(&mut pkt_buf)
20        .await
21        .context("Error while reading data")?;
22    if n == 0 {
23        bail!("Error while reading data from the socket");
24    }
25    debug!("Incoming pkt: {:?}", pkt_buf[..8].to_vec().clone());
26    let header: Header = Header::try_from(&pkt_buf[..8])?;
27    debug!("{:?}", header);
28
29    let topic: String = String::from_utf8(pkt_buf[8..(8 + header.topic_length).into()].to_vec())
30        .context("Error while parsing the topic string")?;
31    let message_position: usize = ((8 + header.topic_length) as u16 + header.message_length).into();
32
33    if 504 - u16::from(header.topic_length) < header.message_length {
34        let bytes_remaining = header.message_length - (504 - u16::from(header.topic_length));
35        trace!("The message is bigger, reading the remaining chunk");
36        trace!("{} bytes remaining", bytes_remaining);
37
38        let mut buf: Vec<u8> = Vec::with_capacity(bytes_remaining.into());
39        trace!("Reading next bytes");
40
41        let n = s.read_buf(&mut buf).await?;
42        if n == 0 {
43            bail!("Error while reading the data from socket");
44        }
45
46        pkt_buf.extend(buf);
47    }
48    Ok(message::Msg {
49        header: header.clone(),
50        topic,
51        message: pkt_buf[(8 + header.topic_length).into()..message_position].to_vec(),
52        channel: None,
53        client_id: None,
54    })
55}