1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
extern crate nix;
extern crate bincode;
extern crate rustc_serialize;
use std::os::unix::io::RawFd;
use self::nix::unistd::ForkResult;
use self::nix::sys::socket;
use std::process::exit;
use bincode::SizeLimit;
pub struct Chan<T> where T: rustc_serialize::Decodable {
start: (RawFd,RawFd),
data: (RawFd,RawFd),
end: (RawFd,RawFd),
phantom: std::marker::PhantomData<T>,
}
impl<T> Iterator for Chan<T> where T: rustc_serialize::Decodable {
type Item = Result<T, nix::Error>;
fn next(&mut self) -> Option<Result<T, nix::Error>> {
let r = self.recv();
match r {
Err(e) => Some(Err(e)),
Ok(Some(v)) => Some(Ok(v)),
Ok(None) => None,
}
}
}
impl<T> Chan<T> where T: rustc_serialize::Decodable {
pub fn set_timeout(&self, dur: Option<std::time::Duration>) -> Result<(), nix::Error> {
let tv = nix::sys::time::TimeVal {
tv_sec: match dur { Some(e) => e.as_secs() as i64, None => 0 },
tv_usec: match dur { Some(e) => {
if e.subsec_nanos() < 1000 {
1 as i64
} else {
(e.subsec_nanos() / 1000) as i64
}
}, None => 0},
};
socket::setsockopt(self.start.0, socket::sockopt::ReceiveTimeout, &tv)?;
socket::setsockopt(self.start.1, socket::sockopt::ReceiveTimeout, &tv)?;
socket::setsockopt(self.data.0, socket::sockopt::ReceiveTimeout, &tv)?;
socket::setsockopt(self.data.1, socket::sockopt::ReceiveTimeout, &tv)?;
socket::setsockopt(self.end.0, socket::sockopt::ReceiveTimeout, &tv)?;
socket::setsockopt(self.end.1, socket::sockopt::ReceiveTimeout, &tv)?;
return Ok(())
}
pub fn send(&self, t:&T) -> Result<(), nix::Error> where T: rustc_serialize::Encodable {
let encoded: Vec<u8> = bincode::rustc_serialize::encode(t, SizeLimit::Bounded(8000)).unwrap();
self._write(&encoded[..])
}
fn _write(&self, b: &[u8]) -> Result<(), nix::Error>{
let mut void = [0;1];
socket::recv(self.start.0, &mut void, socket::MSG_EOR)?;
socket::send(self.data.1, b, socket::MSG_EOR)?;
socket::recv(self.end.0, &mut void, socket::MSG_EOR)?;
return Ok(());
}
pub fn recv(&self) -> Result<Option<T>, nix::Error> where T: rustc_serialize::Decodable {
let mut buf = [0;8000];
socket::send(self.start.1, &[0;1] , socket::MSG_EOR )?;
let u = socket::recv(self.data.0, &mut buf, socket::MSG_EOR )?;
if u < 1 {
return Ok(None);
}
socket::send(self.end.1, &[0;1] , socket::MSG_EOR )?;
let decoded: T = bincode::rustc_serialize::decode(&buf[..]).unwrap();
Ok(Some(decoded))
}
pub fn close(&self) -> Result<(), nix::Error> {
socket::send(self.start.1, &[0;0], socket::MSG_EOR)?;
socket::send(self.data.1, &[0;0], socket::MSG_EOR)?;
socket::send(self.end.1, &[0;0], socket::MSG_EOR)?;
return Ok(())
}
}
pub fn channel<T>() -> Result<Chan<T>, nix::Error> where T: rustc_serialize::Decodable {
let start = socket::socketpair(socket::AddressFamily::Unix, socket::SockType::Datagram, 0, socket::SockFlag::empty());
let data = socket::socketpair(socket::AddressFamily::Unix, socket::SockType::Datagram, 0, socket::SockFlag::empty());
let end = socket::socketpair(socket::AddressFamily::Unix, socket::SockType::Datagram, 0, socket::SockFlag::empty());
Ok(Chan {start: start?, data: data?, end: end? , phantom: std::marker::PhantomData})
}
pub struct Proc {
}
pub fn fork<F>(start: F) -> Proc where F: Fn(){
if let ForkResult::Child = nix::unistd::fork().expect("fork failed") {
start();
exit_must_not_continue();
};
Proc{}
}
#[allow(unreachable_code)]
fn exit_must_not_continue() ->!{
exit(0);
loop{}
}