use std::sync::Arc;
use holochain_serialized_bytes::SerializedBytes;
use must_future::MustBoxFuture;
use stream_cancel::Trigger;
use stream_cancel::Valved;
use url2::Url2;
use crate::websocket::PairShutdown;
use crate::websocket::RxFromWebsocket;
use crate::websocket::TxToWebsocket;
use crate::OutgoingMessage;
use crate::WebsocketResult;
/// Receiving half of a websocket connection.
///
/// Implements `futures::stream::Stream`, yielding [`WebsocketMessage`] items
/// until the underlying stream ends or a close message is received.
pub struct WebsocketReceiver {
/// Incoming message stream. It arrives already wrapped in one valve and is
/// wrapped in a second one by `new`; the trigger for that second valve is
/// held by the `ReceiverHandle`.
rx_from_websocket: Valved<Valved<RxFromWebsocket>>,
/// Address of the remote end, exposed through `remote_addr()`.
remote_addr: Url2,
/// Handle that can close this receiver. `Some` until `take_handle` is called.
handle: Option<ReceiverHandle>,
/// Shared shutdown state. Never read in this file; presumably held only to
/// keep it alive for as long as the receiver exists (TODO confirm).
__pair_shutdown: Arc<PairShutdown>,
}
/// Boxed one-shot callback used to answer a request.
///
/// It is called with the serialized response and returns a boxed future that
/// resolves to a `WebsocketResult<()>`. The callback must be
/// `'static + Send + Sync`.
pub type Response = Box<
dyn FnOnce(SerializedBytes) -> MustBoxFuture<'static, WebsocketResult<()>>
+ 'static
+ Send
+ Sync,
>;
/// Describes how (and whether) an incoming message can be answered.
pub enum Respond {
/// No response callback is attached; `respond` returns `Ok(())` without
/// doing anything.
Signal,
/// A response callback is attached; `respond` invokes it with the
/// response bytes.
Request(Response),
}
/// Guard that sends `OutgoingMessage::Response(None, id)` when dropped, unless
/// `response_sent` was called first.
///
/// Fields, in order: whether the cancel message is still pending, the sender
/// used to emit it, and the id passed along with it.
pub(crate) struct CancelResponse(bool, TxToWebsocket, u64);
/// Handle used to close a [`WebsocketReceiver`] from outside the stream.
pub struct ReceiverHandle {
/// Trigger for the valve that `WebsocketReceiver::new` wraps around the
/// incoming stream; `close` cancels it.
shutdown: Trigger,
}
/// Items carried by the internal stream feeding a [`WebsocketReceiver`].
pub(crate) enum IncomingMessage {
/// Close notification. When the receiver polls this item it sends `()` on
/// `acknowledge` and ends its stream.
Close {
/// One-shot channel used to acknowledge the close.
acknowledge: tokio::sync::oneshot::Sender<()>,
},
/// A message together with its responder; yielded to the stream consumer
/// as a [`WebsocketMessage`].
Msg(SerializedBytes, Respond),
}
/// Item type yielded by the [`WebsocketReceiver`] stream: the serialized
/// message and the [`Respond`] value used to answer it.
pub type WebsocketMessage = (SerializedBytes, Respond);
impl WebsocketReceiver {
    /// Build a receiver around an already-valved incoming stream.
    ///
    /// The stream is wrapped in one more valve; the trigger for that valve is
    /// stored in a [`ReceiverHandle`] that callers can obtain once through
    /// [`WebsocketReceiver::take_handle`].
    pub(crate) fn new(
        rx_from_websocket: Valved<RxFromWebsocket>,
        remote_addr: Url2,
        pair_shutdown: Arc<PairShutdown>,
    ) -> Self {
        let (trigger, valved_stream) = Valved::new(rx_from_websocket);
        Self {
            rx_from_websocket: valved_stream,
            remote_addr,
            handle: Some(ReceiverHandle { shutdown: trigger }),
            __pair_shutdown: pair_shutdown,
        }
    }

