queen_core/wire_protocol/
message.rs1use std::mem;
2use std::io::{Read, Write};
3
4use byteorder::{ReadBytesExt, WriteBytesExt};
5
6use super::header::Header;
7use super::header::OpCode;
8use super::error::Result;
9
10#[derive(Debug)]
11pub struct Message {
12 header: Header,
13 target: String,
14 origin: String,
15 content_type: String,
16 body: Vec<u8>
17}
18
19impl Message {
20 pub fn new(request_id: i32, response_to: i32, op_code: OpCode, target: String, origin: String, content_type: String, body: Vec<u8>) -> Result<Message> {
21 let header_length = mem::size_of::<Header>();
22 let target_length = target.len() + 1;
23 let origin_length = origin.len() + 1;
24 let content_type_length = content_type.len() + 1;
25 let body_length = body.len();
26
27 let total_length = header_length + target_length + origin_length + content_type_length + body_length;
28
29 let header = Header::new(total_length as i32, request_id, response_to, op_code);
30
31 Ok(Message {
32 header: header,
33 target: target,
34 origin: origin,
35 content_type: content_type,
36 body: body
37 })
38 }
39
40 pub fn get_opcode(&self) -> &OpCode {
41 &self.header.op_code
42 }
43
44 pub fn len(&self) -> usize {
45 self.header.message_length as usize
46 }
47
48 pub fn write<W: Write>(&self, buffer: &mut W) -> Result<()> {
49 self.header.write(buffer)?;
50
51 write_cstring(buffer, &self.target)?;
52 write_cstring(buffer, &self.origin)?;
53 write_cstring(buffer, &self.content_type)?;
54
55 buffer.write(&self.body)?;
56
57 Ok(())
58 }
59
60 pub fn read<R: Read>(buffer: &mut R) -> Result<Option<Message>> {
61 let header = Header::read(buffer)?;
62
63 let mut length = header.message_length as usize - mem::size_of::<Header>();
64
65 let target = read_cstring(buffer)?;
66 length -= target.len() + 1;
67
68 let origin = read_cstring(buffer)?;
69 length -= origin.len() + 1;
70
71 let content_type = read_cstring(buffer)?;
72 length -= content_type.len() + 1;
73
74 let mut body = vec![0u8; length];
75
76 let read_size = buffer.read(&mut body)?;
77
78 if read_size < length {
79 return Ok(None)
80 }
81
82 Ok(Some(Message {
83 header: header,
84 target: target,
85 origin: origin,
86 content_type: content_type,
87 body: body
88 }))
89 }
90}
91
92
93fn write_cstring<W>(writer: &mut W, s: &str) -> Result<()>
94 where W: Write + ?Sized
95{
96 writer.write_all(s.as_bytes())?;
97 writer.write_u8(0)?;
98 Ok(())
99}
100
101fn read_cstring<R: Read + ?Sized>(reader: &mut R) -> Result<String> {
102 let mut v = Vec::new();
103
104 loop {
105 let c = reader.read_u8()?;
106 if c == 0 {
107 break;
108 }
109 v.push(c);
110 }
111
112 Ok(String::from_utf8(v)?)
113}