Skip to main content

agecli_skill_protocol/
reader.rs

1//! Read skill messages from a stream.
2
3use super::message::{SkillMessage, SkillMessageType};
4use tokio::io::{AsyncRead, AsyncReadExt};
5
/// Reads binary-framed skill messages from a stream.
///
/// Each frame, as decoded by `read_message`, consists of a 4-byte big-endian
/// message type, a 4-byte big-endian payload length, and then that many
/// payload bytes.
#[derive(Debug)]
pub struct SkillMessageReader<R> {
    /// Underlying byte stream the frames are read from.
    reader: R,
}
10
11impl<R: AsyncRead + Unpin> SkillMessageReader<R> {
12    pub fn new(reader: R) -> Self {
13        Self { reader }
14    }
15
16    /// Read the next message from the stream.
17    /// Returns `None` on clean EOF.
18    pub async fn read_message(&mut self) -> std::io::Result<Option<SkillMessage>> {
19        // Read 4-byte message type
20        let mut type_buf = [0u8; 4];
21        match read_exact_or_eof(&mut self.reader, &mut type_buf).await? {
22            0 => return Ok(None), // EOF
23            n if n < 4 => {
24                return Err(std::io::Error::new(
25                    std::io::ErrorKind::UnexpectedEof,
26                    "incomplete message type header",
27                ));
28            }
29            _ => {}
30        }
31
32        let msg_type_raw = u32::from_be_bytes(type_buf);
33        let msg_type = SkillMessageType::from_u32(msg_type_raw).ok_or_else(|| {
34            std::io::Error::new(
35                std::io::ErrorKind::InvalidData,
36                format!("unknown message type: {msg_type_raw}"),
37            )
38        })?;
39
40        // Read 4-byte payload length
41        let mut len_buf = [0u8; 4];
42        self.reader.read_exact(&mut len_buf).await?;
43        let payload_len = u32::from_be_bytes(len_buf) as usize;
44
45        // Sanity check
46        if payload_len > 16 * 1024 * 1024 {
47            return Err(std::io::Error::new(
48                std::io::ErrorKind::InvalidData,
49                format!("message payload too large: {payload_len}"),
50            ));
51        }
52
53        // Read payload
54        let mut payload = vec![0u8; payload_len];
55        if payload_len > 0 {
56            self.reader.read_exact(&mut payload).await?;
57        }
58
59        Ok(Some(SkillMessage::new(msg_type, payload)))
60    }
61}
62
63/// Read exactly `buf.len()` bytes, returning 0 on immediate EOF.
64async fn read_exact_or_eof<R: AsyncRead + Unpin>(
65    reader: &mut R,
66    buf: &mut [u8],
67) -> std::io::Result<usize> {
68    let mut offset = 0;
69    while offset < buf.len() {
70        let n = reader.read(&mut buf[offset..]).await?;
71        if n == 0 {
72            return Ok(offset);
73        }
74        offset += n;
75    }
76    Ok(offset)
77}