1use std::io::{self, Read, Write};
3
/// One protocol message: a channel name plus an opaque byte payload.
///
/// On the wire (see `write_frame` / `read_frame`) the channel and the payload
/// are separated by a single `\n`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
    /// Name of the channel the frame belongs to. [`Frame::control`] uses the
    /// empty string here.
    pub channel: String,
    /// Raw payload bytes, carried without interpretation.
    pub payload: Vec<u8>,
}

impl Frame {
    /// Builds a frame from anything convertible into an owned channel name
    /// and an owned payload buffer.
    pub fn new(channel: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
        let channel = channel.into();
        let payload = payload.into();
        Self { channel, payload }
    }

    /// Builds a frame on the empty channel whose payload is `json` followed
    /// by a single trailing `\n`.
    ///
    /// The bytes are copied as-is; nothing here checks that they are valid
    /// JSON (the parameter name suggests callers pass JSON text).
    pub fn control(json: &[u8]) -> Self {
        let payload: Vec<u8> = json
            .iter()
            .copied()
            .chain(std::iter::once(b'\n'))
            .collect();
        Self {
            channel: String::new(),
            payload,
        }
    }
}
33
34pub fn write_frame<W: Write>(w: &mut W, frame: &Frame) -> io::Result<()> {
36 let mut message = Vec::with_capacity(frame.channel.len() + 1 + frame.payload.len());
37 message.extend_from_slice(frame.channel.as_bytes());
38 message.push(b'\n');
39 message.extend_from_slice(&frame.payload);
40 writeln!(w, "{}", message.len())?;
41 w.write_all(&message)?;
42 w.flush()
43}
44
45pub fn read_frame<R: Read>(r: &mut R) -> io::Result<Option<Frame>> {
47 let mut len_buf = Vec::new();
48 let mut byte = [0u8; 1];
49 loop {
50 if r.read(&mut byte)? == 0 {
51 if len_buf.is_empty() {
52 return Ok(None);
53 }
54 return Err(io::Error::new(
55 io::ErrorKind::UnexpectedEof,
56 "eof in frame length",
57 ));
58 }
59 if byte[0] == b'\n' {
60 break;
61 }
62 len_buf.push(byte[0]);
63 }
64 let len: usize = std::str::from_utf8(&len_buf)
65 .ok()
66 .and_then(|s| s.trim().parse().ok())
67 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "bad frame length"))?;
68 let mut message = vec![0u8; len];
69 r.read_exact(&mut message)?;
70 let nl = message.iter().position(|&b| b == b'\n').ok_or_else(|| {
71 io::Error::new(
72 io::ErrorKind::InvalidData,
73 "frame missing channel separator",
74 )
75 })?;
76 let channel = String::from_utf8_lossy(&message[..nl]).into_owned();
77 let payload = message[nl + 1..].to_vec();
78 Ok(Some(Frame { channel, payload }))
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Serialises a single frame into a fresh byte buffer.
    fn encode(frame: &Frame) -> Vec<u8> {
        let mut bytes = Vec::new();
        write_frame(&mut bytes, frame).unwrap();
        bytes
    }

    /// Pins the exact wire bytes for channel `a5` with payload `abc`
    /// (per the test name, the example from the protocol documentation).
    #[test]
    fn encodes_protocol_doc_example() {
        let wire = encode(&Frame::new("a5", b"abc".to_vec()));
        assert_eq!(wire, b"6\na5\nabc");
    }

    /// An empty channel still contributes its separator newline to the body.
    #[test]
    fn control_frame_has_empty_channel() {
        let wire = encode(&Frame::new("", b"{}".to_vec()));
        assert_eq!(wire, b"3\n\n{}");
    }

    /// Frames written back to back decode in order, followed by a clean EOF.
    #[test]
    fn round_trips() {
        let sent = [
            Frame::new("", b"{\"command\":\"init\"}".to_vec()),
            Frame::new("ch1", b"payload bytes".to_vec()),
        ];
        let wire: Vec<u8> = sent.iter().flat_map(encode).collect();

        let mut reader = Cursor::new(wire);
        for expected in &sent {
            let decoded = read_frame(&mut reader).unwrap();
            assert_eq!(decoded.as_ref(), Some(expected));
        }
        assert!(read_frame(&mut reader).unwrap().is_none());
    }

    /// A body without any newline cannot be split into channel and payload.
    #[test]
    fn rejects_missing_channel_separator() {
        let mut reader = Cursor::new(b"3\nabc".to_vec());
        let outcome = read_frame(&mut reader);
        assert!(outcome.is_err());
    }
}