krpc/
stream.rs

1use super::*;
2use super::schema::*;
3use super::rpc::*;
4
5use std::io::prelude::*;
6use std::net::*;
7use std::collections::BTreeMap;
8
9use protobuf::*;
10
11static STREAM_HELLO_MESSAGE : [u8; 12] = [0x48, 0x45, 0x4C, 0x4C, 0x4F, 0x2D, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4D];
12static STREAM_OK : [u8; 2] = [0x4F, 0x4B];
13pub struct Stream {
14    socket : TcpStream,
15    rpc : Rpc
16}
17impl Stream {
18    pub fn new<T : ToSocketAddrs>(address : T, rpc : Rpc) -> Result<Self, ConnectionErr> {
19        let mut socket : TcpStream = unwrap_ret!(TcpStream::connect(address), ConnectionErr::TcpConnectionFailure);
20        unwrap_ret!(socket.write_all(&STREAM_HELLO_MESSAGE), ConnectionErr::HelloMessageFailed);
21        unwrap_ret!(socket.write_all(&rpc.id), ConnectionErr::IdMessageFailed);
22        let ref mut buf = [0; 2];
23        unwrap_ret!(socket.read(buf), ConnectionErr::OkMessageFailed);
24        if buf[..] != STREAM_OK[..] { return Err(ConnectionErr::OkMessageWrong) };
25        Ok(Stream {
26            socket : socket,
27            rpc : rpc
28        })
29    }
30    pub fn add_stream(&mut self, service : String, procedure : String, args : Vec<Vec<u8>>) -> Result<u32, TransceiverError> {
31        let mut buf = vec!();
32        let mut request : Request = make_request!(service, procedure, args);
33        request.write_to_vec(&mut buf)?;
34        let mut v = vec!();
35        v.push(buf);
36        let response =
37            if let Some(i) = self.rpc.invoke("KRPC".to_owned(), "AddStream".to_owned(), v)? { i }
38            else { return Err(TransceiverError::ResponseHasError("No id".to_owned())) };
39        let id = CodedInputStream::from_bytes(&response).read_uint32()?;
40        Ok(id)
41    }
42    pub fn remove_stream(&mut self, id : u32) -> Result<(), TransceiverError> {
43        let mut v = vec!();
44        CodedOutputStream::vec(&mut v).write_uint32_no_tag(id)?;
45        self.rpc.invoke("KRPC".to_owned(), "RemoveStream".to_owned(), vec!())?;
46        Ok(())
47    }
48    pub fn receive(&mut self) -> Result<BTreeMap<u32, Option<Vec<u8>>>, TransceiverError> {
49        let mut len = 0;
50        {
51            let ref mut buf = [0];
52            let mut i = 0;
53            loop {
54                self.socket.read_exact(buf)?;
55                len += ((buf[0] & 0b01111111) as usize) << i;
56                if buf[0] & 0b10000000 == 0 { break }
57                i += 7;
58            }
59        }
60        let mut buffer = vec!(0; len);
61        self.socket.read_exact(&mut buffer)?;
62        let mut message = parse_from_bytes::<StreamMessage>(&buffer)?;
63        let mut ret = BTreeMap::new();
64        for i in message.responses.iter_mut() {
65            let response = match i.response.take() {
66                Some(i) => unwrap_response!(i)?,
67                _ => continue
68            };
69            ret.insert(i.id, response);
70        }
71        Ok(ret)
72    }
73}