pub use self::errors::ServiceDiscoveryError;
mod errors;
use common::{Core, State};
use maidsafe_utilities::serialisation::{deserialise, serialise};
use mio::net::UdpSocket;
use mio::{Poll, PollOpt, Ready, Token};
use rand;
use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io::ErrorKind;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::rc::Rc;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::u16;
/// Messages exchanged between `ServiceDiscovery` instances over UDP.
///
/// NOTE(review): values are encoded/decoded with `maidsafe_utilities::serialisation`;
/// reordering variants or changing payloads presumably changes the wire format - confirm
/// before editing.
#[derive(Serialize, Deserialize)]
enum DiscoveryMsg {
/// Query sent by `seek_peers`. `guid` is the sender's random id; a receiver ignores the
/// request when it equals its own `guid`.
Request { guid: u64 },
/// Reply to a `Request`, carrying the responder's current listener addresses.
Response(Vec<SocketAddr>),
}
/// Event-loop state that discovers peers' listener addresses via UDP broadcast.
///
/// It sends `DiscoveryMsg::Request` datagrams to a broadcast address, answers requests from
/// other instances (when listening is enabled) with its own listener addresses, and forwards
/// the listener lists found in responses to registered observers.
pub struct ServiceDiscovery {
/// Token the socket is registered with in the `Poll`, and the key this state is stored
/// under in `Core`.
token: Token,
/// UDP socket bound to `0.0.0.0:<listener_port>` with broadcasting enabled.
socket: UdpSocket,
/// Destination of discovery requests: `255.255.255.255:<remote_port>`.
remote_addr: SocketAddr,
/// When `true`, requests coming from other instances are answered.
listen: bool,
/// Buffer that incoming datagrams are received into.
read_buf: [u8; 1024],
/// Shared list of our listener addresses; a snapshot is sent in every response.
our_listeners: Arc<Mutex<Vec<SocketAddr>>>,
/// Pre-serialised `DiscoveryMsg::Request` carrying `guid`, sent by `seek_peers`.
seek_peers_req: Vec<u8>,
/// Peers whose request has been accepted and that still await a response.
reply_to: VecDeque<SocketAddr>,
/// Channels that receive the listener list of every response; an observer is dropped once
/// sending to it fails.
observers: Vec<Sender<Vec<SocketAddr>>>,
/// Random id generated in `start`; requests carrying this id are ignored.
guid: u64,
}
impl ServiceDiscovery {
    /// Creates a `ServiceDiscovery` state, registers its socket with `poll` for readable
    /// events (edge-triggered) and stores the state in `core` under `token`.
    ///
    /// The UDP socket is bound to `0.0.0.0:listener_port` with broadcasting enabled, and
    /// discovery requests are addressed to `255.255.255.255:remote_port`. The new state does
    /// not answer requests until `set_listen(true)` is called.
    ///
    /// # Errors
    ///
    /// Returns an error if the socket cannot be bound or configured, if the discovery request
    /// cannot be serialised, or if registering the socket with `poll` fails.
    pub fn start(
        core: &mut Core,
        poll: &Poll,
        our_listeners: Arc<Mutex<Vec<SocketAddr>>>,
        token: Token,
        listener_port: u16,
        remote_port: u16,
    ) -> Result<(), ServiceDiscoveryError> {
        let udp_socket = UdpSocket::bind(&ipv4_addr(0, 0, 0, 0, listener_port))?;
        udp_socket.set_broadcast(true)?;

        // Random id embedded in our requests; `handle_incoming_msg` ignores any request that
        // carries this same id.
        let guid = rand::random();
        let remote_addr = ipv4_addr(255, 255, 255, 255, remote_port);

        let service_discovery = ServiceDiscovery {
            token,
            socket: udp_socket,
            remote_addr,
            listen: false,
            read_buf: [0; 1024],
            our_listeners,
            // Serialised once up front so `seek_peers` only has to send the bytes.
            seek_peers_req: serialise(&DiscoveryMsg::Request { guid })?,
            reply_to: VecDeque::new(),
            observers: Vec::new(),
            guid,
        };

        poll.register(
            &service_discovery.socket,
            token,
            Ready::readable(),
            PollOpt::edge(),
        )?;

        // The value returned by `insert_state` is deliberately ignored.
        let _ = core.insert_state(token, Rc::new(RefCell::new(service_discovery)));
        Ok(())
    }

    /// Enables (`true`) or disables (`false`) answering of incoming discovery requests.
    pub fn set_listen(&mut self, listen: bool) {
        self.listen = listen;
    }

    /// Sends the pre-serialised discovery request to the broadcast address.
    ///
    /// # Errors
    ///
    /// Returns an error if the datagram cannot be sent.
    pub fn seek_peers(&mut self) -> Result<(), ServiceDiscoveryError> {
        let _ = self
            .socket
            .send_to(&self.seek_peers_req, &self.remote_addr)?;
        Ok(())
    }