    /// Take the handle that can close this receiver.
    ///
    /// Returns `Some` on the first call and `None` on every later call.
    pub fn take_handle(&mut self) -> Option<ReceiverHandle> {
        let taken = self.handle.take();
        taken
    }

    /// Address of the remote end of this connection.
    pub fn remote_addr(&self) -> &Url2 {
        let Self { remote_addr, .. } = self;
        remote_addr
    }
}
impl futures::stream::Stream for WebsocketReceiver {
    type Item = WebsocketMessage;

    /// Poll the inner stream and translate its items.
    ///
    /// * `Msg` items are yielded as `(message, responder)` tuples.
    /// * A `Close` item is acknowledged and then ends this stream.
    /// * End-of-stream and `Pending` are passed through unchanged.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        let inner = std::pin::Pin::new(&mut self.rx_from_websocket);
        let next = match futures::stream::Stream::poll_next(inner, cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(next) => next,
        };

        match next {
            Some(IncomingMessage::Msg(msg, resp)) => Poll::Ready(Some((msg, resp))),
            Some(IncomingMessage::Close { acknowledge }) => {
                // The other side may already have gone away; a failed
                // acknowledgement is deliberately ignored.
                let _ = acknowledge.send(());
                Poll::Ready(None)
            }
            None => Poll::Ready(None),
        }
    }
}
impl ReceiverHandle {
    /// Close the receiver by cancelling the shutdown trigger.
    pub fn close(self) {
        tracing::trace!("Closing Receiver");
        self.shutdown.cancel()
    }

    /// Wait for `f`, then close the receiver only if it resolved to `true`.
    ///
    /// When `f` resolves to `false` the trigger is disabled so the receiver
    /// stays open. Merely dropping the handle would not achieve that: a
    /// dropped `stream_cancel::Trigger` interrupts its streams exactly as
    /// `cancel` does, which would make the `bool` result meaningless.
    ///
    /// If the returned future is itself dropped before `f` resolves, the
    /// handle is dropped with it.
    pub async fn close_on<F>(self, f: F)
    where
        F: std::future::Future<Output = bool>,
    {
        if f.await {
            self.close()
        } else {
            // Consume the trigger without cancelling the valved stream.
            self.shutdown.disable()
        }
    }
}
impl Respond {
    /// Returns `true` when this value carries a response callback, i.e. it is
    /// the `Request` variant.
    pub fn is_request(&self) -> bool {
        matches!(self, Respond::Request(_))
    }

    /// Answer the message with `msg`.
    ///
    /// For `Request` the stored callback is invoked and its future awaited;
    /// its result is returned. For `Signal` this does nothing and returns
    /// `Ok(())`.
    pub async fn respond(self, msg: SerializedBytes) -> WebsocketResult<()> {
        if let Respond::Request(send_response) = self {
            send_response(msg).await
        } else {
            Ok(())
        }
    }
}
impl CancelResponse {
pub fn new(send_response: TxToWebsocket, id: u64) -> Self {
Self(true, send_response, id)
}
pub fn response_sent(mut self) {
self.0 = false;
}
}
impl Drop for CancelResponse {
    /// If the response was never marked as sent, notify the websocket task by
    /// sending `OutgoingMessage::Response(None, id)`.
    ///
    /// The send is asynchronous, so it is spawned onto the current Tokio
    /// runtime. `tokio::spawn` panics when called outside a runtime, and a
    /// panic inside `drop` can abort the process, so the runtime handle is
    /// looked up fallibly and a warning is logged if there is none.
    fn drop(&mut self) {
        if !self.0 {
            return;
        }
        let id = self.2;
        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                let tx = self.1.clone();
                runtime.spawn(async move {
                    if let Err(e) = tx.send(OutgoingMessage::Response(None, id)).await {
                        tracing::warn!("Failed to cancel response on drop {:?}", e);
                    }
                });
            }
            Err(e) => {
                // No runtime to run the send on; give up rather than panic.
                tracing::warn!("Failed to cancel response on drop {:?}", e);
            }
        }
    }
}