use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, oneshot};
use super::{error::ConnectionManagerError, peer_connection::PeerConnection};
use crate::{
connection_manager::manager::{ConnectionManagerEvent, ListenerInfo},
peer_manager::NodeId,
};
/// Requests that a `ConnectionManagerRequester` sends over its mpsc channel.
///
/// NOTE(review): the receiving end of the channel is not visible in this file; the error
/// names used by the requester (`SendToActorFailed`, `ActorRequestCanceled`) suggest it is
/// the connection manager actor — confirm against the receiver.
#[derive(Debug)]
pub enum ConnectionManagerRequest {
/// Request a dial to the peer identified by `node_id`.
DialPeer {
/// Identifier of the peer to dial.
node_id: NodeId,
/// Optional one-shot channel for the dial outcome (a `PeerConnection` or an error).
/// `None` when the requester does not wait for a reply.
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
},
/// Presumably cancels a pending dial to the given node — verify against the receiver.
CancelDial(NodeId),
/// Carries a one-shot sender on which the receiver is expected to reply with `ListenerInfo`
/// (the requester awaits that reply in `wait_until_listening`).
NotifyListening(oneshot::Sender<ListenerInfo>),
}
/// Cloneable handle used to submit `ConnectionManagerRequest`s and to access the
/// connection manager event broadcast channel.
#[derive(Clone)]
pub struct ConnectionManagerRequester {
// Sending half of the mpsc channel that carries requests.
sender: mpsc::Sender<ConnectionManagerRequest>,
// Broadcast sender for `ConnectionManagerEvent`s; used to create subscriptions and
// handed out (crate-internally) as a publisher.
event_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
}
impl ConnectionManagerRequester {
pub fn new(
sender: mpsc::Sender<ConnectionManagerRequest>,
event_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
) -> Self {
Self { sender, event_tx }
}
}
impl ConnectionManagerRequester {
pub fn get_event_subscription(&self) -> broadcast::Receiver<Arc<ConnectionManagerEvent>> {
self.event_tx.subscribe()
}
pub(crate) fn get_event_publisher(&self) -> broadcast::Sender<Arc<ConnectionManagerEvent>> {
self.event_tx.clone()
}
pub async fn dial_peer(&mut self, node_id: NodeId) -> Result<PeerConnection, ConnectionManagerError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.send_dial_peer(node_id, Some(reply_tx)).await?;
reply_rx
.await
.map_err(|_| ConnectionManagerError::ActorRequestCanceled)?
}
pub async fn cancel_dial(&mut self, node_id: NodeId) -> Result<(), ConnectionManagerError> {
self.sender
.send(ConnectionManagerRequest::CancelDial(node_id))
.await
.map_err(|_| ConnectionManagerError::SendToActorFailed)?;
Ok(())
}
pub(crate) async fn send_dial_peer(
&mut self,
node_id: NodeId,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
) -> Result<(), ConnectionManagerError> {
self.sender
.send(ConnectionManagerRequest::DialPeer { node_id, reply_tx })
.await
.map_err(|_| ConnectionManagerError::SendToActorFailed)?;
Ok(())
}
pub async fn wait_until_listening(&mut self) -> Result<ListenerInfo, ConnectionManagerError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(ConnectionManagerRequest::NotifyListening(reply_tx))
.await
.map_err(|_| ConnectionManagerError::SendToActorFailed)?;
reply_rx.await.map_err(|_| ConnectionManagerError::ActorRequestCanceled)
}
}