    /// Registers a channel that will receive the listener list of every discovery response.
    ///
    /// The observer is removed automatically once sending to it fails.
    pub fn register_observer(&mut self, obs: Sender<Vec<SocketAddr>>) {
        self.observers.push(obs);
    }

    /// Receives and handles datagrams until the socket reports `WouldBlock`.
    ///
    /// Any error other than `Interrupted`/`WouldBlock` terminates this state.
    fn read(&mut self, core: &mut Core, poll: &Poll) {
        loop {
            match self.socket.recv_from(&mut self.read_buf) {
                Ok((bytes_rxd, peer_addr)) => {
                    self.handle_incoming_msg(bytes_rxd, peer_addr, core, poll);
                }
                // The socket is registered edge-triggered, so we must keep reading until
                // `WouldBlock`. Bailing out on an interrupted call could leave queued
                // datagrams unread with no further event to announce them; retry instead.
                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => return,
                Err(e) => {
                    debug!("ServiceDiscovery error in read: {:?}", e);
                    self.terminate(core, poll);
                    return;
                }
            }
        }
    }

    /// Decodes the first `bytes_rxd` bytes of `read_buf` and acts on the message.
    ///
    /// * `Request` - if listening is enabled and the request does not carry our own `guid`,
    ///   the sender is queued for a reply and a write is attempted immediately.
    /// * `Response` - the contained listener list is forwarded to every observer; observers
    ///   whose channel rejects the send are dropped.
    ///
    /// Datagrams that fail to deserialise are logged and ignored.
    fn handle_incoming_msg(
        &mut self,
        bytes_rxd: usize,
        peer_addr: SocketAddr,
        core: &mut Core,
        poll: &Poll,
    ) {
        let msg: DiscoveryMsg = match deserialise(&self.read_buf[..bytes_rxd]) {
            Ok(msg) => msg,
            Err(e) => {
                debug!("Bogus message serialisation error: {:?}", e);
                return;
            }
        };

        match msg {
            DiscoveryMsg::Request { guid } => {
                if self.listen && self.guid != guid {
                    self.reply_to.push_back(peer_addr);
                    self.write(core, poll);
                }
            }
            DiscoveryMsg::Response(peer_listeners) => {
                self.observers
                    .retain(|obs| obs.send(peer_listeners.clone()).is_ok());
            }
        }
    }

    /// Attempts to send pending replies; terminates this state if that fails.
    fn write(&mut self, core: &mut Core, poll: &Poll) {
        if let Err(e) = self.write_impl(poll) {
            debug!("Error in ServiceDiscovery write: {:?}", e);
            self.terminate(core, poll);
        }
    }

    /// Sends our current listener list to the queued peers and updates the poll interest.
    ///
    /// Replies are sent until the queue is empty or the socket reports `WouldBlock`; in the
    /// latter case the peer is put back at the front of the queue and the socket is
    /// re-registered for writable events as well, so the remaining replies go out later.
    /// Nothing is locked or serialised when no peer is waiting.
    ///
    /// # Errors
    ///
    /// Returns an error if the response cannot be serialised, if sending fails with anything
    /// other than `Interrupted`/`WouldBlock`, or if re-registering the socket fails.
    fn write_impl(&mut self, poll: &Poll) -> Result<(), ServiceDiscoveryError> {
        if !self.reply_to.is_empty() {
            // Snapshot the listeners; the lock guard is released at the end of the statement.
            let our_current_listeners = unwrap!(self.our_listeners.lock()).clone();
            let serialised_resp = serialise(&DiscoveryMsg::Response(our_current_listeners))?;

            while let Some(peer_addr) = self.reply_to.pop_front() {
                match self.socket.send_to(&serialised_resp[..], &peer_addr) {
                    Ok(_bytes_sent) => (),
                    // Interrupted before anything was sent: requeue and retry straight away.
                    Err(ref e) if e.kind() == ErrorKind::Interrupted => {
                        self.reply_to.push_front(peer_addr);
                    }
                    // Socket not writable right now: requeue and wait for a writable event.
                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
                        self.reply_to.push_front(peer_addr);
                        break;
                    }
                    Err(e) => return Err(From::from(e)),
                }
            }
        }

        // Only ask for writable events while replies are still pending.
        let kind = if self.reply_to.is_empty() {
            Ready::readable()
        } else {
            Ready::readable() | Ready::writable()
        };
        poll.reregister(&self.socket, self.token, kind, PollOpt::edge())?;
        Ok(())
    }
}
impl State for ServiceDiscovery {
    /// Handles a readiness event for the discovery socket: incoming datagrams are processed
    /// first when `kind` is readable, then pending replies are written when it is writable.
    fn ready(&mut self, core: &mut Core, poll: &Poll, kind: Ready) {
        let can_read = kind.is_readable();
        let can_write = kind.is_writable();

        if can_read {
            self.read(core, poll);
        }
        if can_write {
            self.write(core, poll);
        }
    }

