async_ws/connection/
mod.rs1mod config;
2mod decode;
3mod encode;
4mod inner;
5mod open;
6mod reader;
7mod send;
8mod waker;
9mod writer;
10
11pub use crate::connection::config::WsConfig;
12pub use crate::connection::reader::WsMessageReader;
13pub use crate::connection::send::WsSend;
14pub use crate::connection::writer::WsMessageWriter;
15
16use crate::connection::inner::WsConnectionInner;
17use crate::connection::waker::{new_waker, Wakers};
18use crate::frame::{FrameDecodeError, WsDataFrameKind};
19use crate::message::WsMessageKind;
20use futures::prelude::*;
21use std::ops::DerefMut;
22use std::pin::Pin;
23use std::sync::{Arc, Mutex};
24use std::task::{Context, Poll};
25
26pub struct WsConnection<T: AsyncRead + AsyncWrite + Unpin> {
27 parent: Arc<Mutex<(WsConnectionInner<T>, Wakers)>>,
28}
29
30impl<T: AsyncRead + AsyncWrite + Unpin> WsConnection<T> {
31 pub fn with_config(transport: T, config: WsConfig) -> Self {
32 Self {
33 parent: Arc::new(Mutex::new((
34 WsConnectionInner::with_config(transport, config),
35 Wakers::default(),
36 ))),
37 }
38 }
39 pub fn send(&self, kind: WsMessageKind) -> WsSend<T> {
40 WsSend::new(&self.parent, kind)
41 }
42 pub fn err(&self) -> Option<Arc<WsConnectionError>> {
43 self.parent.lock().unwrap().0.err()
44 }
45}
46
47impl<T: AsyncRead + AsyncWrite + Unpin> Stream for WsConnection<T> {
48 type Item = WsMessageReader<T>;
49
50 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 let mut guard = self.parent.lock().unwrap();
52 let (inner, wakers) = guard.deref_mut();
53 wakers.stream_waker = Some(cx.waker().clone());
54 let waker = new_waker(Arc::downgrade(&self.parent));
55 inner
56 .poll_next_reader(&mut Context::from_waker(&waker))
57 .map(|o| o.map(|kind| WsMessageReader::new(kind, &self.parent)))
58 }
59}
60
61#[derive(thiserror::Error, Debug)]
62pub enum WsConnectionError {
63 #[error("invalid utf8 in text message")]
64 InvalidUtf8,
65 #[error("incomplete utf8 in text message")]
66 IncompleteUtf8,
67 #[error("io error: {0}")]
68 Io(#[from] std::io::Error),
69 #[error("parse error: {0}")]
70 FrameDecodeError(#[from] FrameDecodeError),
71 #[error("timeout")]
72 Timeout,
73 #[error("unexpected frame kind {0}")]
74 UnexpectedFrameKind(WsDataFrameKind),
75}
76
77impl From<WsDataFrameKind> for WsConnectionError {
78 fn from(kind: WsDataFrameKind) -> Self {
79 WsConnectionError::UnexpectedFrameKind(kind)
80 }
81}