use super::authenticator::{
AuthenticatedPeerLink, DummyLinkAuthenticator, DummyPeerAuthenticator, LinkAuthenticator,
PeerAuthenticator,
};
use super::defaults::{
SESSION_BATCH_SIZE, SESSION_KEEP_ALIVE, SESSION_LEASE, SESSION_OPEN_MAX_CONCURRENT,
SESSION_OPEN_RETRIES, SESSION_OPEN_TIMEOUT, SESSION_SEQ_NUM_RESOLUTION,
};
use super::transport::SessionTransport;
use super::Session;
use super::SessionHandler;
use crate::core::{PeerId, WhatAmI, ZInt};
use crate::link::{
Link, LinkManager, LinkManagerBuilder, LinkProperties, Locator, LocatorProtocol,
};
use async_std::prelude::*;
use async_std::sync::{Arc, Mutex};
use async_std::task;
use rand::{RngCore, SeedableRng};
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use zenoh_util::core::{ZError, ZErrorKind, ZResult};
use zenoh_util::crypto::{BlockCipher, PseudoRng};
use zenoh_util::{zasynclock, zerror};
/// Mandatory settings handed to `SessionManager::new`.
///
/// Every field is moved or copied into the manager's internal configuration.
pub struct SessionManagerConfig {
/// Copied verbatim into the internal configuration.
/// NOTE(review): presumably the protocol version advertised to peers — confirm.
pub version: u8,
/// Copied verbatim into the internal configuration.
/// NOTE(review): presumably the role of the local node — confirm against `WhatAmI`.
pub whatami: WhatAmI,
/// Stored as `pid` in the internal configuration and returned by `SessionManager::pid`.
pub id: PeerId,
/// Shared handler stored in the internal configuration.
/// NOTE(review): presumably notified about new sessions — confirm against `SessionHandler`.
pub handler: Arc<dyn SessionHandler + Send + Sync>,
}
/// Optional overrides accepted by `SessionManager::new`.
///
/// A `None` field means "use the built-in default". Because every field is an
/// `Option`, the derived `Default` yields a value that overrides nothing, so a
/// caller can set only what it needs:
/// `SessionManagerOptionalConfig { retries: Some(1), ..Default::default() }`.
#[derive(Default)]
pub struct SessionManagerOptionalConfig {
    /// Falls back to `SESSION_LEASE`.
    pub lease: Option<ZInt>,
    /// Falls back to `SESSION_KEEP_ALIVE`.
    pub keep_alive: Option<ZInt>,
    /// Falls back to `SESSION_SEQ_NUM_RESOLUTION`.
    pub sn_resolution: Option<ZInt>,
    /// Falls back to `SESSION_BATCH_SIZE`.
    pub batch_size: Option<usize>,
    /// Falls back to `SESSION_OPEN_TIMEOUT`. `SessionManager::open_session`
    /// interprets the value as milliseconds.
    pub timeout: Option<u64>,
    /// Falls back to `SESSION_OPEN_RETRIES`. Number of attempts made by
    /// `SessionManager::open_session` before giving up.
    pub retries: Option<usize>,
    /// Stored as provided, `None` included.
    /// NOTE(review): presumably `None` means "no limit" — enforcement is not in this file.
    pub max_sessions: Option<usize>,
    /// Stored as provided, `None` included.
    /// NOTE(review): presumably `None` means "no limit" — enforcement is not in this file.
    pub max_links: Option<usize>,
    /// Falls back to a single `DummyPeerAuthenticator`.
    pub peer_authenticator: Option<Vec<PeerAuthenticator>>,
    /// Falls back to a single `DummyLinkAuthenticator`.
    pub link_authenticator: Option<Vec<LinkAuthenticator>>,
}
/// Fully resolved configuration built by `SessionManager::new`: the mandatory
/// values plus every optional setting with its default already applied.
pub(super) struct SessionManagerConfigInner {
/// Copied from `SessionManagerConfig::version`.
pub(super) version: u8,
/// Copied from `SessionManagerConfig::whatami`.
pub(super) whatami: WhatAmI,
/// Taken from `SessionManagerConfig::id`; returned by `SessionManager::pid`.
pub(super) pid: PeerId,
/// Override or `SESSION_LEASE`.
pub(super) lease: ZInt,
/// Override or `SESSION_KEEP_ALIVE`.
pub(super) keep_alive: ZInt,
/// Override or `SESSION_SEQ_NUM_RESOLUTION`.
pub(super) sn_resolution: ZInt,
/// Override or `SESSION_BATCH_SIZE`.
pub(super) batch_size: usize,
/// Override or `SESSION_OPEN_TIMEOUT`; read as milliseconds by `open_session`.
pub(super) timeout: u64,
/// Override or `SESSION_OPEN_RETRIES`; number of attempts made by `open_session`.
pub(super) retries: usize,
/// Stored exactly as provided by the optional configuration (`None` when absent).
pub(super) max_sessions: Option<usize>,
/// Stored exactly as provided by the optional configuration (`None` when absent).
pub(super) max_links: Option<usize>,
/// Each entry gets `handle_close(peer)` when `del_session` removes a session.
pub(super) peer_authenticator: Vec<PeerAuthenticator>,
/// Each entry gets `handle_new_link` for every link passed to `SessionManager::handle_new_link`.
pub(super) link_authenticator: Vec<LinkAuthenticator>,
/// Moved from `SessionManagerConfig::handler`.
pub(super) handler: Arc<dyn SessionHandler + Send + Sync>,
}
/// Value type of the `SessionManager::opened` map, keyed by `PeerId`.
///
/// NOTE(review): nothing in this part of the file reads or populates these
/// entries; presumably they record the parameters negotiated while a session
/// with that peer is being opened — confirm in the sibling module that uses them.
pub(super) struct Opened {
pub(super) whatami: WhatAmI,
pub(super) sn_resolution: ZInt,
pub(super) initial_sn: ZInt,
}
/// Handle to the session manager state.
///
/// Every field is an `Arc`, so the derived `Clone` produces another handle to
/// the same underlying state rather than an independent manager.
#[derive(Clone)]
pub struct SessionManager {
/// Resolved configuration built once in `new`.
pub(super) config: Arc<SessionManagerConfigInner>,
/// Per-peer `Opened` records; created empty in `new`.
/// NOTE(review): populated outside the code visible here.
pub(super) opened: Arc<Mutex<HashMap<PeerId, Opened>>>,
/// Links handed to `handle_new_link` that are still being authenticated or
/// accepted. New links are refused once it holds `SESSION_OPEN_MAX_CONCURRENT` entries.
pub(super) incoming: Arc<Mutex<HashSet<Link>>>,
/// PRNG seeded from entropy in `new`.
pub(super) prng: Arc<Mutex<PseudoRng>>,
/// Block cipher keyed in `new` with random bytes drawn from the PRNG.
pub(super) cipher: Arc<BlockCipher>,
/// One link manager per locator protocol, created on demand.
protocols: Arc<Mutex<HashMap<LocatorProtocol, LinkManager>>>,
/// Session transports currently registered, one per peer.
sessions: Arc<Mutex<HashMap<PeerId, Arc<SessionTransport>>>>,
}
impl SessionManager {
pub fn new(
config: SessionManagerConfig,
opt_config: Option<SessionManagerOptionalConfig>,
) -> SessionManager {
let mut lease = *SESSION_LEASE;
let mut keep_alive = *SESSION_KEEP_ALIVE;
let mut sn_resolution = *SESSION_SEQ_NUM_RESOLUTION;
let mut batch_size = *SESSION_BATCH_SIZE;
let mut timeout = *SESSION_OPEN_TIMEOUT;
let mut retries = *SESSION_OPEN_RETRIES;
let mut max_sessions = None;
let mut max_links = None;
let mut peer_authenticator = vec![DummyPeerAuthenticator::make()];
let mut link_authenticator = vec![DummyLinkAuthenticator::make()];
if let Some(opt) = opt_config {
if let Some(v) = opt.lease {
lease = v;
}
if let Some(v) = opt.keep_alive {
keep_alive = v;
}
if let Some(v) = opt.sn_resolution {
sn_resolution = v;
}
if let Some(v) = opt.batch_size {
batch_size = v;
}
if let Some(v) = opt.timeout {
timeout = v;
}
if let Some(v) = opt.retries {
retries = v;
}
max_sessions = opt.max_sessions;
max_links = opt.max_links;
if let Some(v) = opt.peer_authenticator {
peer_authenticator = v;
}
if let Some(v) = opt.link_authenticator {
link_authenticator = v;
}
}
let config_inner = SessionManagerConfigInner {
version: config.version,
whatami: config.whatami,
pid: config.id.clone(),
lease,
keep_alive,
sn_resolution,
batch_size,
timeout,
retries,
max_sessions,
max_links,
peer_authenticator,
link_authenticator,
handler: config.handler,
};
let mut prng = PseudoRng::from_entropy();
let mut key = [0u8; BlockCipher::BLOCK_SIZE];
prng.fill_bytes(&mut key);
let cipher = BlockCipher::new(key);
SessionManager {
config: Arc::new(config_inner),
protocols: Arc::new(Mutex::new(HashMap::new())),
sessions: Arc::new(Mutex::new(HashMap::new())),
opened: Arc::new(Mutex::new(HashMap::new())),
incoming: Arc::new(Mutex::new(HashSet::new())),
prng: Arc::new(Mutex::new(prng)),
cipher: Arc::new(cipher),
}
}
/// Returns a copy of the peer identifier this manager was configured with.
pub fn pid(&self) -> PeerId {
    let local_pid = &self.config.pid;
    local_pid.clone()
}
/// Starts listening on `locator`.
///
/// The link manager for the locator's protocol is created if it does not
/// exist yet; the listener request is then delegated to it and its result is
/// returned unchanged.
pub async fn add_listener(&self, locator: &Locator) -> ZResult<Locator> {
    let protocol = locator.get_proto();
    let link_manager = self.get_or_new_link_manager(&protocol).await;
    link_manager.new_listener(locator).await
}
/// Concatenates what every registered link manager reports from its own
/// `get_listeners`. The protocols lock is held while the managers are queried.
pub async fn get_listeners(&self) -> Vec<Locator> {
    let protocols = zasynclock!(self.protocols);
    let mut listeners: Vec<Locator> = Vec::new();
    for link_manager in protocols.values() {
        let reported = link_manager.get_listeners().await;
        listeners.extend_from_slice(&reported);
    }
    listeners
}
/// Concatenates what every registered link manager reports from its own
/// `get_locators`. The protocols lock is held while the managers are queried.
pub async fn get_locators(&self) -> Vec<Locator> {
    let protocols = zasynclock!(self.protocols);
    let mut locators: Vec<Locator> = Vec::new();
    for link_manager in protocols.values() {
        let reported = link_manager.get_locators().await;
        locators.extend_from_slice(&reported);
    }
    locators
}
/// Stops listening on `locator`.
///
/// Once the protocol's link manager has no listener left, the link manager
/// itself is unregistered.
///
/// # Errors
/// Fails when no link manager exists for the locator's protocol, when the
/// link manager cannot delete the listener, or when unregistering the link
/// manager fails.
pub async fn del_listener(&self, locator: &Locator) -> ZResult<()> {
    let protocol = locator.get_proto();
    let link_manager = self.get_link_manager(&protocol).await?;
    link_manager.del_listener(locator).await?;

    let no_listener_left = link_manager.get_listeners().await.is_empty();
    if no_listener_left {
        self.del_link_manager(&protocol).await?;
    }
    Ok(())
}
/// Returns the link manager registered for `protocol`, creating it first if needed.
///
/// Lookup and creation take the lock separately, so either step can lose a
/// race with another task; the pair is simply retried until one succeeds.
async fn get_or_new_link_manager(&self, protocol: &LocatorProtocol) -> LinkManager {
    loop {
        if let Ok(existing) = self.get_link_manager(protocol).await {
            return existing;
        }
        if let Ok(created) = self.new_link_manager(protocol).await {
            return created;
        }
    }
}
/// Creates and registers the link manager for `protocol`.
///
/// # Errors
/// Fails when a link manager is already registered for that protocol.
async fn new_link_manager(&self, protocol: &LocatorProtocol) -> ZResult<LinkManager> {
    let mut protocols = zasynclock!(self.protocols);
    if !protocols.contains_key(protocol) {
        let link_manager = LinkManagerBuilder::make(self.clone(), protocol);
        protocols.insert(protocol.clone(), link_manager.clone());
        return Ok(link_manager);
    }

    let descr = format!(
        "Can not create the link manager for protocol ({}) because it already exists",
        protocol
    );
    zerror!(ZErrorKind::Other { descr })
}
/// Returns a clone of the link manager registered for `protocol`.
///
/// # Errors
/// Fails when no link manager is registered for that protocol.
async fn get_link_manager(&self, protocol: &LocatorProtocol) -> ZResult<LinkManager> {
    let protocols = zasynclock!(self.protocols);
    if let Some(link_manager) = protocols.get(protocol) {
        return Ok(link_manager.clone());
    }

    let descr = format!(
        "Can not get the link manager for protocol ({}) because it has not been found",
        protocol
    );
    zerror!(ZErrorKind::Other { descr })
}
/// Unregisters the link manager of `protocol`.
///
/// # Errors
/// Fails when no link manager is registered for that protocol.
async fn del_link_manager(&self, protocol: &LocatorProtocol) -> ZResult<()> {
    let mut protocols = zasynclock!(self.protocols);
    if protocols.remove(protocol).is_some() {
        Ok(())
    } else {
        let descr = format!("Can not delete the link manager for protocol ({}) because it has not been found.", protocol);
        zerror!(ZErrorKind::Other { descr })
    }
}
/// Returns a `Session` handle for `peer`, or `None` when no session with
/// that peer is registered. The handle only holds a weak reference to the
/// underlying transport.
pub async fn get_session(&self, peer: &PeerId) -> Option<Session> {
    let sessions = zasynclock!(self.sessions);
    let transport = sessions.get(peer)?;
    Some(Session::new(Arc::downgrade(transport)))
}
/// Returns one `Session` handle per registered session. Each handle only
/// holds a weak reference to its transport.
pub async fn get_sessions(&self) -> Vec<Session> {
    let sessions = zasynclock!(self.sessions);
    let mut handles = Vec::with_capacity(sessions.len());
    for transport in sessions.values() {
        handles.push(Session::new(Arc::downgrade(transport)));
    }
    handles
}
/// Returns the session registered for `peer`, creating it with the given
/// parameters when there is none.
///
/// Lookup and creation take the lock separately, so either step can lose a
/// race with another task; the pair is retried until one succeeds. The
/// parameters are only used when this call ends up creating the session.
#[allow(clippy::too_many_arguments)]
pub(super) async fn get_or_new_session(
    &self,
    peer: &PeerId,
    whatami: &WhatAmI,
    sn_resolution: ZInt,
    initial_sn_tx: ZInt,
    initial_sn_rx: ZInt,
) -> Session {
    loop {
        if let Some(existing) = self.get_session(peer).await {
            return existing;
        }
        let attempt = self
            .new_session(peer, whatami, sn_resolution, initial_sn_tx, initial_sn_rx)
            .await;
        if let Ok(created) = attempt {
            return created;
        }
    }
}
/// Unregisters the session of `peer` and tells every peer authenticator that
/// the peer is gone.
///
/// The sessions lock stays held, and the removed transport stays alive, until
/// all authenticators have been notified.
///
/// # Errors
/// Fails when no session is registered for `peer`.
pub(super) async fn del_session(&self, peer: &PeerId) -> ZResult<()> {
    let mut sessions = zasynclock!(self.sessions);
    let removed = sessions.remove(peer);
    if removed.is_none() {
        let e = format!("Can not delete the session of peer: {}", peer);
        log::trace!("{}", e);
        return zerror!(ZErrorKind::Other { descr: e });
    }

    for authenticator in self.config.peer_authenticator.iter() {
        authenticator.handle_close(peer).await;
    }
    Ok(())
}
/// Creates the transport for a session with `peer`, registers it, and returns
/// a `Session` handle holding a weak reference to it.
///
/// # Errors
/// Fails when a session with `peer` is already registered.
pub(super) async fn new_session(
    &self,
    peer: &PeerId,
    whatami: &WhatAmI,
    sn_resolution: ZInt,
    initial_sn_tx: ZInt,
    initial_sn_rx: ZInt,
) -> ZResult<Session> {
    // Check and insert under the same guard so two tasks cannot both register the peer.
    let mut sessions = zasynclock!(self.sessions);
    let already_registered = sessions.contains_key(peer);
    if already_registered {
        let e = format!("Can not create a new session for peer: {}", peer);
        log::trace!("{}", e);
        return zerror!(ZErrorKind::Other { descr: e });
    }

    let transport = SessionTransport::new(
        self.clone(),
        peer.clone(),
        *whatami,
        sn_resolution,
        initial_sn_tx,
        initial_sn_rx,
    );
    let transport = Arc::new(transport);
    // The handle is taken before the map takes ownership of the transport.
    let handle = Session::new(Arc::downgrade(&transport));
    sessions.insert(peer.clone(), transport);

    log::debug!(
        "New session opened with {}: whatami {}, sn resolution {}, initial sn tx {}, initial sn rx {}",
        peer,
        whatami,
        sn_resolution,
        initial_sn_tx,
        initial_sn_rx
    );
    Ok(handle)
}
/// Opens a session towards `locator`.
///
/// A link is created through the link manager of the locator's protocol
/// (the manager is created on demand). The open handshake is then attempted
/// up to the configured number of `retries`, each attempt bounded by the
/// configured `timeout` in milliseconds. The first attempt that completes
/// within the timeout decides the result, whether it succeeded or failed.
///
/// # Errors
/// - the link manager could not create the link;
/// - the handshake completed with an error;
/// - every attempt timed out (or `retries` is 0). In that case the link
///   created above is closed before returning, so it does not stay open
///   with nobody using it.
pub async fn open_session(&self, locator: &Locator) -> ZResult<Session> {
    let to = Duration::from_millis(self.config.timeout);
    let manager = self.get_or_new_link_manager(&locator.get_proto()).await;
    let link = match manager.new_link(locator).await {
        Ok(link) => link,
        Err(e) => {
            log::warn!("Can not to create a link to locator {}: {}", locator, e);
            return Err(e);
        }
    };

    let retries = self.config.retries;
    for i in 0..retries {
        match super::initial::open_link(self, &link).timeout(to).await {
            // The handshake finished in time: its own result is the final answer.
            Ok(res) => return res,
            // Only a timeout lands here; try again on the same link.
            Err(e) => log::debug!(
                "Can not open a session to {}: {}. Timeout: {:?}. Attempt: {}/{}",
                locator,
                e,
                to,
                i + 1,
                retries
            ),
        }
    }

    // No attempt completed: release the link instead of leaking it. Closing is
    // best-effort, the error reported below is the one that matters.
    let _ = link.close().await;

    let e = format!(
        "Can not open a session to {}: maximum number of attemps reached ({})",
        locator, retries
    );
    log::warn!("{}", e);
    zerror!(ZErrorKind::Other { descr: e })
}
/// Handles a link freshly accepted by a link manager.
///
/// Steps:
/// 1. If `SESSION_OPEN_MAX_CONCURRENT` links are already pending, the link is
///    closed straight away.
/// 2. Otherwise the link is recorded in `incoming` and run through every link
///    authenticator. The link is rejected (closed and removed from
///    `incoming`) when an authenticator fails or when two authenticators
///    report different peer ids.
/// 3. The accept handshake runs in a spawned task, bounded by the configured
///    `timeout` in milliseconds. When the task ends the link is removed from
///    `incoming`; on timeout it is closed as well.
pub(crate) async fn handle_new_link(&self, link: Link, properties: Option<LinkProperties>) {
    let mut guard = zasynclock!(self.incoming);
    if guard.len() >= *SESSION_OPEN_MAX_CONCURRENT {
        log::trace!("Closing link for preventing potential DoS: {}", link);
        let _ = link.close().await;
        return;
    }
    log::trace!("New link waiting... {}", link);
    guard.insert(link.clone());
    // Release the lock before the authenticators run.
    drop(guard);

    let mut peer_id: Option<PeerId> = None;
    for la in self.config.link_authenticator.iter() {
        match la.handle_new_link(&link, properties.clone()).await {
            Ok(pid) => {
                if let Some(pid1) = peer_id.as_ref() {
                    if let Some(pid2) = pid.as_ref() {
                        if pid1 != pid2 {
                            log::debug!("Ambigous PeerID identification for link: {}", link);
                            let _ = link.close().await;
                            zasynclock!(self.incoming).remove(&link);
                            return;
                        }
                    }
                } else {
                    peer_id = pid;
                }
            }
            Err(e) => {
                log::debug!("{}", e);
                // A rejected link must give its pending slot back; otherwise every
                // failed authentication permanently shrinks the room left for new links.
                let _ = link.close().await;
                zasynclock!(self.incoming).remove(&link);
                return;
            }
        }
    }

    let c_manager = self.clone();
    task::spawn(async move {
        let auth_link = AuthenticatedPeerLink {
            src: link.get_src(),
            dst: link.get_dst(),
            peer_id,
            properties,
        };
        // Same configured timeout as `open_session`; it defaults to SESSION_OPEN_TIMEOUT.
        let to = Duration::from_millis(c_manager.config.timeout);
        let res = super::initial::accept_link(&c_manager, &link, &auth_link)
            .timeout(to)
            .await;
        match res {
            Ok(Ok(_)) => {}
            // The handshake finished in time but failed.
            Ok(Err(e)) => log::debug!("{}", e),
            // The handshake timed out: nobody else will close the link.
            Err(e) => {
                log::debug!("{}", e);
                let _ = link.close().await;
            }
        }
        zasynclock!(c_manager.incoming).remove(&link);
    });
}
}