    /// Deregisters the socket from `poll` and removes this state from `core`.
    ///
    /// Both steps are best-effort: their results are discarded.
    fn terminate(&mut self, core: &mut Core, poll: &Poll) {
        let own_token = self.token;
        let _ = poll.deregister(&self.socket);
        let _ = core.remove_state(own_token);
    }

    /// Exposes this state as `Any` so callers can downcast it to `ServiceDiscovery`.
    fn as_any(&mut self) -> &mut Any {
        self
    }
}
/// Builds the IPv4 socket address `a.b.c.d:port`.
fn ipv4_addr(a: u8, b: u8, c: u8, d: u8, port: u16) -> SocketAddr {
    let ip = Ipv4Addr::new(a, b, c, d);
    SocketAddr::from(SocketAddrV4::new(ip, port))
}
#[cfg(test)]
mod tests {
use super::*;
use common::{self, CoreMessage};
use mio::Token;
use std::str::FromStr;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{net, thread};
/// End-to-end check: one instance (on event loop `el0`) listens and advertises a known
/// listener address; a second instance (on `el1`) seeks peers and must receive exactly that
/// address list through its observer channel.
#[test]
fn service_discovery() {
const SERVICE_DISCOVERY_TOKEN: usize = 0;
// First event loop: hosts the instance that answers discovery requests.
let el0 = unwrap!(
common::spawn_event_loop(SERVICE_DISCOVERY_TOKEN + 1, Some("EL0")),
"Could not run el0"
);
// The listener address that instance 0 advertises in its responses.
let addr = unwrap!(net::SocketAddr::from_str("138.139.140.150:54321"));
let listeners_0 = Arc::new(Mutex::new(vec![addr]));
let listeners_0_clone = listeners_0.clone();
{
let token_0 = Token(SERVICE_DISCOVERY_TOKEN);
// Start instance 0 bound to port 65530 (its remote port is 65530 as well).
unwrap!(
el0.send(CoreMessage::new(move |core, poll| {
unwrap!(
ServiceDiscovery::start(
core,
poll,
listeners_0_clone,
token_0,
65_530,
65_530
),
"Could not spawn ServiceDiscovery_0"
);
})),
"Could not send to el0"
);
// Make instance 0 answer incoming requests.
unwrap!(el0.send(CoreMessage::new(move |core, _| {
let state = unwrap!(core.get_state(token_0));
let mut inner = state.borrow_mut();
unwrap!(inner.as_any().downcast_mut::<ServiceDiscovery>()).set_listen(true);
})));
}
// NOTE(review): presumably gives el0 time to process both messages before instance 1
// broadcasts its request - confirm.
thread::sleep(Duration::from_millis(100));
// Second event loop: hosts the instance that seeks peers.
let el1 = unwrap!(
common::spawn_event_loop(SERVICE_DISCOVERY_TOKEN + 1, Some("EL1")),
"Could not run el1"
);
// Channel through which instance 1 reports the listener lists it discovers.
let (tx, rx) = mpsc::channel();
{
let listeners_1 = Arc::new(Mutex::new(vec![]));
let token_1 = Token(SERVICE_DISCOVERY_TOKEN);
// Start instance 1 with listener port 0 (presumably an OS-assigned port - confirm) and
// requests targeted at port 65530, where instance 0 is bound.
unwrap!(
el1.send(CoreMessage::new(move |core, poll| {
unwrap!(
ServiceDiscovery::start(core, poll, listeners_1, token_1, 0, 65_530),
"Could not spawn ServiceDiscovery_1"
);
})),
"Could not send to el1"
);
// Register the observer before seeking so the response is not missed.
unwrap!(el1.send(CoreMessage::new(move |core, _| {
let state = unwrap!(core.get_state(token_1));
let mut inner = state.borrow_mut();
unwrap!(inner.as_any().downcast_mut::<ServiceDiscovery>()).register_observer(tx);
})));
// Send the discovery request.
unwrap!(
el1.send(CoreMessage::new(move |core, _| {
let state = unwrap!(core.get_state(token_1));
let mut inner = state.borrow_mut();
let sd = unwrap!(inner.as_any().downcast_mut::<ServiceDiscovery>());
unwrap!(sd.seek_peers());
})),
"Could not send to el1"
);
}
// Wait up to 30 seconds for the response and compare it with instance 0's listeners.
let peer_listeners = unwrap!(rx.recv_timeout(Duration::from_secs(30)));
assert_eq!(
peer_listeners.into_iter().collect::<Vec<_>>(),
*unwrap!(listeners_0.lock())
);
}
}