async_ws/connection/
send.rs1use crate::connection::waker::{new_waker, Wakers};
2use crate::connection::writer::WsMessageWriter;
3use crate::connection::{WsConnectionError, WsConnectionInner};
4use crate::message::WsMessageKind;
5use futures::{AsyncRead, AsyncWrite, Future};
6use std::ops::DerefMut;
7use std::pin::Pin;
8use std::sync::{Arc, Mutex};
9use std::task::{Context, Poll};
10
11pub struct WsSend<T: AsyncRead + AsyncWrite + Unpin> {
12 kind: WsMessageKind,
13 parent: Arc<Mutex<(WsConnectionInner<T>, Wakers)>>,
14}
15
16impl<T: AsyncRead + AsyncWrite + Unpin> WsSend<T> {
17 pub(crate) fn new(
18 parent: &Arc<Mutex<(WsConnectionInner<T>, Wakers)>>,
19 kind: WsMessageKind,
20 ) -> Self {
21 Self {
22 kind,
23 parent: parent.clone(),
24 }
25 }
26 pub fn kind(&self) -> WsMessageKind {
27 self.kind
28 }
29 pub fn err(&self) -> Option<Arc<WsConnectionError>> {
30 self.parent.lock().unwrap().0.err()
31 }
32}
33
34impl<T: AsyncRead + AsyncWrite + Unpin> Future for WsSend<T> {
35 type Output = Option<WsMessageWriter<T>>;
36
37 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38 let mut guard = self.parent.lock().unwrap();
39 let (inner, wakers) = guard.deref_mut();
40 wakers.send_waker = Some(cx.waker().clone());
41 let waker = new_waker(Arc::downgrade(&self.parent));
42 match inner.poll_next_writer(self.kind, &mut Context::from_waker(&waker)) {
43 Poll::Ready(Some(_)) => {
44 Poll::Ready(Some(WsMessageWriter::new(self.kind, &self.parent)))
45 }
46 Poll::Ready(None) => {
47 wakers.wake();
48 Poll::Ready(None)
49 }
50 Poll::Pending => Poll::Pending,
51 }
52 }
53}