use {HandleBase, Handle, HandleRef, INVALID_HANDLE, Peered, Status, Time};
use {sys, handle_drop, into_result, size_to_u32_sat};
use conv::{ValueInto};
use std::mem;
/// One endpoint of a channel, stored as a newtype around the owning `Handle`.
///
/// Endpoints are obtained in pairs from `Channel::create`; bytes and handles
/// are exchanged through `write`, `read`/`read_raw` and `call`.
#[derive(Debug, Eq, PartialEq)]
pub struct Channel(Handle);
impl HandleBase for Channel {
fn get_ref(&self) -> HandleRef {
self.0.get_ref()
}
fn from_handle(handle: Handle) -> Self {
Channel(handle)
}
}
// `Channel` opts into `Peered` without overriding anything; whatever the trait
// offers comes from the trait definition itself.
impl Peered for Channel {}
impl Channel {
pub fn create(opts: ChannelOpts) -> Result<(Channel, Channel), Status> {
unsafe {
let mut handle0 = 0;
let mut handle1 = 0;
let status = sys::mx_channel_create(opts as u32, &mut handle0, &mut handle1);
into_result(status, ||
(Self::from_handle(Handle(handle0)),
Self::from_handle(Handle(handle1))))
}
}
pub fn read_raw(&self, opts: u32, buf: &mut MessageBuf)
-> Result<Result<(), Status>, (usize, usize)>
{
unsafe {
buf.reset_handles();
let raw_handle = self.raw_handle();
let mut num_bytes: u32 = size_to_u32_sat(buf.bytes.capacity());
let mut num_handles: u32 = size_to_u32_sat(buf.handles.capacity());
let status = sys::mx_channel_read(raw_handle, opts,
buf.bytes.as_mut_ptr(), buf.handles.as_mut_ptr(),
num_bytes, num_handles, &mut num_bytes, &mut num_handles);
if status == sys::MX_ERR_BUFFER_TOO_SMALL {
Err((num_bytes as usize, num_handles as usize))
} else {
Ok(into_result(status, || {
buf.bytes.set_len(num_bytes as usize);
buf.handles.set_len(num_handles as usize);
}))
}
}
}
pub fn read(&self, opts: u32, buf: &mut MessageBuf) -> Result<(), Status> {
loop {
match self.read_raw(opts, buf) {
Ok(result) => return result,
Err((num_bytes, num_handles)) => {
buf.ensure_capacity_bytes(num_bytes);
buf.ensure_capacity_handles(num_handles);
}
}
}
}
pub fn write(&self, bytes: &[u8], handles: &mut Vec<Handle>, opts: u32)
-> Result<(), Status>
{
let n_bytes = try!(bytes.len().value_into().map_err(|_| Status::ErrOutOfRange));
let n_handles = try!(handles.len().value_into().map_err(|_| Status::ErrOutOfRange));
unsafe {
let status = sys::mx_channel_write(self.raw_handle(), opts, bytes.as_ptr(), n_bytes,
handles.as_ptr() as *const sys::mx_handle_t, n_handles);
into_result(status, || {
handles.set_len(0);
})
}
}
pub fn call(&self, options: u32, timeout: Time, bytes: &[u8], handles: &mut Vec<Handle>,
buf: &mut MessageBuf) -> Result<(), (Status, Status)>
{
let write_num_bytes = try!(bytes.len().value_into().map_err(
|_| (Status::ErrOutOfRange, Status::NoError)));
let write_num_handles = try!(handles.len().value_into().map_err(
|_| (Status::ErrOutOfRange, Status::NoError)));
buf.reset_handles();
let read_num_bytes: u32 = size_to_u32_sat(buf.bytes.capacity());
let read_num_handles: u32 = size_to_u32_sat(buf.handles.capacity());
let args = sys::mx_channel_call_args_t {
wr_bytes: bytes.as_ptr(),
wr_handles: handles.as_ptr() as *const sys::mx_handle_t,
rd_bytes: buf.bytes.as_mut_ptr(),
rd_handles: buf.handles.as_mut_ptr(),
wr_num_bytes: write_num_bytes,
wr_num_handles: write_num_handles,
rd_num_bytes: read_num_bytes,
rd_num_handles: read_num_handles,
};
let mut actual_read_bytes: u32 = 0;
let mut actual_read_handles: u32 = 0;
let mut read_status = sys::MX_OK;
let status = unsafe {
sys::mx_channel_call(self.raw_handle(), options, timeout, &args, &mut actual_read_bytes,
&mut actual_read_handles, &mut read_status)
};
if status == sys::MX_OK || status == sys::MX_ERR_TIMED_OUT || status == sys::MX_ERR_CALL_FAILED
{
unsafe { handles.set_len(0); }
}
unsafe {
buf.bytes.set_len(actual_read_bytes as usize);
buf.handles.set_len(actual_read_handles as usize);
}
if status == sys::MX_OK {
Ok(())
} else {
Err((Status::from_raw(status), Status::from_raw(read_status)))
}
}
}
/// Options accepted by `Channel::create`.
///
/// The enum is `#[repr(u32)]`, so a variant can be cast with `as u32` to get
/// the raw option word.
#[repr(u32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChannelOpts {
    /// The only option defined here; its raw value is `0`.
    Normal = 0,
}

