1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use std::ops::Deref;
use std::rc::Rc;
use std::io::{Result, Read, Write, ErrorKind};
use byteorder::{ BigEndian, ByteOrder };
use mio::Evented;
use core::Message;
use io_error::*;
/// Aggregate trait for a pipe stub: an implementor must provide the
/// `Sender`, `Receiver` and `Handshake` operations and dereference to an
/// `Evented` value.
/// NOTE(review): the `Deref<Target=Evented>` bound presumably exists so the
/// stub can be registered with a mio event loop - confirm against callers.
pub trait AsyncPipeStub : Sender + Receiver + Handshake + Deref<Target=Evented> {
/// Windows-only required method.
/// NOTE(review): the name suggests a zero-length read and write on the
/// underlying stream - confirm against the implementors.
#[cfg(windows)]
fn read_and_write_void(&mut self);
/// Windows-only hook with a default implementation that does nothing.
/// NOTE(review): presumably invoked once the stub has been registered -
/// confirm against callers.
#[cfg(windows)]
fn registered(&mut self) {}
}
/// Sending operations of a pipe stub.
/// NOTE(review): the `bool` results presumably follow the same convention as
/// `WriteBuffer::write_buffer` (true once everything has been written) -
/// confirm against the implementors.
pub trait Sender {
/// Starts sending the shared message `msg`; I/O failures are reported
/// through the `std::io::Result`.
fn start_send(&mut self, msg: Rc<Message>) -> Result<bool>;
/// Continues a send operation; takes no message of its own.
/// NOTE(review): presumably resumes the message given to `start_send` - confirm.
fn resume_send(&mut self) -> Result<bool>;
/// Reports whether a send is pending.
/// NOTE(review): presumably true between an incomplete `start_send` and its
/// completion - confirm.
fn has_pending_send(&self) -> bool;
}
/// Receiving operations of a pipe stub.
/// NOTE(review): `Ok(None)` presumably means no complete message is available
/// yet, and `Ok(Some(msg))` a fully received one - confirm against the implementors.
pub trait Receiver {
/// Starts receiving a message; I/O failures are reported through the
/// `std::io::Result`.
fn start_recv(&mut self) -> Result<Option<Message>>;
/// Continues a receive operation.
/// NOTE(review): presumably resumes the receive begun by `start_recv` - confirm.
fn resume_recv(&mut self) -> Result<Option<Message>>;
/// Reports whether a receive is pending.
/// NOTE(review): presumably true while a message is only partially received - confirm.
fn has_pending_recv(&self) -> bool;
}
/// Handshake operations of a pipe stub.
///
/// Both methods take `pids`, a pair of 16 bit protocol identifiers.
/// NOTE(review): the free functions `send_and_check_handshake` and
/// `recv_and_check_handshake` in this file send the first element and expect
/// to receive the second; implementors presumably delegate to them - confirm.
pub trait Handshake {
/// Sends the handshake for `pids`; failures are reported as `std::io::Error`.
fn send_handshake(&mut self, pids: (u16, u16)) -> Result<()>;
/// Receives a handshake and validates it against `pids`; failures are
/// reported as `std::io::Error`.
fn recv_handshake(&mut self, pids: (u16, u16)) -> Result<()>;
}
/// Writes the handshake for the first protocol id of `pids` to `stream`
/// with a single `write` call.
///
/// Returns `Ok(())` only when that call reports that the whole handshake
/// was written. An error from `write` is returned unchanged; any other
/// byte count yields the error built by `would_block_io_error`.
pub fn send_and_check_handshake<T:Write>(stream: &mut T, pids: (u16, u16)) -> Result<()> {
    // Only the first id of the pair is put on the wire.
    let handshake = create_handshake(pids.0);

    stream.write(&handshake).and_then(|sent| {
        if sent == handshake.len() {
            Ok(())
        } else {
            // Short write: the handshake did not go out in one piece.
            Err(would_block_io_error("failed to send handshake"))
        }
    })
}
/// Builds the 8 byte handshake for `protocol_id`.
///
/// Byte layout: `0x00, b'S', b'P', 0x00`, then `protocol_id` as a big endian
/// `u16` in bytes 4 and 5, then two zero bytes.
fn create_handshake(protocol_id: u16) -> [u8; 8] {
    let mut bytes = [0x00, b'S', b'P', 0x00, 0x00, 0x00, 0x00, 0x00];
    {
        // Scope the mutable borrow so the array can be returned afterwards.
        let id_field = &mut bytes[4..6];
        BigEndian::write_u16(id_field, protocol_id);
    }
    bytes
}
/// Reads an 8 byte handshake from `stream` with a single `read` call and
/// validates it against the second protocol id of `pids`.
///
/// # Errors
///
/// * any error returned by `read` is passed through unchanged;
/// * a read that delivers fewer than 8 bytes is rejected with
///   `ErrorKind::InvalidData`;
/// * a complete handshake that does not match yields the error produced by
///   `check_handshake`.
pub fn recv_and_check_handshake<T:Read>(stream: &mut T, pids: (u16, u16)) -> Result<()> {
    let mut handshake = [0u8; 8];

    let received = match stream.read(&mut handshake) {
        Ok(count) => count,
        Err(e) => return Err(e),
    };

    // `read` may deliver fewer bytes than requested. The buffer starts out
    // zeroed and the expected handshake ends with zero bytes, so without this
    // check a 6 or 7 byte read of a matching prefix would be accepted while
    // the remaining bytes stay queued in the stream.
    if received != handshake.len() {
        return Err(::std::io::Error::new(ErrorKind::InvalidData, "received incomplete handshake"));
    }

    check_handshake(pids, &handshake)
}
/// Compares a received `handshake` with the one expected for the second
/// protocol id of `pids`.
///
/// Returns `Ok(())` when all 8 bytes match, otherwise the error built by
/// `invalid_data_io_error`.
fn check_handshake(pids: (u16, u16), handshake: &[u8; 8]) -> Result<()> {
    // Only the second id of the pair is used for validation.
    let expected = create_handshake(pids.1);

    if *handshake != expected {
        return Err(invalid_data_io_error("received bad handshake"));
    }

    Ok(())
}
/// Incremental writing of a buffer, tracking progress in a caller-owned counter.
pub trait WriteBuffer {
    /// Writes the part of `buffer` starting at offset `*written`, advances
    /// `*written` by the number of bytes accepted, and returns whether the
    /// whole buffer has now been written.
    fn write_buffer(&mut self, buffer: &[u8], written: &mut usize) -> Result<bool>;
}

