async_ws/connection/
send.rs

1use crate::connection::waker::{new_waker, Wakers};
2use crate::connection::writer::WsMessageWriter;
3use crate::connection::{WsConnectionError, WsConnectionInner};
4use crate::message::WsMessageKind;
5use futures::{AsyncRead, AsyncWrite, Future};
6use std::ops::DerefMut;
7use std::pin::Pin;
8use std::sync::{Arc, Mutex};
9use std::task::{Context, Poll};
10
11pub struct WsSend<T: AsyncRead + AsyncWrite + Unpin> {
12    kind: WsMessageKind,
13    parent: Arc<Mutex<(WsConnectionInner<T>, Wakers)>>,
14}
15
16impl<T: AsyncRead + AsyncWrite + Unpin> WsSend<T> {
17    pub(crate) fn new(
18        parent: &Arc<Mutex<(WsConnectionInner<T>, Wakers)>>,
19        kind: WsMessageKind,
20    ) -> Self {
21        Self {
22            kind,
23            parent: parent.clone(),
24        }
25    }
26    pub fn kind(&self) -> WsMessageKind {
27        self.kind
28    }
29    pub fn err(&self) -> Option<Arc<WsConnectionError>> {
30        self.parent.lock().unwrap().0.err()
31    }
32}
33
34impl<T: AsyncRead + AsyncWrite + Unpin> Future for WsSend<T> {
35    type Output = Option<WsMessageWriter<T>>;
36
37    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38        let mut guard = self.parent.lock().unwrap();
39        let (inner, wakers) = guard.deref_mut();
40        wakers.send_waker = Some(cx.waker().clone());
41        let waker = new_waker(Arc::downgrade(&self.parent));
42        match inner.poll_next_writer(self.kind, &mut Context::from_waker(&waker)) {
43            Poll::Ready(Some(_)) => {
44                Poll::Ready(Some(WsMessageWriter::new(self.kind, &self.parent)))
45            }
46            Poll::Ready(None) => {
47                wakers.wake();
48                Poll::Ready(None)
49            }
50            Poll::Pending => Poll::Pending,
51        }
52    }
53}