agecli_skill_protocol/
reader.rs1use super::message::{SkillMessage, SkillMessageType};
4use tokio::io::{AsyncRead, AsyncReadExt};
5
/// Reads length-prefixed skill protocol messages from an underlying reader.
///
/// The wrapped reader is consumed sequentially; each successful read yields
/// one complete message.
#[derive(Debug)]
pub struct SkillMessageReader<R> {
    /// Source the raw message bytes are pulled from.
    reader: R,
}
10
11impl<R: AsyncRead + Unpin> SkillMessageReader<R> {
12 pub fn new(reader: R) -> Self {
13 Self { reader }
14 }
15
16 pub async fn read_message(&mut self) -> std::io::Result<Option<SkillMessage>> {
19 let mut type_buf = [0u8; 4];
21 match read_exact_or_eof(&mut self.reader, &mut type_buf).await? {
22 0 => return Ok(None), n if n < 4 => {
24 return Err(std::io::Error::new(
25 std::io::ErrorKind::UnexpectedEof,
26 "incomplete message type header",
27 ));
28 }
29 _ => {}
30 }
31
32 let msg_type_raw = u32::from_be_bytes(type_buf);
33 let msg_type = SkillMessageType::from_u32(msg_type_raw).ok_or_else(|| {
34 std::io::Error::new(
35 std::io::ErrorKind::InvalidData,
36 format!("unknown message type: {msg_type_raw}"),
37 )
38 })?;
39
40 let mut len_buf = [0u8; 4];
42 self.reader.read_exact(&mut len_buf).await?;
43 let payload_len = u32::from_be_bytes(len_buf) as usize;
44
45 if payload_len > 16 * 1024 * 1024 {
47 return Err(std::io::Error::new(
48 std::io::ErrorKind::InvalidData,
49 format!("message payload too large: {payload_len}"),
50 ));
51 }
52
53 let mut payload = vec![0u8; payload_len];
55 if payload_len > 0 {
56 self.reader.read_exact(&mut payload).await?;
57 }
58
59 Ok(Some(SkillMessage::new(msg_type, payload)))
60 }
61}
62
63async fn read_exact_or_eof<R: AsyncRead + Unpin>(
65 reader: &mut R,
66 buf: &mut [u8],
67) -> std::io::Result<usize> {
68 let mut offset = 0;
69 while offset < buf.len() {
70 let n = reader.read(&mut buf[offset..]).await?;
71 if n == 0 {
72 return Ok(offset);
73 }
74 offset += n;
75 }
76 Ok(offset)
77}