mod error;
pub mod inproc;
pub(crate) mod matrix;
pub mod multi;
#[deprecated(since = "0.3.14", note = "please use splinter::transport::socket")]
pub mod raw;
pub mod socket;
pub mod tls;
#[cfg(feature = "ws-transport")]
pub mod ws;
use mio::Evented;
pub use error::{AcceptError, ConnectError, DisconnectError, ListenError, RecvError, SendError};
/// A connection to a remote peer over which byte messages are sent and received.
///
/// The `Send` supertrait lets a connection be moved to another thread.
pub trait Connection: Send {
/// Sends `message` to the remote peer.
///
/// The test helpers in this file retry while this returns `SendError::WouldBlock`.
fn send(&mut self, message: &[u8]) -> Result<(), SendError>;
/// Receives a message from the remote peer as an owned byte vector.
///
/// The test helpers in this file retry while this returns `RecvError::WouldBlock`.
fn recv(&mut self) -> Result<Vec<u8>, RecvError>;
/// Returns the endpoint of the remote side of this connection.
///
/// `test_transport` in this file expects a client's value to equal the endpoint reported by
/// the listener it connected to.
fn remote_endpoint(&self) -> String;
/// Returns the endpoint of the local side of this connection.
// NOTE(review): the string format is not fixed by anything in this file — confirm per implementation.
fn local_endpoint(&self) -> String;
/// Disconnects this connection, reporting a `DisconnectError` on failure.
// NOTE(review): whether `send`/`recv` stay usable afterwards is not specified here — confirm per implementation.
fn disconnect(&mut self) -> Result<(), DisconnectError>;
/// Returns the `mio::Evented` handle of this connection, which can be registered with a
/// `mio::Poll` to be notified of readiness.
fn evented(&self) -> &dyn Evented;
}
/// Accepts incoming connections; obtained from `Transport::listen`.
///
/// The `Send` supertrait lets a listener be moved to another thread.
pub trait Listener: Send {
/// Accepts one incoming connection and returns it boxed as a `Connection`.
fn accept(&mut self) -> Result<Box<dyn Connection>, AcceptError>;
/// Returns the endpoint of this listener.
///
/// The tests in this file pass this value to `Transport::connect` to reach the listener.
fn endpoint(&self) -> String;
}
/// Exposes a source of connections as an iterator of accept results.
pub trait Incoming {
/// Returns a boxed iterator whose items are accept results: a boxed `Connection` on success
/// or an `AcceptError` on failure.
///
/// The iterator borrows `self` mutably for `'a`, so `self` cannot be used directly until the
/// iterator is dropped.
fn incoming<'a>(
&'a mut self,
) -> Box<dyn Iterator<Item = Result<Box<dyn Connection>, AcceptError>> + 'a>;
}
/// Lets any `dyn Listener` be consumed as an iterator of accept attempts.
impl Incoming for dyn Listener {
    /// Wraps this listener in an `IncomingIter` and returns it as a boxed iterator.
    ///
    /// The listener stays mutably borrowed for as long as the returned iterator is alive.
    fn incoming<'a>(
        &'a mut self,
    ) -> Box<dyn Iterator<Item = Result<Box<dyn Connection>, AcceptError>> + 'a> {
        let accept_attempts = IncomingIter::new(self);
        Box::new(accept_attempts)
    }
}
/// A factory for connections and listeners.
///
/// The `Send` supertrait lets a transport be moved to another thread, as the tests in this file do.
pub trait Transport: Send {
/// Reports whether this transport accepts `address`.
// NOTE(review): presumably checks that the address is of a form this transport can connect to
// or listen on — confirm against the implementations.
fn accepts(&self, address: &str) -> bool;
/// Opens a connection to `endpoint` and returns it boxed as a `Connection`.
fn connect(&mut self, endpoint: &str) -> Result<Box<dyn Connection>, ConnectError>;
/// Starts listening on `bind` and returns the resulting boxed `Listener`.
fn listen(&mut self, bind: &str) -> Result<Box<dyn Listener>, ListenError>;
}
/// Iterator adapter over a mutably borrowed `Listener`; backs `Incoming::incoming` for
/// `dyn Listener`.
struct IncomingIter<'a> {
// The listener whose `accept` produces each item; borrowed for the iterator's lifetime.
listener: &'a mut dyn Listener,
}
impl<'a> IncomingIter<'a> {
    /// Builds an iterator that draws its items from `listener`, holding the mutable borrow
    /// for `'a`.
    pub fn new(listener: &'a mut dyn Listener) -> Self {
        Self { listener }
    }
}
impl<'a> Iterator for IncomingIter<'a> {
    type Item = Result<Box<dyn Connection>, AcceptError>;