/// Every `Write` implementor gets `write_buffer` for free.
impl<T:Write> WriteBuffer for T {
    /// Performs a single `write` call on the unwritten tail of `buf`.
    ///
    /// A `WouldBlock` error is reported as `Ok(false)` and leaves `*written`
    /// untouched; any other error is returned as is.
    fn write_buffer(&mut self, buf: &[u8], written: &mut usize) -> Result<bool> {
        let remaining = &buf[*written..];

        let accepted = match self.write(remaining) {
            Ok(count) => count,
            // Not an error for the caller: simply nothing more could be written now.
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(false),
            Err(e) => return Err(e),
        };

        *written += accepted;
        Ok(*written == buf.len())
    }
}
/// Reading into a buffer where "nothing available right now" is not an error.
pub trait ReadBuffer {
    /// Reads into `buffer` and returns the number of bytes stored there.
    fn read_buffer(&mut self, buffer: &mut [u8]) -> Result<usize>;
}

/// Every `Read` implementor gets `read_buffer` for free.
impl<T:Read> ReadBuffer for T {
    /// Performs a single `read` call.
    ///
    /// A `WouldBlock` error is turned into `Ok(0)`; any other error is
    /// returned as is, and a successful byte count is passed through.
    fn read_buffer(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.read(buf).or_else(|e| match e.kind() {
            // Nothing to read at the moment: report zero bytes instead of failing.
            ErrorKind::WouldBlock => Ok(0),
            _ => Err(e),
        })
    }
}