// async_wsocket/socket.rs

// Copyright (c) 2022-2024 Yuki Kishimoto
// Distributed under the MIT software license

use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

#[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
use arti_client::DataStream;
use futures_util::{Sink, Stream};
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use url::Url;

#[cfg(target_arch = "wasm32")]
use crate::wasm::WsStream;
use crate::{ConnectionMode, Error, Message};

/// Native WebSocket stream type: a `WebSocketStream` over a `MaybeTlsStream`
/// wrapping the transport `T`.
///
/// Only defined on non-`wasm32` targets; on `wasm32` the name `WsStream`
/// refers to `crate::wasm::WsStream` instead.
#[cfg(not(target_arch = "wasm32"))]
type WsStream<T> = WebSocketStream<MaybeTlsStream<T>>;

/// A WebSocket connection over one of the transports enabled at compile time.
///
/// Which variants exist depends on the target architecture and on the `tor`
/// feature; the `Sink` and `Stream` implementations forward to whichever
/// inner stream the value holds.
pub enum WebSocket {
    /// Native stream over a tokio `TcpStream` (non-`wasm32` targets only).
    #[cfg(not(target_arch = "wasm32"))]
    Tokio(WsStream<TcpStream>),
    /// Native stream over an `arti_client` `DataStream`
    /// (requires the `tor` feature, non-`wasm32` targets only).
    #[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
    Tor(WsStream<DataStream>),
    /// Stream provided by `crate::wasm` (`wasm32` targets only).
    #[cfg(target_arch = "wasm32")]
    Wasm(WsStream),
}

impl WebSocket {
    /// Open a WebSocket connection to `url`.
    ///
    /// The work is delegated to the backend selected at compile time:
    /// `crate::native::connect` on non-`wasm32` targets (which also receives
    /// `_mode`), or `crate::wasm::connect` on `wasm32` (where `_mode` is not
    /// used). `timeout` is handed to the backend unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend's `connect`, converted
    /// into [`Error`] by `?`.
    pub async fn connect(
        url: &Url,
        _mode: &ConnectionMode,
        timeout: Duration,
    ) -> Result<Self, Error> {
        // The two `cfg` predicates are complementary, so exactly one of the
        // bindings below survives compilation.
        #[cfg(not(target_arch = "wasm32"))]
        let connected: Self = crate::native::connect(url, _mode, timeout).await?;

        #[cfg(target_arch = "wasm32")]
        let connected: Self = crate::wasm::connect(url, timeout).await?;

        Ok(connected)
    }
}

impl Sink<Message> for WebSocket {
    type Error = Error;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.deref_mut() {
            #[cfg(not(target_arch = "wasm32"))]
            Self::Tokio(s) => Pin::new(s).poll_ready(cx).map_err(Into::into),
            #[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
            Self::Tor(s) => Pin::new(s).poll_ready(cx).map_err(Into::into),
            #[cfg(target_arch = "wasm32")]
            Self::Wasm(s) => Pin::new(s).poll_ready(cx).map_err(Into::into),
        }
    }

    fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
        match self.deref_mut() {
            #[cfg(not(target_arch = "wasm32"))]
            Self::Tokio(s) => Pin::new(s).start_send(item.into()).map_err(Into::into),
            #[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
            Self::Tor(s) => Pin::new(s).start_send(item.into()).map_err(Into::into),
            #[cfg(target_arch = "wasm32")]
            Self::Wasm(s) => Pin::new(s).start_send(item).map_err(Into::into),
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.deref_mut() {
            #[cfg(not(target_arch = "wasm32"))]
            Self::Tokio(s) => Pin::new(s).poll_flush(cx).map_err(Into::into),
            #[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
            Self::Tor(s) => Pin::new(s).poll_flush(cx).map_err(Into::into),
            #[cfg(target_arch = "wasm32")]
            Self::Wasm(s) => Pin::new(s).poll_flush(cx).map_err(Into::into),
        }
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.deref_mut() {
            #[cfg(not(target_arch = "wasm32"))]
            Self::Tokio(s) => Pin::new(s).poll_close(cx).map_err(Into::into),
            #[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
            Self::Tor(s) => Pin::new(s).poll_close(cx).map_err(Into::into),
            #[cfg(target_arch = "wasm32")]
            Self::Wasm(s) => Pin::new(s).poll_close(cx).map_err(Into::into),
        }
    }
}

impl Stream for WebSocket {
    type Item = Result<Message, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.deref_mut() {
            #[cfg(not(target_arch = "wasm32"))]
            Self::Tokio(s) => Pin::new(s)
                .poll_next(cx)
                .map(|i| i.map(|res| res.map(Message::from_native)))
                .map_err(Into::into),
            #[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
            Self::Tor(s) => Pin::new(s)
                .poll_next(cx)
                .map(|i| i.map(|res| res.map(Message::from_native)))
                .map_err(Into::into),
            #[cfg(target_arch = "wasm32")]
            Self::Wasm(s) => Pin::new(s).poll_next(cx).map_err(Into::into),
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            #[cfg(not(target_arch = "wasm32"))]
            Self::Tokio(s) => s.size_hint(),
            #[cfg(all(feature = "tor", not(target_arch = "wasm32")))]
            Self::Tor(s) => s.size_hint(),
            #[cfg(target_arch = "wasm32")]
            Self::Wasm(s) => s.size_hint(),
        }
    }
}