use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use futures::channel::oneshot;
use samod_core::ConnectionId;
use crate::{
AcceptorEvent, ConnFinishedReason, PeerInfo,
unbounded::{self, UnboundedReceiver, UnboundedSender},
};
/// Cloneable handle to a single connection.
///
/// The mutable state lives behind `Arc<RwLock<Inner>>`, so every clone of a
/// handle observes the same peer info, finish reason and listener lists.
#[derive(Clone)]
pub struct ConnectionHandle {
/// Identifier of the connection this handle refers to.
id: ConnectionId,
/// Sending half of the byte-message channel created in `new`; `send` pushes onto it.
tx: UnboundedSender<Vec<u8>>,
/// State shared between all clones of this handle.
inner: Arc<RwLock<Inner>>,
}
/// Lock-protected state shared by every clone of a `ConnectionHandle`.
struct Inner {
/// Peer information; `None` until `notify_handshake_complete` stores it.
info: Option<PeerInfo>,
/// One-shot senders registered by pending `handshake_complete` futures.
handshake_listeners: Vec<oneshot::Sender<Result<PeerInfo, ConnFinishedReason>>>,
/// One-shot senders registered by pending `finished` futures.
finished_listeners: Vec<oneshot::Sender<ConnFinishedReason>>,
/// Senders feeding the streams handed out by `events`.
event_listeners: Vec<unbounded::UnboundedSender<AcceptorEvent>>,
/// Reason recorded by `notify_finished`.
finished_reason: Option<ConnFinishedReason>,
/// Receiving half of the byte-message channel; `Some` from construction until
/// `take_rx` removes it.
rx: Option<UnboundedReceiver<Vec<u8>>>,
}
impl ConnectionHandle {
    /// Creates a handle for the connection identified by `id`.
    ///
    /// A fresh unbounded byte-message channel is created: the sending half is
    /// kept on the handle (see [`send`](Self::send)) and the receiving half is
    /// parked in the shared state until [`take_rx`](Self::take_rx) claims it.
    pub(crate) fn new(id: ConnectionId) -> Self {
        let (tx, rx) = unbounded::channel();
        let state = Inner {
            info: None,
            handshake_listeners: Vec::new(),
            finished_listeners: Vec::new(),
            event_listeners: Vec::new(),
            finished_reason: None,
            rx: Some(rx),
        };
        Self {
            id,
            tx,
            inner: Arc::new(RwLock::new(state)),
        }
    }

    /// Returns the identifier of the connection this handle refers to.
    pub(crate) fn id(&self) -> ConnectionId {
        self.id
    }

    /// Returns a copy of the peer information, or `None` if
    /// [`notify_handshake_complete`](Self::notify_handshake_complete) has not
    /// been called yet.
    pub(crate) fn info(&self) -> Option<PeerInfo> {
        Self::read(&self.inner).info.clone()
    }

    /// Removes and returns the receiving half of the byte-message channel.
    ///
    /// # Panics
    ///
    /// Panics if the receiver has already been taken through this handle or
    /// any of its clones.
    pub(crate) fn take_rx(&self) -> UnboundedReceiver<Vec<u8>> {
        Self::write(&self.inner)
            .rx
            .take()
            .expect("receiver for connection already taken")
    }

    /// Queues `msg` on the byte-message channel.
    ///
    /// A failed send is deliberately ignored (best effort).
    pub(crate) fn send(&self, msg: Vec<u8>) {
        // NOTE(review): presumably the send only fails once the receiving half
        // is gone, in which case nobody is left to read the message anyway.
        let _ = self.tx.unbounded_send(msg);
    }