/// The default options are `ChannelOpts::Normal`.
impl Default for ChannelOpts {
    fn default() -> ChannelOpts {
        ChannelOpts::Normal
    }
}
/// Receive buffer for a single channel message: a byte payload plus the raw
/// handles that arrived with it.
///
/// The default value has both vectors empty.
#[derive(Debug, Default)]
pub struct MessageBuf {
    /// Payload bytes of the most recently received message.
    bytes: Vec<u8>,
    /// Raw handles of the most recently received message; a slot is
    /// overwritten with `INVALID_HANDLE` once it has been taken.
    handles: Vec<sys::mx_handle_t>,
}
impl MessageBuf {
    /// Creates an empty buffer (equivalent to `MessageBuf::default()`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Makes sure the byte buffer can hold at least `n_bytes` bytes.
    pub fn ensure_capacity_bytes(&mut self, n_bytes: usize) {
        ensure_capacity(&mut self.bytes, n_bytes);
    }

    /// Makes sure the handle buffer can hold at least `n_handles` handles.
    pub fn ensure_capacity_handles(&mut self, n_handles: usize) {
        ensure_capacity(&mut self.handles, n_handles);
    }

    /// Returns the bytes currently stored in the buffer.
    pub fn bytes(&self) -> &[u8] {
        self.bytes.as_slice()
    }

    /// Returns the number of handle slots in the buffer.
    ///
    /// Slots whose handle was already removed with `take_handle` are still
    /// counted, because taking a handle does not shrink the vector.
    pub fn n_handles(&self) -> usize {
        self.handles.len()
    }

    /// Moves the handle at `index` out of the buffer.
    ///
    /// Returns `None` when `index` is out of range or the slot holds
    /// `INVALID_HANDLE` (for instance because it was taken before). On
    /// success the slot is overwritten with `INVALID_HANDLE`, so each handle
    /// can be taken at most once.
    pub fn take_handle(&mut self, index: usize) -> Option<Handle> {
        self.handles.get_mut(index).and_then(|slot|
            if *slot == INVALID_HANDLE {
                None
            } else {
                Some(Handle(mem::replace(slot, INVALID_HANDLE)))
            }
        )
    }

    /// Passes every handle still held by the buffer to `handle_drop`.
    ///
    /// Slots equal to `INVALID_HANDLE` are skipped; this is the same sentinel
    /// `take_handle` writes, so a handle that was moved out is never dropped
    /// here as well. The vector itself is left untouched.
    fn drop_handles(&mut self) {
        for &handle in &self.handles {
            if handle != INVALID_HANDLE {
                handle_drop(handle);
            }
        }
    }

