async_ws/connection/
mod.rs

1mod config;
2mod decode;
3mod encode;
4mod inner;
5mod open;
6mod reader;
7mod send;
8mod waker;
9mod writer;
10
11pub use crate::connection::config::WsConfig;
12pub use crate::connection::reader::WsMessageReader;
13pub use crate::connection::send::WsSend;
14pub use crate::connection::writer::WsMessageWriter;
15
16use crate::connection::inner::WsConnectionInner;
17use crate::connection::waker::{new_waker, Wakers};
18use crate::frame::{FrameDecodeError, WsDataFrameKind};
19use crate::message::WsMessageKind;
20use futures::prelude::*;
21use std::ops::DerefMut;
22use std::pin::Pin;
23use std::sync::{Arc, Mutex};
24use std::task::{Context, Poll};
25
26pub struct WsConnection<T: AsyncRead + AsyncWrite + Unpin> {
27    parent: Arc<Mutex<(WsConnectionInner<T>, Wakers)>>,
28}
29
30impl<T: AsyncRead + AsyncWrite + Unpin> WsConnection<T> {
31    pub fn with_config(transport: T, config: WsConfig) -> Self {
32        Self {
33            parent: Arc::new(Mutex::new((
34                WsConnectionInner::with_config(transport, config),
35                Wakers::default(),
36            ))),
37        }
38    }
39    pub fn send(&self, kind: WsMessageKind) -> WsSend<T> {
40        WsSend::new(&self.parent, kind)
41    }
42    pub fn err(&self) -> Option<Arc<WsConnectionError>> {
43        self.parent.lock().unwrap().0.err()
44    }
45}
46
47impl<T: AsyncRead + AsyncWrite + Unpin> Stream for WsConnection<T> {
48    type Item = WsMessageReader<T>;
49
50    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        let mut guard = self.parent.lock().unwrap();
52        let (inner, wakers) = guard.deref_mut();
53        wakers.stream_waker = Some(cx.waker().clone());
54        let waker = new_waker(Arc::downgrade(&self.parent));
55        inner
56            .poll_next_reader(&mut Context::from_waker(&waker))
57            .map(|o| o.map(|kind| WsMessageReader::new(kind, &self.parent)))
58    }
59}
60
61#[derive(thiserror::Error, Debug)]
62pub enum WsConnectionError {
63    #[error("invalid utf8 in text message")]
64    InvalidUtf8,
65    #[error("incomplete utf8 in text message")]
66    IncompleteUtf8,
67    #[error("io error: {0}")]
68    Io(#[from] std::io::Error),
69    #[error("parse error: {0}")]
70    FrameDecodeError(#[from] FrameDecodeError),
71    #[error("timeout")]
72    Timeout,
73    #[error("unexpected frame kind {0}")]
74    UnexpectedFrameKind(WsDataFrameKind),
75}
76
77impl From<WsDataFrameKind> for WsConnectionError {
78    fn from(kind: WsDataFrameKind) -> Self {
79        WsConnectionError::UnexpectedFrameKind(kind)
80    }
81}