    /// Returns a future that resolves once the outcome of the handshake is known.
    ///
    /// * `Err(reason)` if the connection has finished. This is checked first,
    ///   so a finished connection reports the reason even when the handshake
    ///   had succeeded earlier.
    /// * `Ok(info)` if the handshake has completed.
    /// * Otherwise a listener is registered and the future waits for
    ///   [`notify_handshake_complete`](Self::notify_handshake_complete) or
    ///   [`notify_finished`](Self::notify_finished).
    ///
    /// The state is inspected when the future is first polled, not when this
    /// method is called. The method may be called any number of times; the
    /// recorded finish reason is cloned, never consumed.
    pub fn handshake_complete(
        &self,
    ) -> impl Future<Output = Result<PeerInfo, ConnFinishedReason>> + 'static {
        let inner = self.inner.clone();
        async move {
            let rx = {
                // The guard is confined to this block so it is released before
                // the `.await` below.
                let mut state = Self::write(&inner);
                // Clone rather than `take()`: consuming the reason here would
                // make every later `handshake_complete`/`finished` call wait
                // for a notification that has already happened.
                if let Some(reason) = state.finished_reason.clone() {
                    return Err(reason);
                }
                if let Some(info) = state.info.clone() {
                    return Ok(info);
                }
                let (tx, rx) = oneshot::channel();
                state.handshake_listeners.push(tx);
                rx
            };
            // This future owns a clone of the shared state, so the registered
            // sender cannot be dropped without a value having been sent.
            rx.await
                .expect("handshake listener dropped without being notified")
        }
    }

    /// Registers a new event listener and returns the stream it feeds.
    ///
    /// Only events broadcast after registration are delivered to the stream.
    pub fn events(&self) -> impl futures::Stream<Item = AcceptorEvent> + Unpin {
        let (tx, rx) = unbounded::channel();
        // Use the poison-recovering accessor like every other method instead
        // of panicking on a poisoned lock.
        Self::write(&self.inner).event_listeners.push(tx);
        rx
    }

    /// Broadcasts `AcceptorEvent::ClientConnected` for this connection to all
    /// registered event listeners.
    pub(crate) fn notify_client_connected(&self, peer_info: PeerInfo) {
        self.broadcast_event(AcceptorEvent::ClientConnected {
            peer_info,
            connection_id: self.id,
        });
    }

    /// Broadcasts `AcceptorEvent::ClientDisconnected` for this connection to
    /// all registered event listeners.
    pub(crate) fn notify_client_disconnected(&self, reason: ConnFinishedReason) {
        self.broadcast_event(AcceptorEvent::ClientDisconnected {
            connection_id: self.id,
            reason,
        });
    }

    /// Returns a future that resolves with the reason the connection finished.
    ///
    /// If [`notify_finished`](Self::notify_finished) has already been called
    /// the future resolves immediately; otherwise a listener is registered and
    /// the future waits for that call. The state is inspected when the future
    /// is first polled. The method may be called any number of times; the
    /// recorded reason is cloned, never consumed.
    pub(crate) fn finished(
        &self,
    ) -> impl Future<Output = ConnFinishedReason> + Sync + Send + 'static {
        let inner = self.inner.clone();
        async move {
            let rx = {
                // The guard is confined to this block so it is released before
                // the `.await` below.
                let mut state = Self::write(&inner);
                // Clone rather than `take()` so later callers still see the
                // reason instead of waiting forever.
                if let Some(reason) = state.finished_reason.clone() {
                    return reason;
                }
                let (tx, rx) = oneshot::channel();
                state.finished_listeners.push(tx);
                rx
            };
            // This future owns a clone of the shared state, so the registered
            // sender cannot be dropped without a value having been sent.
            rx.await
                .expect("finished listener dropped without being notified")
        }
    }

    /// Records `peer_info` and resolves every pending
    /// [`handshake_complete`](Self::handshake_complete) future with `Ok`.
    pub(crate) fn notify_handshake_complete(&self, peer_info: PeerInfo) {
        let mut state = Self::write(&self.inner);
        for listener in state.handshake_listeners.drain(..) {
            // A listener whose future was dropped is simply skipped.
            let _ = listener.send(Ok(peer_info.clone()));
        }
        state.info = Some(peer_info);
    }

    /// Records `reason` and wakes everything waiting on this connection.
    ///
    /// Pending [`finished`](Self::finished) futures resolve with `reason`, and
    /// pending [`handshake_complete`](Self::handshake_complete) futures resolve
    /// with `Err(reason)`.
    pub(crate) fn notify_finished(&self, reason: ConnFinishedReason) {
        let mut state = Self::write(&self.inner);
        // Without this, a handshake waiter registered before the connection
        // finished would never be resolved.
        for listener in state.handshake_listeners.drain(..) {
            let _ = listener.send(Err(reason.clone()));
        }
        for listener in state.finished_listeners.drain(..) {
            let _ = listener.send(reason.clone());
        }
        state.finished_reason = Some(reason);
    }

    /// Sends a clone of `event` to every event listener, dropping the
    /// listeners for which the send fails.
    fn broadcast_event(&self, event: AcceptorEvent) {
        let mut state = Self::write(&self.inner);
        state
            .event_listeners
            .retain(|tx| tx.unbounded_send(event.clone()).is_ok());
    }

    /// Acquires the read lock, recovering from poisoning.
    ///
    /// If the lock is poisoned, the poison flag is cleared and the guard is
    /// returned anyway.
    fn read(inner: &Arc<RwLock<Inner>>) -> RwLockReadGuard<'_, Inner> {
        inner.read().unwrap_or_else(|poisoned| {
            inner.clear_poison();
            poisoned.into_inner()
        })
    }

    /// Acquires the write lock, recovering from poisoning.
    ///
    /// If the lock is poisoned, the poison flag is cleared and the guard is
    /// returned anyway.
    fn write(inner: &Arc<RwLock<Inner>>) -> RwLockWriteGuard<'_, Inner> {
        inner.write().unwrap_or_else(|poisoned| {
            inner.clear_poison();
            poisoned.into_inner()
        })
    }
}