    /// Performs one `accept` on the underlying listener and yields its outcome.
    ///
    /// This never returns `None`: accept failures are yielded as `Some(Err(..))`, so the
    /// iterator does not end on its own.
    fn next(&mut self) -> Option<Self::Item> {
        let attempt = self.listener.accept();
        Some(attempt)
    }
}
#[cfg(test)]
pub mod tests {
use super::*;
use std::fmt::Debug;
use std::collections::HashMap;
use std::sync::mpsc::channel;
use std::thread;
use std::time::{Duration, Instant};
use mio::{Events, Poll, PollOpt, Ready, Token};
/// Unwraps `result`, returning the `Ok` value.
///
/// # Panics
///
/// Panics with a message containing the `Debug` form of the error when `result` is `Err`.
fn assert_ok<T, E: Debug>(result: Result<T, E>) -> T {
    result.unwrap_or_else(|err| panic!("Expected Ok(...), got Err({:?})", err))
}
/// Re-evaluates `$op` until it stops reporting `$err::WouldBlock`, then evaluates to that
/// final `Result` (either `Ok` or any other `Err`).
///
/// `$op` is evaluated once per attempt, with a 100 ms pause after each `WouldBlock`.
///
/// # Panics
///
/// Panics with "blocked for too long" if an attempt starts 60 seconds or more after the
/// first one.
macro_rules! block {
    ($op:expr, $err:ident) => {{
        let began = Instant::now();
        let limit = Duration::from_millis(60000);
        let retry_delay = Duration::from_millis(100);
        loop {
            assert!(began.elapsed() < limit, "blocked for too long");
            match $op {
                // Not ready yet: wait a little and try again.
                Err($err::WouldBlock) => thread::sleep(retry_delay),
                // Success or a real error: hand it to the caller unchanged.
                outcome => break outcome,
            }
        }
    }};
}
/// Checks a request/response round trip over `transport`.
///
/// A listener is opened on `bind`. A client thread connects to the listener's endpoint,
/// verifies that its remote endpoint matches, sends `[0, 1, 2]` and expects `[3, 4, 5]` back.
/// The calling thread accepts the connection through `Incoming::incoming`, expects
/// `[0, 1, 2]` and answers with `[3, 4, 5]`.
///
/// # Panics
///
/// Panics if any step fails or a received payload differs from the expected one.
pub fn test_transport<T: Transport + Send + 'static>(mut transport: T, bind: &str) {
    let mut listener = assert_ok(transport.listen(bind));
    let endpoint = listener.endpoint();

    // The client owns the transport and runs concurrently with the accept below.
    let client_thread = thread::spawn(move || {
        let mut client = assert_ok(transport.connect(&endpoint));
        assert_eq!(client.remote_endpoint(), endpoint);

        let sent = block!(client.send(&[0, 1, 2]), SendError);
        assert_ok(sent);

        let reply = assert_ok(block!(client.recv(), RecvError));
        assert_eq!(vec![3, 4, 5], reply);
    });

    let accepted = listener.incoming().next().unwrap();
    let mut server = assert_ok(accepted);

    let request = assert_ok(block!(server.recv(), RecvError));
    assert_eq!(vec![0, 1, 2], request);

    let answered = block!(server.send(&[3, 4, 5]), SendError);
    assert_ok(answered);

