1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright (c) 2015-2017 Contributors as noted in the AUTHORS file.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// This file may not be copied, modified, or distributed except according to those terms.

use std::ops::Deref;
use std::rc::Rc;
use std::io::{Result, Read, Write, ErrorKind};

use byteorder::{ BigEndian, ByteOrder };

use mio::Evented;

use core::Message;
use io_error::*;

pub trait AsyncPipeStub : Sender + Receiver + Handshake + Deref<Target=Evented> {
    #[cfg(windows)]
    fn read_and_write_void(&mut self);
    #[cfg(windows)]
    fn registered(&mut self) {}
}

pub trait Sender {
    fn start_send(&mut self, msg: Rc<Message>) -> Result<bool>;
    fn resume_send(&mut self) -> Result<bool>;
    fn has_pending_send(&self) -> bool;
}

pub trait Receiver {
    fn start_recv(&mut self) -> Result<Option<Message>>;
    fn resume_recv(&mut self) -> Result<Option<Message>>;
    fn has_pending_recv(&self) -> bool;
}

pub trait Handshake {
    fn send_handshake(&mut self, pids: (u16, u16)) -> Result<()>;
    fn recv_handshake(&mut self, pids: (u16, u16)) -> Result<()>;
}

pub fn send_and_check_handshake<T:Write>(stream: &mut T, pids: (u16, u16)) -> Result<()> {
    let (proto_id, _) = pids;
    let handshake = create_handshake(proto_id);

    match try!(stream.write(&handshake)) {
        8 => Ok(()),
        _ => Err(would_block_io_error("failed to send handshake"))
    }
}

fn create_handshake(protocol_id: u16) -> [u8; 8] {
    // handshake is Zero, 'S', 'P', Version, Proto[2], Rsvd[2]
    let mut handshake = [0, 83, 80, 0, 0, 0, 0, 0];
    BigEndian::write_u16(&mut handshake[4..6], protocol_id);
    handshake
}

pub fn recv_and_check_handshake<T:Read>(stream: &mut T, pids: (u16, u16)) -> Result<()> {
    let mut handshake = [0u8; 8];

    stream.read(&mut handshake).and_then(|_| check_handshake(pids, &handshake))
}

fn check_handshake(pids: (u16, u16), handshake: &[u8; 8]) -> Result<()> {
    let (_, proto_id) = pids;
    let expected_handshake = create_handshake(proto_id);

    if handshake == &expected_handshake {
        Ok(())
    } else {
        Err(invalid_data_io_error("received bad handshake"))
    }
}

pub trait WriteBuffer {
    fn write_buffer(&mut self, buffer: &[u8], written: &mut usize) -> Result<bool>;
}

impl<T:Write> WriteBuffer for T {
    fn write_buffer(&mut self, buf: &[u8], written: &mut usize) -> Result<bool> {
        match self.write(&buf[*written..]) {
            Ok(x) => {
                *written += x;

                Ok(*written == buf.len())
            },
            Err(e) => {
                if e.kind() == ErrorKind::WouldBlock {
                    Ok(false)
                } else {
                    Err(e)
                }
            }
        }
    }
}


pub trait ReadBuffer {
    fn read_buffer(&mut self, buffer: &mut [u8]) -> Result<usize>;
}

impl<T:Read> ReadBuffer for T {
    fn read_buffer(&mut self, buf: &mut [u8]) -> Result<usize> {
        match self.read(buf) {
            Ok(x) => {
                Ok(x)
            },
            Err(e) => {
                if e.kind() == ErrorKind::WouldBlock {
                    Ok(0)
                } else {
                    Err(e)
                }
            }
        }
    }
}