#![allow(unused_imports)]
#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(unused_must_use)]
#![allow(deprecated)]
extern crate time;
extern crate water;
use water::Net;
use water::Endpoint;
use water::RawMessage;
use water::NoPointers;
use water::MessagePayload;
use water::Message;
use water::IoResult;
use water::IoError;
use water::IoErrorCode;
use std::thread::JoinGuard;
use std::thread::Thread;
use std::io::timer::sleep;
use std::time::duration::Duration;
use time::Timespec;
struct SafeStructure {
a: u64,
b: u32,
c: u8,
}
impl NoPointers for SafeStructure {}
const THREADCNT: uint = 3;
fn funnyworker(mut net: Net, dbgid: uint) {
let ep: Endpoint = net.new_endpoint();
while net.getepcount() < THREADCNT + 1 { }
let limit = 100u;
let mut sentmsgcnt: uint = 0u;
let mut recvmsgcnt: uint = 0u;
println!("thread[{}] started", dbgid);
let mut msgtosend = Message::new_raw(32);
msgtosend.dstsid = 0;
msgtosend.dsteid = 0;
while recvmsgcnt < limit * (THREADCNT - 1) && sentmsgcnt < limit {
loop {
let result = ep.recv();
if result.is_err() {
break;
}
let safestruct: SafeStructure = result.ok().get_raw().readstruct(0);
if safestruct.b != 0x10 {
recvmsgcnt += 1;
assert!(safestruct.b == 0x12345678u32);
assert!(safestruct.c == 0x12u8);
}
}
if sentmsgcnt < limit {
let safestruct = SafeStructure {
a: dbgid as u64,
b: 0x12345678,
c: 0x12,
};
msgtosend.get_rawmutref().writestructref(0, &safestruct);
let rby = ep.sendraw(&msgtosend);
if rby < THREADCNT {
panic!("send to only {} threads", rby);
}
sentmsgcnt += 1;
}
}
let safestruct = SafeStructure {
a: dbgid as u64,
b: 0x10,
c: 0x10,
};
msgtosend.get_rawmutref().writestructref(0, &safestruct);
ep.sendraw(&msgtosend);
println!("thread[{}]: exiting (sent buffer {})", dbgid, msgtosend.get_rawref().id());
}
#[test]
fn rawmessage() {
let m = RawMessage::new_fromstr("ABCDE");
assert!(m.readu8(0) == 65);
assert!(m.readu8(1) == 66);
assert!(m.readu8(2) == 67);
assert!(m.readu8(3) == 68);
assert!(m.readu8(4) == 69);
assert!(m.len() == 5);
}
#[test]
fn rawmsgstress() {
let mut v: Vec<RawMessage> = Vec::new();
for i in range(0u, 10000u) {
let rm = RawMessage::new(32);
v.push(rm.dup());
v.push(rm);
}
}
#[test]
fn basicio() {
for u in range(0u, 20u) {
println!("making test");
let t = Thread::spawn(move || { _basicio(); });
t.join();
}
}
fn _basicio() {
println!("entered basicio");
let mut net: Net = Net::new(234);
let ep = net.new_endpoint();
let mut completedcnt: u32 = 0u32;
println!("spawning threads");
let mut threadterm = [0u, 0u, 0u];
let mut threads: Vec<JoinGuard<()>> = Vec::new();
for i in range(0, THREADCNT) {
let netclone = net.clone();
println!("creating thread {}", i);
threads.push(Thread::spawn(move || { funnyworker(netclone, i); }));
}
let mut sectowait = 6i64;
println!("main: entering loop with ep.id:{}", ep.id());
loop {
let result = ep.recvorblock(Timespec { sec: sectowait, nsec: 0 });
sectowait = 6i64;
if result.is_err() {
panic!("timed out waiting for messages likely..");
}
let raw = result.ok().get_raw();
let safestruct: SafeStructure = raw.readstruct(0);
if safestruct.b == 0x10 {
if threadterm[safestruct.a as uint] != 0 {
panic!("got termination message from same thread twice!");
}
threadterm[safestruct.a as uint] = 1;
completedcnt += 1;
println!("main: got termination message #{} from thread {} for buffer {}", completedcnt, safestruct.b, raw.id());
if completedcnt > 2 {
break;
}
}
}
}
fn main() {
loop {
println!("making test");
let t = Thread::spawn(move || { _basicio(); });
t.join();
}
}