extern crate cocaine;
extern crate futures;
extern crate libc;
extern crate net2;
extern crate rmp_serde as rmps;
extern crate tokio_core;
use std::io::{ErrorKind, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};
use std::os::unix::io::IntoRawFd;
use std::thread;
use futures::sync::oneshot;
use net2::TcpStreamExt;
use tokio_core::reactor::Core;
use cocaine::{Dispatch, Error, FixedResolver, Request, Response, ServiceBuilder};
fn endpoint() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0)
}
#[test]
fn connect() {
let sock = TcpListener::bind(&endpoint()).unwrap();
let addr = sock.local_addr().unwrap();
let (tx, rx) = oneshot::channel();
let thread = thread::spawn(move || {
sock.accept().unwrap();
drop(tx.send(()));
});
let mut core = Core::new().unwrap();
let service = ServiceBuilder::new("service")
.resolver(FixedResolver::new(vec![addr]))
.build(&core.handle());
core.run(service.connect()).unwrap();
core.run(rx).unwrap();
thread.join().unwrap();
}
#[test]
fn connection_refused() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let mut core = Core::new().unwrap();
let service = ServiceBuilder::new("service")
.resolver(FixedResolver::new(vec![addr]))
.build(&core.handle());
match core.run(service.connect()).err().unwrap() {
Error::Io(ref err) => {
assert_eq!(ErrorKind::ConnectionRefused, err.kind());
}
err => panic!("expected I/O error, actual {:?}", err),
}
}
#[test]
fn connection_refused_because_invalid_framing() {
let sock = TcpListener::bind(&endpoint()).unwrap();
let addr = sock.local_addr().unwrap();
let (tx, rx) = oneshot::channel();
let thread = thread::spawn(move || {
let (mut sock, ..) = sock.accept().unwrap();
sock.set_linger(None).unwrap();
let frame = rmps::to_vec(&(1, 0, &[0u8; 0])).unwrap();
sock.write(&frame).unwrap();
tx.send(()).unwrap();
});
let mut core = Core::new().unwrap();
let service = ServiceBuilder::new("service")
.locator_addrs(vec![addr])
.build(&core.handle());
match core.run(service.connect()).err().unwrap() {
Error::InvalidDataFraming(..) |
Error::Canceled => {}
err => panic!("expected `InvalidDataFraming` error, actual {:?}", err),
}
match core.run(rx) {
Ok(()) => {}
Err(futures::Canceled) => {}
}
thread.join().unwrap();
}
#[test]
fn dispatch_receives_rst() {
let sock = TcpListener::bind(&endpoint()).unwrap();
let addr = sock.local_addr().unwrap();
let thread = thread::spawn(move || {
let (mut sock, ..) = sock.accept().unwrap();
sock.set_linger(None).unwrap();
let frame = rmps::to_vec(&(1, 0, &[0u8; 0])).unwrap();
sock.write(&frame[..2]).unwrap();
unsafe {
libc::close(sock.into_raw_fd());
}
});
let mut core = Core::new().unwrap();
let service = ServiceBuilder::new("service")
.resolver(FixedResolver::new(vec![addr]))
.build(&core.handle());
struct MockDispatch {
tx: oneshot::Sender<()>,
}
impl Dispatch for MockDispatch {
fn process(self: Box<Self>, _response: &Response) -> Option<Box<Dispatch>> {
panic!("expected calling `discard`, called `process`");
}
fn discard(self: Box<Self>, err: &Error) {
match err {
&Error::Io(ref err) => {
assert!(
ErrorKind::ConnectionReset == err.kind() ||
ErrorKind::UnexpectedEof == err.kind()
);
drop(self.tx.send(()));
}
err => panic!("expected I/O error, actual {:?}", err),
}
}
}
let (tx, rx) = oneshot::channel();
match core.run(service.call(Request::new(0, &["node"]).unwrap(), MockDispatch { tx: tx })) {
Ok(..) => {}
Err(Error::Io(ref err)) if err.kind() == ErrorKind::BrokenPipe => {}
Err(err) => panic!("expected I/O error, actual {:?}", err),
}
match core.run(rx) {
Ok(()) => {}
Err(futures::Canceled) => {}
}
thread.join().unwrap();
}