1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
extern crate nix;
extern crate bincode;
extern crate rustc_serialize;

use std::os::unix::io::RawFd;
use self::nix::unistd::ForkResult;
use self::nix::sys::socket;
use std::process::exit;

use bincode::SizeLimit;

pub struct Chan<T> where T: rustc_serialize::Decodable {
    start: (RawFd,RawFd),
    data:  (RawFd,RawFd),
    end:   (RawFd,RawFd),
    phantom: std::marker::PhantomData<T>,
}

impl<T> Iterator for Chan<T> where T: rustc_serialize::Decodable {
    type Item = Result<T, nix::Error>;

    fn next(&mut self) -> Option<Result<T, nix::Error>> {
        let r = self.recv();
        match r {
            Err(e)      => Some(Err(e)),
            Ok(Some(v)) => Some(Ok(v)),
            Ok(None)    => None,
        }
    }
}

impl<T> Chan<T> where T: rustc_serialize::Decodable {

    pub fn set_timeout(&self, dur: Option<std::time::Duration>) -> Result<(), nix::Error> {
        let tv = nix::sys::time::TimeVal {
            tv_sec:   match dur { Some(e) => e.as_secs() as i64, None => 0 },
            tv_usec:  match dur { Some(e) => {
                if e.subsec_nanos() < 1000 {
                    1 as i64
                } else {
                    (e.subsec_nanos() / 1000) as i64
                }
            }, None => 0},
        };

        socket::setsockopt(self.start.0, socket::sockopt::ReceiveTimeout, &tv)?;
        socket::setsockopt(self.start.1, socket::sockopt::ReceiveTimeout, &tv)?;
        socket::setsockopt(self.data.0,  socket::sockopt::ReceiveTimeout, &tv)?;
        socket::setsockopt(self.data.1,  socket::sockopt::ReceiveTimeout, &tv)?;
        socket::setsockopt(self.end.0,   socket::sockopt::ReceiveTimeout, &tv)?;
        socket::setsockopt(self.end.1,   socket::sockopt::ReceiveTimeout, &tv)?;

        return Ok(())
    }

    pub fn send(&self, t:&T) -> Result<(), nix::Error>  where T: rustc_serialize::Encodable {
        let encoded: Vec<u8> = bincode::rustc_serialize::encode(t, SizeLimit::Bounded(8000)).unwrap();
        self._write(&encoded[..])
    }

    fn _write(&self, b: &[u8]) -> Result<(), nix::Error>{
        let mut void = [0;1];

        socket::recv(self.start.0, &mut void, socket::MSG_EOR)?;
        socket::send(self.data.1, b, socket::MSG_EOR)?;
        socket::recv(self.end.0, &mut void,    socket::MSG_EOR)?;

        return Ok(());
    }

    pub fn recv(&self) -> Result<Option<T>, nix::Error> where T: rustc_serialize::Decodable {
        let mut buf = [0;8000];

        socket::send(self.start.1, &[0;1]  , socket::MSG_EOR )?;
        let u = socket::recv(self.data.0, &mut buf, socket::MSG_EOR )?;
        if  u < 1 {
            return Ok(None);
        }
        socket::send(self.end.1, &[0;1]  , socket::MSG_EOR )?;

        let decoded: T = bincode::rustc_serialize::decode(&buf[..]).unwrap();
        Ok(Some(decoded))
    }

    pub fn close(&self) -> Result<(), nix::Error> {
        //TODO: this might not be the correct semantics. double check the theory behind close
        socket::send(self.start.1, &[0;0], socket::MSG_EOR)?;
        socket::send(self.data.1,  &[0;0], socket::MSG_EOR)?;
        socket::send(self.end.1,   &[0;0], socket::MSG_EOR)?;
        return Ok(())
    }
}

pub fn channel<T>() -> Result<Chan<T>, nix::Error>  where T: rustc_serialize::Decodable {
    let start = socket::socketpair(socket::AddressFamily::Unix, socket::SockType::Datagram, 0, socket::SockFlag::empty());
    let data  = socket::socketpair(socket::AddressFamily::Unix, socket::SockType::Datagram, 0, socket::SockFlag::empty());
    let end   = socket::socketpair(socket::AddressFamily::Unix, socket::SockType::Datagram, 0, socket::SockFlag::empty());
    Ok(Chan {start: start?, data: data?, end: end? , phantom: std::marker::PhantomData})
}

pub struct Proc {
}


pub fn fork<F>(start: F) -> Proc where F: Fn(){
    if let ForkResult::Child = nix::unistd::fork().expect("fork failed") {
        start();
        exit_must_not_continue();
    };
    Proc{}
}

#[allow(unreachable_code)]
fn exit_must_not_continue() ->!{
    exit(0);
    loop{}
}