1use super::*;
2use super::schema::*;
3use super::rpc::*;
4
5use std::io::prelude::*;
6use std::net::*;
7use std::collections::BTreeMap;
8
9use protobuf::*;
10
11static STREAM_HELLO_MESSAGE : [u8; 12] = [0x48, 0x45, 0x4C, 0x4C, 0x4F, 0x2D, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4D];
12static STREAM_OK : [u8; 2] = [0x4F, 0x4B];
13pub struct Stream {
14 socket : TcpStream,
15 rpc : Rpc
16}
17impl Stream {
18 pub fn new<T : ToSocketAddrs>(address : T, rpc : Rpc) -> Result<Self, ConnectionErr> {
19 let mut socket : TcpStream = unwrap_ret!(TcpStream::connect(address), ConnectionErr::TcpConnectionFailure);
20 unwrap_ret!(socket.write_all(&STREAM_HELLO_MESSAGE), ConnectionErr::HelloMessageFailed);
21 unwrap_ret!(socket.write_all(&rpc.id), ConnectionErr::IdMessageFailed);
22 let ref mut buf = [0; 2];
23 unwrap_ret!(socket.read(buf), ConnectionErr::OkMessageFailed);
24 if buf[..] != STREAM_OK[..] { return Err(ConnectionErr::OkMessageWrong) };
25 Ok(Stream {
26 socket : socket,
27 rpc : rpc
28 })
29 }
30 pub fn add_stream(&mut self, service : String, procedure : String, args : Vec<Vec<u8>>) -> Result<u32, TransceiverError> {
31 let mut buf = vec!();
32 let mut request : Request = make_request!(service, procedure, args);
33 request.write_to_vec(&mut buf)?;
34 let mut v = vec!();
35 v.push(buf);
36 let response =
37 if let Some(i) = self.rpc.invoke("KRPC".to_owned(), "AddStream".to_owned(), v)? { i }
38 else { return Err(TransceiverError::ResponseHasError("No id".to_owned())) };
39 let id = CodedInputStream::from_bytes(&response).read_uint32()?;
40 Ok(id)
41 }
42 pub fn remove_stream(&mut self, id : u32) -> Result<(), TransceiverError> {
43 let mut v = vec!();
44 CodedOutputStream::vec(&mut v).write_uint32_no_tag(id)?;
45 self.rpc.invoke("KRPC".to_owned(), "RemoveStream".to_owned(), vec!())?;
46 Ok(())
47 }
48 pub fn receive(&mut self) -> Result<BTreeMap<u32, Option<Vec<u8>>>, TransceiverError> {
49 let mut len = 0;
50 {
51 let ref mut buf = [0];
52 let mut i = 0;
53 loop {
54 self.socket.read_exact(buf)?;
55 len += ((buf[0] & 0b01111111) as usize) << i;
56 if buf[0] & 0b10000000 == 0 { break }
57 i += 7;
58 }
59 }
60 let mut buffer = vec!(0; len);
61 self.socket.read_exact(&mut buffer)?;
62 let mut message = parse_from_bytes::<StreamMessage>(&buffer)?;
63 let mut ret = BTreeMap::new();
64 for i in message.responses.iter_mut() {
65 let response = match i.response.take() {
66 Some(i) => unwrap_response!(i)?,
67 _ => continue
68 };
69 ret.insert(i.id, response);
70 }
71 Ok(ret)
72 }
73}