use crate::runtime::config::*;
use crate::runtime::RuntimeProperties;
use async_std::net::UdpSocket;
use futures::prelude::*;
use socket2::{Domain, Socket, Type};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use zenoh_protocol::core::{whatami, PeerId, WhatAmI};
use zenoh_protocol::io::{RBuf, WBuf};
use zenoh_protocol::link::Locator;
use zenoh_protocol::proto::{Hello, Scout, SessionBody, SessionMessage};
use zenoh_protocol::session::{Session, SessionManager};
use zenoh_util::core::{ZError, ZErrorKind, ZResult};
use zenoh_util::zerror;
const RCV_BUF_SIZE: usize = 65536;
const SEND_BUF_INITIAL_SIZE: usize = 8;
const SCOUT_INITIAL_PERIOD: u64 = 1000; const SCOUT_MAX_PERIOD: u64 = 8000; const SCOUT_PERIOD_INCREASE_FACTOR: u64 = 2;
const ROUTER_DEFAULT_LISTENER: &str = "tcp/0.0.0.0:7447";
const PEER_DEFAULT_LISTENER: &str = "tcp/0.0.0.0:0";
pub enum Loop {
Continue,
Break,
}
#[derive(Clone)]
pub struct SessionOrchestrator {
pub whatami: WhatAmI,
pub manager: SessionManager,
}
impl SessionOrchestrator {
pub fn new(manager: SessionManager, whatami: WhatAmI) -> SessionOrchestrator {
SessionOrchestrator { whatami, manager }
}
pub async fn init(&mut self, config: RuntimeProperties) -> ZResult<()> {
match self.whatami {
whatami::CLIENT => self.init_client(config).await,
whatami::PEER => self.init_peer(config).await,
whatami::ROUTER => self.init_broker(config).await,
_ => {
log::error!("Unknown mode");
zerror!(ZErrorKind::Other {
descr: "Unknown mode".to_string()
})
}
}
}
async fn init_client(&mut self, config: RuntimeProperties) -> ZResult<()> {
let peers = config
.get_or(&ZN_PEER_KEY, "")
.split(',')
.filter_map(|s| match s.trim() {
"" => None,
s => Some(s.parse().unwrap()),
})
.collect::<Vec<Locator>>();
let scouting = config
.get_or(&ZN_MULTICAST_SCOUTING_KEY, ZN_MULTICAST_SCOUTING_DEFAULT)
.to_lowercase()
== ZN_TRUE;
let addr = config
.get_or(&ZN_MULTICAST_ADDRESS_KEY, ZN_MULTICAST_ADDRESS_DEFAULT)
.parse()
.unwrap();
let iface = config.get_or(&ZN_MULTICAST_INTERFACE_KEY, ZN_MULTICAST_INTERFACE_DEFAULT);
let timeout = std::time::Duration::from_secs_f64(
config
.get_or(&ZN_SCOUTING_TIMEOUT_KEY, ZN_SCOUTING_TIMEOUT_DEFAULT)
.parse()
.unwrap(),
);
match peers.len() {
0 => {
if scouting {
log::info!("Scouting for router ...");
let iface = SessionOrchestrator::get_interface(iface)?;
let socket = SessionOrchestrator::bind_ucast_port(iface).await?;
self.connect_first(&socket, whatami::ROUTER, &addr, timeout)
.await
} else {
zerror!(ZErrorKind::Other {
descr: "No peer specified and multicast scouting desactivated!".to_string()
})
}
}
_ => {
for locator in &peers {
match self.manager.open_session(&locator).await {
Ok(_) => return Ok(()),
Err(err) => log::warn!("Unable to connect to {}! {}", locator, err),
}
}
log::error!("Unable to connect to any of {:?}! ", peers);
zerror!(ZErrorKind::IOError {
descr: "".to_string()
})
}
}
}
pub async fn init_peer(&mut self, config: RuntimeProperties) -> ZResult<()> {
let listeners = config
.get_or(&ZN_LISTENER_KEY, PEER_DEFAULT_LISTENER)
.split(',')
.filter_map(|s| match s.trim() {
"" => None,
s => Some(s.parse().unwrap()),
})
.collect::<Vec<Locator>>();
let peers = config
.get_or(&ZN_PEER_KEY, "")
.split(',')
.filter_map(|s| match s.trim() {
"" => None,
s => Some(s.parse().unwrap()),
})
.collect::<Vec<Locator>>();
let scouting = config
.get_or(&ZN_MULTICAST_SCOUTING_KEY, ZN_MULTICAST_SCOUTING_DEFAULT)
.to_lowercase()
== ZN_TRUE;
let addr = config
.get_or(&ZN_MULTICAST_ADDRESS_KEY, ZN_MULTICAST_ADDRESS_DEFAULT)
.parse()
.unwrap();
let iface = config.get_or(&ZN_MULTICAST_INTERFACE_KEY, ZN_MULTICAST_INTERFACE_DEFAULT);
let delay = std::time::Duration::from_secs_f64(
config
.get_or(&ZN_SCOUTING_DELAY_KEY, ZN_SCOUTING_DELAY_DEFAULT)
.parse()
.unwrap(),
);
self.bind_listeners(&listeners).await?;
let this = self.clone();
async_std::task::spawn(async move { this.connector(peers).await });
if scouting {
let mcast_socket = SessionOrchestrator::bind_mcast_port(&addr).await?;
let iface = SessionOrchestrator::get_interface(iface)?;
let ucast_socket = SessionOrchestrator::bind_ucast_port(iface).await?;
let this = self.clone();
async_std::task::spawn(async move {
async_std::prelude::FutureExt::race(
this.responder(&mcast_socket, &ucast_socket),
this.connect_all(&ucast_socket, whatami::PEER | whatami::ROUTER, &addr),
)
.await;
});
}
async_std::task::sleep(delay).await;
Ok(())
}
pub async fn init_broker(&mut self, config: RuntimeProperties) -> ZResult<()> {
let listeners = config
.get_or(&ZN_LISTENER_KEY, ROUTER_DEFAULT_LISTENER)
.split(',')
.filter_map(|s| match s.trim() {
"" => None,
s => Some(s.parse().unwrap()),
})
.collect::<Vec<Locator>>();
let peers = config
.get_or(&ZN_PEER_KEY, "")
.split(',')
.filter_map(|s| match s.trim() {
"" => None,
s => Some(s.parse().unwrap()),
})
.collect::<Vec<Locator>>();
let scouting = config
.get_or(&ZN_MULTICAST_SCOUTING_KEY, ZN_MULTICAST_SCOUTING_DEFAULT)
.to_lowercase()
== ZN_TRUE;
let addr = config
.get_or(&ZN_MULTICAST_ADDRESS_KEY, ZN_MULTICAST_ADDRESS_DEFAULT)
.parse()
.unwrap();
let iface = config.get_or(&ZN_MULTICAST_INTERFACE_KEY, ZN_MULTICAST_INTERFACE_DEFAULT);
self.bind_listeners(&listeners).await?;
let this = self.clone();
async_std::task::spawn(async move { this.connector(peers).await });
if scouting {
let mcast_socket = SessionOrchestrator::bind_mcast_port(&addr).await?;
let iface = SessionOrchestrator::get_interface(iface)?;
let ucast_socket = SessionOrchestrator::bind_ucast_port(iface).await?;
let this = self.clone();
async_std::task::spawn(async move {
this.responder(&mcast_socket, &ucast_socket).await;
});
}
Ok(())
}
async fn bind_listeners(&self, listeners: &[Locator]) -> ZResult<()> {
for listener in listeners {
match self.manager.add_listener(&listener).await {
Ok(listener) => log::debug!("Listener {} added", listener),
Err(err) => {
log::error!("Unable to open listener {} : {}", listener, err);
return zerror!(
ZErrorKind::IOError {
descr: "".to_string()
},
err
);
}
}
}
for locator in self.manager.get_locators().await {
log::info!("zenohd can be reached on {}", locator);
}
Ok(())
}
pub fn get_interface(name: &str) -> ZResult<IpAddr> {
if name == "auto" {
match zenoh_util::net::get_default_multicast_interface() {
Some(addr) => Ok(addr),
None => {
log::warn!(
"Unable to find active, non-loopback multicast interface. Will use 0.0.0.0"
);
Ok(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
}
}
} else {
match name.parse::<IpAddr>() {
Ok(addr) => Ok(addr),
Err(_) => match zenoh_util::net::get_interface(name) {
Ok(opt_addr) => match opt_addr {
Some(addr) => Ok(addr),
None => {
log::error!("Unable to find interface {}", name);
zerror!(ZErrorKind::IOError {
descr: format!("Unable to find interface {}", name)
})
}
},
Err(err) => {
log::error!("Unable to find interface {} : {}", name, err);
zerror!(ZErrorKind::IOError {
descr: format!("Unable to find interface {} : {}", name, err)
})
}
},
}
}
}
pub async fn bind_mcast_port(sockaddr: &SocketAddr) -> ZResult<UdpSocket> {
let socket = match Socket::new(Domain::ipv4(), Type::dgram(), None) {
Ok(socket) => socket,
Err(err) => {
log::error!("Unable to create datagram socket : {}", err);
return zerror!(
ZErrorKind::IOError {
descr: "Unable to create datagram socket".to_string()
},
err
);
}
};
if let Err(err) = socket.set_reuse_address(true) {
log::error!("Unable to set SO_REUSEADDR option : {}", err);
return zerror!(
ZErrorKind::IOError {
descr: "Unable to set SO_REUSEADDR option".to_string()
},
err
);
}
let addr = {
#[cfg(unix)]
{
sockaddr.ip()
} #[cfg(windows)]
{
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))
}
};
match socket.bind(&SocketAddr::new(addr, sockaddr.port()).into()) {
Ok(()) => log::debug!("UDP port bound to {}", sockaddr),
Err(err) => {
log::error!("Unable to bind udp port {} : {}", sockaddr, err);
return zerror!(
ZErrorKind::IOError {
descr: format!("Unable to bind udp port {}", sockaddr)
},
err
);
}
}
let join_multicast = match sockaddr.ip() {
IpAddr::V4(addr) => socket.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)),
IpAddr::V6(addr) => socket.join_multicast_v6(&addr, 0),
};
match join_multicast {
Ok(()) => log::debug!("Joined multicast group {}", sockaddr.ip()),
Err(err) => {
log::error!("Unable to join multicast group {} : {}", sockaddr.ip(), err);
return zerror!(
ZErrorKind::IOError {
descr: format!("Unable to join multicast group {}", sockaddr.ip())
},
err
);
}
}
log::info!("zenohd listening scout messages on {}", sockaddr);
Ok(socket.into_udp_socket().into())
}
pub async fn bind_ucast_port(addr: IpAddr) -> ZResult<UdpSocket> {
let socket = match Socket::new(Domain::ipv4(), Type::dgram(), None) {
Ok(socket) => socket,
Err(err) => {
log::error!("Unable to create datagram socket : {}", err);
return zerror!(
ZErrorKind::IOError {
descr: "Unable to create datagram socket".to_string()
},
err
);
}
};
match socket.bind(&SocketAddr::new(addr, 0).into()) {
Ok(()) => {
#[allow(clippy::or_fun_call)]
let local_addr = socket
.local_addr()
.or::<std::io::Error>(Ok(SocketAddr::new(addr, 0).into()))
.unwrap()
.as_std()
.or(Some(SocketAddr::new(addr, 0)))
.unwrap();
log::debug!("UDP port bound to {}", local_addr);
}
Err(err) => {
log::error!("Unable to bind udp port {}:0 : {}", addr.to_string(), err);
return zerror!(
ZErrorKind::IOError {
descr: format!("Unable to bind udp port {}:0", addr.to_string())
},
err
);
}
}
Ok(socket.into_udp_socket().into())
}
async fn connector(&self, peers: Vec<Locator>) {
futures::future::join_all(peers.into_iter().map(|peer| async move {
loop {
log::trace!("Trying to connect to configured peer {}", peer);
if self.manager.open_session(&peer).await.is_ok() {
log::debug!("Successfully connected to configured peer {}", peer);
break;
} else {
log::warn!("Unable to connect to configured peer {}", peer);
}
async_std::task::sleep(Duration::new(5, 0)).await;
}
}))
.await;
}
pub async fn scout<Fut, F>(socket: &UdpSocket, what: WhatAmI, mcast_addr: &SocketAddr, mut f: F)
where
F: FnMut(Hello) -> Fut,
Fut: Future<Output = Loop>,
Self: Sized,
{
let send = async {
let mut delay = SCOUT_INITIAL_PERIOD;
let mut wbuf = WBuf::new(SEND_BUF_INITIAL_SIZE, false);
wbuf.write_session_message(&SessionMessage::make_scout(Some(what), true, None));
loop {
log::trace!("Send scout to {}", mcast_addr);
if let Err(err) = socket
.send_to(&RBuf::from(&wbuf).to_vec(), mcast_addr.to_string())
.await
{
log::error!("Unable to send scout to {} : {}", mcast_addr, err);
}
async_std::task::sleep(Duration::from_millis(delay)).await;
if delay * SCOUT_PERIOD_INCREASE_FACTOR <= SCOUT_MAX_PERIOD {
delay *= SCOUT_PERIOD_INCREASE_FACTOR;
}
}
};
let recv = async {
let mut buf = vec![0; RCV_BUF_SIZE];
loop {
let (n, _peer) = socket.recv_from(&mut buf).await.unwrap();
let mut rbuf = RBuf::from(&buf[..n]);
log::trace!("Received UDP datagram {}", rbuf);
if let Some(msg) = rbuf.read_session_message() {
log::trace!("Received {:?}", msg);
if let SessionBody::Hello(hello) = msg.get_body() {
let whatami = hello.whatami.or(Some(whatami::ROUTER)).unwrap();
if whatami & what != 0 {
if let Loop::Break = f(hello.clone()).await {
break;
}
} else {
log::warn!("Received unexpected hello : {:?}", msg);
}
}
}
}
};
async_std::prelude::FutureExt::race(send, recv).await;
}
async fn connect(&self, locators: &[Locator]) -> ZResult<Session> {
for locator in locators {
let session = self.manager.open_session(locator).await;
if session.is_ok() {
return session;
}
}
zerror!(ZErrorKind::Other {
descr: format!("Unable to connect any of {:?}", locators)
})
}
pub async fn connect_peer(&self, pid: &PeerId, locators: &[Locator]) {
if pid != &self.manager.pid() {
if self.manager.get_session(pid).await.is_none() {
let session = self.connect(locators).await;
if session.is_ok() {
log::debug!("Successfully connected to newly scouted {}", pid);
} else {
log::warn!("Unable to connect to scouted {}", pid);
}
} else {
log::trace!("Scouted already connected peer : {}", pid);
}
}
}
async fn connect_first(
&self,
socket: &UdpSocket,
what: WhatAmI,
addr: &SocketAddr,
timeout: std::time::Duration,
) -> ZResult<()> {
let scout = async {
SessionOrchestrator::scout(socket, what, addr, async move |hello| {
log::info!("Found {:?}", hello);
if let Some(locators) = &hello.locators {
if self.connect(locators).await.is_ok() {
log::debug!("Successfully connected to newly scouted {:?}", hello);
return Loop::Break;
}
log::warn!("Unable to connect to scouted {:?}", hello);
} else {
log::warn!("Received hello with no locators : {:?}", hello);
}
Loop::Continue
})
.await;
Ok(())
};
let timeout = async {
async_std::task::sleep(timeout).await;
zerror!(ZErrorKind::Timeout {})
};
async_std::prelude::FutureExt::race(scout, timeout).await
}
async fn connect_all(&self, ucast_socket: &UdpSocket, what: WhatAmI, addr: &SocketAddr) {
SessionOrchestrator::scout(ucast_socket, what, addr, async move |hello| {
match &hello.pid {
Some(pid) => {
if let Some(locators) = &hello.locators {
self.connect_peer(pid, locators).await
} else {
log::warn!("Received hello with no locators : {:?}", hello);
}
}
None => {
log::warn!("Received hello with no pid : {:?}", hello);
}
}
Loop::Continue
})
.await
}
async fn responder(&self, mcast_socket: &UdpSocket, ucast_socket: &UdpSocket) {
let mut buf = vec![0; RCV_BUF_SIZE];
log::debug!("Waiting for UDP datagram...");
loop {
let (n, peer) = mcast_socket.recv_from(&mut buf).await.unwrap();
if let Ok(local_addr) = ucast_socket.local_addr() {
if local_addr == peer {
log::trace!("Ignore UDP datagram from own socket");
continue;
}
}
let mut rbuf = RBuf::from(&buf[..n]);
log::trace!("Received UDP datagram {}", rbuf);
if let Some(msg) = rbuf.read_session_message() {
log::trace!("Received {:?}", msg);
if let SessionBody::Scout(Scout {
what, pid_request, ..
}) = msg.get_body()
{
let what = what.or(Some(whatami::ROUTER)).unwrap();
if what & self.whatami != 0 {
let mut wbuf = WBuf::new(SEND_BUF_INITIAL_SIZE, false);
let pid = if *pid_request {
Some(self.manager.pid())
} else {
None
};
let hello = SessionMessage::make_hello(
pid,
Some(self.whatami),
Some(self.manager.get_locators().await.clone()),
None,
);
log::trace!("Send {:?} to {}", hello, peer);
wbuf.write_session_message(&hello);
if let Err(err) = ucast_socket
.send_to(&RBuf::from(&wbuf).to_vec(), peer)
.await
{
log::error!("Unable to send {:?} to {} : {}", hello, peer, err);
}
}
}
}
}
}
pub async fn close(&mut self) -> ZResult<()> {
log::trace!("SessionOrchestrator::close())");
for session in &mut self.manager.get_sessions().await {
session.close().await?;
}
Ok(())
}
}