#![allow(unused_imports)]
#![allow(dead_code)]
#![allow(unused_variables)]
use std::sync::atomic;
use std::ptr;
use std::rt::heap::allocate;
use std::mem::size_of;
use std::rt::heap::deallocate;
use std::sync::Mutex;
use std::sync::Condvar;
use std::time::duration::Duration;
use std::mem::transmute_copy;
use std::mem::uninitialized;
use std::intrinsics::copy_memory;
use time::Timespec;
use time::get_time;
use timespec;
use net::Net;
use rawmessage::RawMessage;
use message::Message;
use message::MessagePayload;
pub enum IoErrorCode {
TimedOut,
NoMessages,
}
pub struct IoError {
pub code: IoErrorCode,
}
pub enum IoResult<T> {
Err(IoError),
Ok(T),
}
impl<T> IoResult<T> {
pub fn ok(self) -> T {
match self {
IoResult::Ok(v) => v,
IoResult::Err(e) => panic!("not `IoResult::Ok`!"),
}
}
pub fn err(self) -> IoError {
match self {
IoResult::Ok(v) => panic!("not `IoResult::Err`!"),
IoResult::Err(e) => e,
}
}
pub fn is_ok(&self) -> bool {
match *self {
IoResult::Ok(_) => true,
IoResult::Err(_) => false,
}
}
pub fn is_err(&self) -> bool {
match *self {
IoResult::Ok(_) => false,
IoResult::Err(_) => true,
}
}
}
struct Internal {
lock: Mutex<uint>,
refcnt: uint,
messages: Vec<Message>,
cwaker: Condvar,
wakeupat: Timespec,
wakeinprogress: bool,
limitpending: uint,
limitmemory: uint,
memoryused: uint,
}
pub struct Endpoint {
i: *mut Internal,
pub eid: u64,
pub sid: u64,
net: Net,
}
impl Drop for Endpoint {
fn drop(&mut self) {
unsafe {
let mut dealloc: bool = false;
{
let locked = (*self.i).lock.lock();
if (*self.i).refcnt == 0 {
panic!("drop called with refcnt zero");
}
(*self.i).refcnt -= 1;
if (*self.i).refcnt == 0 {
dealloc = true;
}
}
if dealloc {
drop(ptr::read(&*self.i));
deallocate(self.i as *mut u8, size_of::<Internal>(), size_of::<uint>());
}
}
}
}
impl Clone for Endpoint {
fn clone(&self) -> Endpoint {
unsafe {
let locked = (*self.i).lock.lock();
(*self.i).refcnt += 1;
Endpoint {
i: self.i,
eid: self.eid,
sid: self.sid,
net: self.net.clone(),
}
}
}
}
impl Endpoint {
pub fn new(sid: u64, eid: u64, net: Net) -> Endpoint {
let i: *mut Internal;
unsafe {
i = allocate(size_of::<Internal>(), size_of::<uint>()) as *mut Internal;
(*i).lock = Mutex::new(0);
(*i).refcnt = 1;
(*i).messages = Vec::new();
(*i).cwaker = Condvar::new();
(*i).wakeupat = Timespec { nsec: 0i32, sec: 0x7fffffffffffffffi64 };
(*i).wakeinprogress = false;
(*i).limitpending = 1024;
(*i).limitmemory = 1024 * 1024 * 512;
(*i).memoryused = 0;
}
Endpoint {
i: i,
eid: eid,
sid: sid,
net: net,
}
}
pub fn getwaketime(&self) -> Timespec {
unsafe {
let lock = (*self.i).lock.lock();
(*self.i).wakeupat
}
}
pub fn givesync(&mut self, msg: Message) {
unsafe {
let lock = (*self.i).lock.lock();
(*self.i).memoryused += msg.cap();
(*self.i).messages.push(msg);
self.wakeonewaiter_nolock();
}
}
pub fn give(&mut self, msg: &Message) {
unsafe {
if self.eid == msg.dsteid || msg.dsteid == 0 {
if self.sid == msg.dstsid || msg.dstsid == 0 {
let lock = (*self.i).lock.lock();
(*self.i).messages.push((*msg).clone());
(*self.i).memoryused += msg.cap();
self.wakeonewaiter_nolock();
}
}
}
}
pub fn hasmessages(&self) -> bool {
unsafe {
let lock = (*self.i).lock.lock();
if (*self.i).messages.len() > 0 {
return true;
}
}
false
}
pub fn wakeonewaiter(&self) -> bool {
unsafe {
let lock = (*self.i).lock.lock();
if !(*self.i).wakeinprogress {
(*self.i).cwaker.notify_all();
(*self.i).wakeinprogress = true;
return true;
}
false
}
}
fn wakeonewaiter_nolock(&self) {
unsafe {
(*self.i).cwaker.notify_one();
}
}
pub fn setlimitpending(&mut self, limit: uint) {
unsafe {
(*self.i).limitpending = limit;
}
}
pub fn setlimitmemory(&mut self, limit: uint) {
unsafe {
(*self.i).limitmemory = limit;
}
}
pub fn send(&self, msg: &Message) {
self.net.sendas(msg, self.sid, self.eid);
}
pub fn sendsync(&self, msg: Message) {
self.net.sendsyncas(msg, self.sid, self.eid);
}
pub fn sendorblock(&self, msg: &Message) {
unimplemented!();
}
fn neverwakeme_nolock(&self) {
unsafe {
(*self.i).wakeupat = Timespec { sec: 0x7fffffffffffffffi64, nsec: 0i32 };
}
}
pub fn recvorblock(&self, duration: Timespec) -> IoResult<Message> {
let mut when: Timespec = get_time();
when = timespec::add(when, duration);
unsafe {
let lock = (*self.i).lock.lock();
while (*self.i).messages.len() < 1 {
if (*self.i).wakeupat > when {
(*self.i).wakeupat = when;
}
(*self.i).cwaker.wait(&lock);
let ctime: Timespec = get_time();
if ctime > when {
return IoResult::Err(IoError { code: IoErrorCode::TimedOut });
}
}
self.neverwakeme_nolock();
(*self.i).wakeinprogress = false;
self.recv_nolock()
}
}
pub fn recv(&self) -> IoResult<Message> {
unsafe {
let lock = (*self.i).lock.lock();
self.recv_nolock()
}
}
pub fn recv_nolock(&self) -> IoResult<Message> {
unsafe {
if (*self.i).messages.len() < 1 {
return IoResult::Err(IoError { code: IoErrorCode::NoMessages });
}
let msg = (*self.i).messages.remove(0).unwrap();
(*self.i).memoryused -= msg.cap();
match msg.payload {
MessagePayload::Raw(_) => IoResult::Ok(msg.dup()),
MessagePayload::Sync(_) => IoResult::Ok(msg),
}
}
}
}