queen_core/wire_protocol/
message.rs

1use std::mem;
2use std::io::{Read, Write};
3
4use byteorder::{ReadBytesExt, WriteBytesExt};
5
6use super::header::Header;
7use super::header::OpCode;
8use super::error::Result;
9
10#[derive(Debug)]
11pub struct Message {
12    header: Header,
13    target: String,
14    origin: String,
15    content_type: String,
16    body: Vec<u8>
17}
18
19impl Message {
20    pub fn new(request_id: i32, response_to: i32, op_code: OpCode, target: String, origin: String, content_type: String, body: Vec<u8>) -> Result<Message> {
21        let header_length = mem::size_of::<Header>();
22        let target_length = target.len() + 1;
23        let origin_length = origin.len() + 1;
24        let content_type_length = content_type.len() + 1;
25        let body_length = body.len();
26
27        let total_length = header_length + target_length + origin_length + content_type_length + body_length;
28
29        let header = Header::new(total_length as i32, request_id, response_to, op_code);
30
31        Ok(Message {
32            header: header,
33            target: target,
34            origin: origin,
35            content_type: content_type,
36            body: body
37        })
38    }
39
40    pub fn get_opcode(&self) -> &OpCode {
41        &self.header.op_code
42    }
43
44    pub fn len(&self) -> usize {
45        self.header.message_length as usize
46    }
47
48    pub fn write<W: Write>(&self, buffer: &mut W) -> Result<()> {
49        self.header.write(buffer)?;
50
51        write_cstring(buffer, &self.target)?;
52        write_cstring(buffer, &self.origin)?;
53        write_cstring(buffer, &self.content_type)?;
54
55        buffer.write(&self.body)?;
56
57        Ok(())
58    }
59
60    pub fn read<R: Read>(buffer: &mut R) -> Result<Option<Message>> {
61        let header = Header::read(buffer)?;
62
63        let mut length = header.message_length as usize - mem::size_of::<Header>();
64
65        let target = read_cstring(buffer)?;
66        length -= target.len() + 1;
67
68        let origin = read_cstring(buffer)?;
69        length -= origin.len() + 1;
70
71        let content_type = read_cstring(buffer)?;
72        length -= content_type.len() + 1;
73
74        let mut body = vec![0u8; length];
75
76        let read_size = buffer.read(&mut body)?;
77
78        if read_size < length {
79            return Ok(None)
80        }
81
82        Ok(Some(Message {
83            header: header,
84            target: target,
85            origin: origin,
86            content_type: content_type,
87            body: body
88        }))
89    }
90}
91
92
93fn write_cstring<W>(writer: &mut W, s: &str) -> Result<()>
94    where W: Write + ?Sized
95{
96    writer.write_all(s.as_bytes())?;
97    writer.write_u8(0)?;
98    Ok(())
99}
100
101fn read_cstring<R: Read + ?Sized>(reader: &mut R) -> Result<String> {
102    let mut v = Vec::new();
103
104    loop {
105        let c = reader.read_u8()?;
106        if c == 0 {
107            break;
108        }
109        v.push(c);
110    }
111
112    Ok(String::from_utf8(v)?)
113}