mod connection_group;
mod response_manager;
use tokio::time::timeout;
use crate::{client::SafeKey, network_event::NetworkEvent, network_event::NetworkTx, CoreError};
use connection_group::ConnectionGroup;
use futures::lock::Mutex;
use log::{error, trace};
use quic_p2p::Config as QuicP2pConfig;
use safe_nd::{Message, PublicId, Response};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
time::Duration,
};
const CONNECTION_TIMEOUT_SECS: u64 = 30;
#[derive(Clone)]
pub struct ConnectionManager {
inner: Arc<Mutex<Inner>>,
}
impl ConnectionManager {
pub fn new(mut config: QuicP2pConfig, net_tx: &NetworkTx) -> Result<Self, CoreError> {
config.port = Some(0);
let inner = Arc::new(Mutex::new(Inner {
config,
groups: HashMap::default(),
net_tx: net_tx.clone(),
}));
Ok(Self { inner })
}
pub async fn has_connection_to(&self, pub_id: &PublicId) -> bool {
let inner = self.inner.lock().await;
inner.groups.contains_key(&pub_id)
}
pub async fn send(&mut self, pub_id: &PublicId, msg: &Message) -> Result<Response, CoreError> {
self.inner.lock().await.send(pub_id, msg).await
}
pub async fn bootstrap(&mut self, full_id: SafeKey) -> Result<(), CoreError> {
self.inner.lock().await.bootstrap(full_id).await
}
pub fn restart_network(&mut self) {
unimplemented!();
}
pub async fn disconnect(&mut self, pub_id: &PublicId) -> Result<(), CoreError> {
self.inner.lock().await.disconnect(pub_id).await
}
}
struct Inner {
config: QuicP2pConfig,
groups: HashMap<PublicId, ConnectionGroup>,
net_tx: NetworkTx,
}
impl Drop for Inner {
fn drop(&mut self) {
trace!("Dropped ConnectionManager - terminating gracefully");
let _ = self.net_tx.unbounded_send(NetworkEvent::Disconnected);
}
}
impl Inner {
async fn bootstrap(&mut self, full_id: SafeKey) -> Result<(), CoreError> {
trace!("Trying to bootstrap with group {:?}", full_id.public_id());
let (connected_tx, connected_rx) = futures::channel::oneshot::channel();
if let Entry::Vacant(value) = self.groups.entry(full_id.public_id()) {
let _ = value
.insert(ConnectionGroup::new(self.config.clone(), full_id, connected_tx).await?);
match timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECS), connected_rx).await {
Ok(response) => response.map_err(|err| CoreError::from(format!("{}", err)))?,
Err(_) => Err(CoreError::from(
"Connection timed out when bootstrapping to the network",
)),
}
} else {
trace!("Group {} is already connected", full_id.public_id());
Ok(())
}
}
async fn send(&mut self, pub_id: &PublicId, msg: &Message) -> Result<Response, CoreError> {
let msg_id = if let Message::Request { message_id, .. } = msg {
*message_id
} else {
return Err(CoreError::Unexpected("Not a Request".to_string()));
};
let conn_group = self.groups.get_mut(&pub_id).ok_or_else(|| {
CoreError::Unexpected(
"No connection group found - did you call `bootstrap`?".to_string(),
)
})?;
conn_group.send(msg_id, msg).await
}
pub async fn disconnect(&mut self, pub_id: &PublicId) -> Result<(), CoreError> {
trace!("Disconnecting group {:?}", pub_id);
let group = self.groups.remove(&pub_id);
if let Some(mut group) = group {
group.close().await.map(move |res| {
let _ = group;
res
})
} else {
error!("No group found for {}", pub_id); Ok(())
}
}
}