    /// Drops all remaining handles and empties the handle vector, keeping its
    /// allocated capacity for reuse.
    fn reset_handles(&mut self) {
        self.drop_handles();
        self.handles.clear();
    }
}
impl Drop for MessageBuf {
fn drop(&mut self) {
self.drop_handles();
}
}
/// Grows `vec` so that its capacity is at least `size` elements.
///
/// The length and contents of `vec` are not modified. Nothing is reserved
/// when `vec` already has `size` or more elements.
fn ensure_capacity<T>(vec: &mut Vec<T>, size: usize) {
    // `checked_sub` is `None` when the vector is already longer than `size`;
    // a shortfall of zero turns into `reserve(0)`, which changes nothing.
    if let Some(shortfall) = size.checked_sub(vec.len()) {
        vec.reserve(shortfall);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use {Duration, MX_CHANNEL_READABLE, MX_CHANNEL_WRITABLE, MX_RIGHT_SAME_RIGHTS, MX_SIGNAL_LAST_HANDLE, Vmo, VmoOpts};
    use deadline_after;
    use std::thread;

    // A message written on one endpoint is read back unchanged on the other.
    #[test]
    fn channel_basic() {
        let (sender, receiver) = Channel::create(ChannelOpts::Normal).unwrap();

        let mut no_handles = vec![];
        assert!(sender.write(b"hello", &mut no_handles, 0).is_ok());

        let mut message = MessageBuf::new();
        assert!(receiver.read(0, &mut message).is_ok());
        assert_eq!(message.bytes(), b"hello");
    }

    // `read_raw` on a fresh (zero-capacity) buffer reports the sizes needed
    // and leaves the buffer empty.
    #[test]
    fn channel_read_raw_too_small() {
        let (sender, receiver) = Channel::create(ChannelOpts::Normal).unwrap();

        let mut no_handles = vec![];
        assert!(sender.write(b"hello", &mut no_handles, 0).is_ok());

        let mut message = MessageBuf::new();
        let outcome = receiver.read_raw(0, &mut message);
        assert_eq!(outcome, Err((5, 0)));
        assert_eq!(message.bytes(), b"");
    }

    // A VMO handle sent through the channel refers to the same VMO on arrival.
    #[test]
    fn channel_send_handle() {
        let hello_length: usize = 5;

        let (sender, receiver) = Channel::create(ChannelOpts::Normal).unwrap();

        // Send a duplicate so the original VMO stays usable for verification.
        let vmo = Vmo::create(hello_length as u64, VmoOpts::Default).unwrap();
        let vmo_copy = vmo.duplicate(MX_RIGHT_SAME_RIGHTS).unwrap().into_handle();
        let mut outgoing: Vec<Handle> = vec![vmo_copy];
        assert!(sender.write(b"", &mut outgoing, 0).is_ok());
        // A successful write empties the vector of sent handles.
        assert!(outgoing.is_empty());

        let mut message = MessageBuf::new();
        assert!(receiver.read(0, &mut message).is_ok());
        assert_eq!(message.n_handles(), 1);

        // Taking the handle leaves the slot count unchanged but makes a
        // second take return nothing.
        let received = message.take_handle(0).unwrap();
        assert_eq!(message.n_handles(), 1);
        assert!(message.take_handle(0).is_none());

        // Write through the received handle, read back through the original.
        let received_vmo = Vmo::from_handle(received);
        assert_eq!(received_vmo.write(b"hello", 0).unwrap(), hello_length);

        let mut contents = vec![0; hello_length];
        assert_eq!(vmo.read(&mut contents, 0).unwrap(), hello_length);
        assert_eq!(contents, b"hello");
    }

    // With nobody answering, `call` times out, yet the request (including its
    // handle) has still been delivered to the peer.
    #[test]
    fn channel_call_timeout() {
        let ten_ms: Duration = 10_000_000;

        let (caller, peer) = Channel::create(ChannelOpts::Normal).unwrap();

        let vmo = Vmo::create(0u64, VmoOpts::Default).unwrap();
        let vmo_copy = vmo.duplicate(MX_RIGHT_SAME_RIGHTS).unwrap().into_handle();
        let mut outgoing: Vec<Handle> = vec![vmo_copy];

        let mut reply = MessageBuf::new();
        assert_eq!(caller.call(0, deadline_after(ten_ms), b"call", &mut outgoing, &mut reply),
            Err((Status::ErrTimedOut, Status::NoError)));
        // The handles are consumed even though the call timed out.
        assert!(outgoing.is_empty());

        let mut request = MessageBuf::new();
        assert!(peer.read(0, &mut request).is_ok());
        assert_eq!(request.bytes(), b"call");
        assert_eq!(request.n_handles(), 1);
    }

    // A full round trip: a server thread answers the request made by `call`.
    #[test]
    fn channel_call() {
        let hundred_ms: Duration = 100_000_000;

        let (caller, peer) = Channel::create(ChannelOpts::Normal).unwrap();

        let server = thread::spawn(move || {
            assert_eq!(peer.wait(MX_CHANNEL_READABLE, deadline_after(hundred_ms)),
                Ok(MX_CHANNEL_READABLE | MX_CHANNEL_WRITABLE | MX_SIGNAL_LAST_HANDLE));

            let mut request = MessageBuf::new();
            assert_eq!(peer.read(0, &mut request), Ok(()));
            assert_eq!(request.bytes(), b"txidcall");
            assert_eq!(request.n_handles(), 0);

            let mut no_handles = vec![];
            assert_eq!(peer.write(b"txidresponse", &mut no_handles, 0), Ok(()));
        });

        let mut no_handles = vec![];
        let mut reply = MessageBuf::new();
        // `call` does not grow the buffer, so reserve room for the reply.
        reply.ensure_capacity_bytes(12);
        assert_eq!(caller.call(0, deadline_after(hundred_ms), b"txidcall", &mut no_handles, &mut reply),
            Ok(()));
        assert_eq!(reply.bytes(), b"txidresponse");
        assert_eq!(reply.n_handles(), 0);

        assert!(server.join().is_ok());
    }
}