    // Surface any panic raised on the client side.
    client_thread.join().unwrap();
}
/// Checks that connections created by `transport` report readiness through `mio`.
///
/// A connector thread opens `CONNECTIONS` connections to a listener bound on `bind` and
/// registers each one with a `Poll` for readable and writable interest (level-triggered).
/// Once the listener side has sent "hello" on every accepted connection, the connector polls
/// until every connection has been seen both readable and writable, or until
/// `TOTAL_DURATION` milliseconds have passed. It then receives "hello" and replies "world"
/// on each connection, and the listener side checks the replies.
///
/// # Panics
///
/// Panics if any transport, channel or poll operation fails, if a payload differs from the
/// expected one, or if the readiness loop timed out before all connections were seen both
/// readable and writable.
pub fn test_poll<T: Transport + Send + 'static>(mut transport: T, bind: &str) {
    const CONNECTIONS: usize = 16;
    // Timeout of a single `poll` call, in milliseconds.
    const POLL_DURATION: u64 = 3000;
    // Budget for the whole readiness loop, in milliseconds.
    const TOTAL_DURATION: u64 = 60000;

    let mut listener = transport.listen(bind).unwrap();
    let endpoint = listener.endpoint();

    // One channel per direction to sequence the two threads.
    let (to_listener_tx, to_listener_rx) = channel();
    let (to_connector_tx, to_connector_rx) = channel();

    let connector = thread::spawn(move || {
        let mut outbound = Vec::with_capacity(CONNECTIONS);
        for index in 0..CONNECTIONS {
            let conn = assert_ok(transport.connect(&endpoint));
            outbound.push((conn, Token(index)));
        }

        let poll = Poll::new().unwrap();
        let interest = Ready::readable() | Ready::writable();
        for (conn, token) in &outbound {
            poll.register(conn.evented(), *token, interest, PollOpt::level())
                .unwrap();
        }

        // Announce that registration is complete, then wait until the listener side has
        // sent its greeting on every connection.
        to_listener_tx.send(()).unwrap();
        to_connector_rx.recv().unwrap();

        // Accumulated readiness per token, OR-ed together across poll rounds.
        let mut seen = HashMap::new();
        let mut readable_count = 0;
        let mut writable_count = 0;

        let mut events = Events::with_capacity(CONNECTIONS * 8);
        let started = Instant::now();
        let poll_timeout = Duration::from_millis(POLL_DURATION);
        let budget = Duration::from_millis(TOTAL_DURATION);

        let timed_out = loop {
            if started.elapsed() >= budget {
                break true;
            }

            poll.poll(&mut events, Some(poll_timeout)).unwrap();

            for (_conn, token) in &outbound {
                for event in events.iter().filter(|event| event.token() == *token) {
                    let readiness = event.readiness();
                    *seen.entry(token).or_insert(readiness) |= readiness;
                }
            }

            readable_count = seen.values().filter(|ready| ready.is_readable()).count();
            writable_count = seen.values().filter(|ready| ready.is_writable()).count();

            if readable_count >= CONNECTIONS && writable_count >= CONNECTIONS {
                break false;
            }
        };

        // Complete the exchange even after a timeout so the listener side is not left
        // waiting; the timeout itself is reported afterwards.
        for (mut conn, _token) in outbound {
            assert_eq!(b"hello".to_vec(), block!(conn.recv(), RecvError).unwrap());
            assert_ok(conn.send(b"world"));
        }

        if timed_out {
            assert_eq!((CONNECTIONS, CONNECTIONS), (readable_count, writable_count));
        }
    });

    let mut accepted = Vec::with_capacity(CONNECTIONS);
    for _ in 0..CONNECTIONS {
        accepted.push(listener.accept().unwrap());
    }

    // Wait until the connector has registered everything before sending the greeting.
    to_listener_rx.recv().unwrap();
    for conn in accepted.iter_mut() {
        block!(conn.send(b"hello"), SendError).unwrap();
    }
    to_connector_tx.send(()).unwrap();

    for mut conn in accepted {
        assert_eq!(b"world".to_vec(), block!(conn.recv(), RecvError).unwrap());
    }

    connector.join().unwrap();
}
}