mod connection_group;
use crate::{client::SafeKey, event::NetworkEvent, event::NetworkTx, CoreError, CoreFuture};
use connection_group::ConnectionGroup;
use futures::{future, Future};
use quic_p2p::Config as QuicP2pConfig;
use safe_nd::{Message, PublicId, Response};
use std::{
cell::RefCell,
collections::{hash_map::Entry, HashMap},
rc::Rc,
time::Duration,
};
use tokio::prelude::FutureExt;
const CONNECTION_TIMEOUT_SECS: u64 = 30;
#[derive(Clone)]
pub struct ConnectionManager {
inner: Rc<RefCell<Inner>>,
}
impl ConnectionManager {
pub fn new(mut config: QuicP2pConfig, net_tx: &NetworkTx) -> Result<Self, CoreError> {
config.port = None;
let inner = Rc::new(RefCell::new(Inner {
config,
groups: HashMap::default(),
net_tx: net_tx.clone(),
}));
Ok(Self { inner })
}
pub fn has_connection_to(&self, pub_id: &PublicId) -> bool {
let inner = self.inner.borrow();
inner.groups.contains_key(&pub_id)
}
pub fn send(&mut self, pub_id: &PublicId, msg: &Message) -> Box<CoreFuture<Response>> {
self.inner.borrow_mut().send(pub_id, msg)
}
pub fn bootstrap(&mut self, full_id: SafeKey) -> Box<CoreFuture<()>> {
self.inner.borrow_mut().bootstrap(full_id)
}
pub fn restart_network(&mut self) {
unimplemented!();
}
pub fn disconnect(&mut self, pub_id: &PublicId) -> Box<CoreFuture<()>> {
self.inner.borrow_mut().disconnect(pub_id)
}
}
struct Inner {
config: QuicP2pConfig,
groups: HashMap<PublicId, ConnectionGroup>,
net_tx: NetworkTx,
}
impl Drop for Inner {
fn drop(&mut self) {
trace!("Dropped ConnectionManager - terminating gracefully");
let _ = self.net_tx.unbounded_send(NetworkEvent::Disconnected);
}
}
impl Inner {
fn bootstrap(&mut self, full_id: SafeKey) -> Box<CoreFuture<()>> {
trace!("Trying to bootstrap with group {:?}", full_id.public_id());
let elders = Default::default();
let (connected_tx, connected_rx) = futures::oneshot();
if let Entry::Vacant(value) = self.groups.entry(full_id.public_id()) {
let _ = value.insert(fry!(ConnectionGroup::new(
self.config.clone(),
full_id,
elders,
connected_tx
)));
Box::new(
connected_rx
.map_err(|err| CoreError::from(format!("{}", err)))
.and_then(|res| res)
.timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECS))
.map_err(|_e| CoreError::RequestTimeout),
)
} else {
trace!("Group {} is already connected", full_id.public_id());
ok!(())
}
}
fn send(&mut self, pub_id: &PublicId, msg: &Message) -> Box<CoreFuture<Response>> {
let msg_id = if let Message::Request { message_id, .. } = msg {
*message_id
} else {
return Box::new(future::err(CoreError::Unexpected(
"Not a Request".to_string(),
)));
};
let conn_group = fry!(self.groups.get_mut(&pub_id).ok_or_else(|| {
CoreError::Unexpected(
"No connection group found - did you call `bootstrap`?".to_string(),
)
}));
conn_group.send(msg_id, msg)
}
pub fn disconnect(&mut self, pub_id: &PublicId) -> Box<CoreFuture<()>> {
trace!("Disconnecting group {:?}", pub_id);
let group = self.groups.remove(&pub_id);
if let Some(mut group) = group {
Box::new(group.close().map(move |res| {
let _ = group;
res
}))
} else {
error!("No group found for {}", pub_id); ok!(())
}
}
}