aggligator_transport_websocket_web/
lib.rs

1#![warn(missing_docs)]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/surban/aggligator/master/.misc/aggligator.png",
5    html_favicon_url = "https://raw.githubusercontent.com/surban/aggligator/master/.misc/aggligator.png",
6    issue_tracker_base_url = "https://github.com/surban/aggligator/issues/"
7)]
8
9//! [Aggligator](aggligator) transport: WebSocket on the web targeting WebAssembly.
10
// This transport uses the browser's WebSocket API, so building for any
// non-WebAssembly target is rejected up front with an explicit message.
#[cfg(not(target_family = "wasm"))]
compile_error!("aggligator-transport-websocket-web requires a WebAssembly target");
13
14use async_trait::async_trait;
15use bytes::Bytes;
16use futures::{future, StreamExt};
17use std::{
18    any::Any,
19    cmp::Ordering,
20    collections::HashSet,
21    fmt,
22    hash::{Hash, Hasher},
23    io::{Error, ErrorKind, Result},
24    sync::Arc,
25};
26use threadporter::{thread_bound, ThreadBound};
27use tokio::sync::watch;
28use tokio_util::io::{SinkWriter, StreamReader};
29
30#[doc(no_inline)]
31pub use websocket_web::{Interface, WebSocketBuilder};
32
33use aggligator::{
34    control::Direction,
35    io::{IoBox, StreamBox},
36    transport::{ConnectingTransport, LinkTag, LinkTagBox},
37};
38
/// Transport name reported by the link tags and the connector defined in this file.
static NAME: &str = "websocket";
40
/// Link tag for outgoing WebSocket link.
///
/// The tag consists solely of the target URL, so equality, ordering and
/// hashing (all derived) are determined by that URL alone.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct OutgoingWebSocketLinkTag {
    /// Remote URL that the WebSocket connection is established to.
    pub url: String,
}
47
48impl fmt::Display for OutgoingWebSocketLinkTag {
49    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
50        write!(f, "{}", &self.url)
51    }
52}
53
54impl LinkTag for OutgoingWebSocketLinkTag {
55    fn transport_name(&self) -> &str {
56        NAME
57    }
58
59    fn direction(&self) -> Direction {
60        Direction::Outgoing
61    }
62
63    fn user_data(&self) -> Vec<u8> {
64        "web".into()
65    }
66
67    fn as_any(&self) -> &dyn Any {
68        self
69    }
70
71    fn box_clone(&self) -> LinkTagBox {
72        Box::new(self.clone())
73    }
74
75    fn dyn_cmp(&self, other: &dyn LinkTag) -> Ordering {
76        let other = other.as_any().downcast_ref::<Self>().unwrap();
77        Ord::cmp(self, other)
78    }
79
80    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
81        Hash::hash(self, &mut state)
82    }
83}
84
/// WebSocket transport for outgoing connections using the browser's WebSocket API.
///
/// This transport is packet-based.
///
/// Cloning is cheap with respect to the configuration function, which is
/// shared between clones through an [`Arc`].
#[derive(Clone)]
pub struct WebSocketConnector {
    /// Target WebSocket URLs; non-empty when constructed through `new`.
    urls: Vec<String>,
    /// Hook applied to each [`WebSocketBuilder`] before it is connected.
    cfg_fn: Arc<dyn Fn(&mut WebSocketBuilder) + Send + Sync + 'static>,
}
93
94impl fmt::Debug for WebSocketConnector {
95    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
96        f.debug_struct("WebSocketConnector").field("urls", &self.urls).finish()
97    }
98}
99
100impl fmt::Display for WebSocketConnector {
101    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102        let urls: Vec<_> = self.urls.iter().map(|url| url.to_string()).collect();
103        if self.urls.len() > 1 {
104            write!(f, "[{}]", urls.join(", "))
105        } else {
106            write!(f, "{}", &urls[0])
107        }
108    }
109}
110
111impl WebSocketConnector {
112    /// Create a new WebSocket transport for outgoing connections.
113    ///
114    /// `urls` contains one or more WebSocket URLs of the target.
115    ///
116    /// Name resolution and certificate validation is handled by the browser.
117    pub async fn new(urls: impl IntoIterator<Item = impl AsRef<str>>) -> Result<Self> {
118        let urls = urls.into_iter().map(|url| url.as_ref().to_string()).collect::<Vec<_>>();
119
120        if urls.is_empty() {
121            return Err(Error::new(ErrorKind::InvalidInput, "at least one URL is required"));
122        }
123
124        Ok(Self { urls, cfg_fn: Arc::new(|_| ()) })
125    }
126
127    /// Sets the configuration function that is applied to each
128    /// [WebSocket builder](WebSocketBuilder) before it is connected.
129    pub fn set_cfg(&mut self, cfg_fn: impl Fn(&mut WebSocketBuilder) + Send + Sync + 'static) {
130        self.cfg_fn = Arc::new(cfg_fn);
131    }
132}
133
134#[async_trait]
135impl ConnectingTransport for WebSocketConnector {
136    fn name(&self) -> &str {
137        NAME
138    }
139
140    async fn link_tags(&self, tx: watch::Sender<HashSet<LinkTagBox>>) -> Result<()> {
141        let mut tags: HashSet<LinkTagBox> = HashSet::new();
142        for url in &self.urls {
143            tags.insert(Box::new(OutgoingWebSocketLinkTag { url: url.clone() }));
144        }
145
146        tx.send_replace(tags);
147
148        future::pending().await
149    }
150
151    async fn connect(&self, tag: &dyn LinkTag) -> Result<StreamBox> {
152        let tag: &OutgoingWebSocketLinkTag = tag.as_any().downcast_ref().unwrap();
153
154        // WebSocket is not Send + Sync, thus we need to wrap the following
155        // code in a ThreadBound. It ensures that execution takes place on
156        // a single thread but appears to be Send + Sync.
157        thread_bound(async {
158            // Configure WebSocket.
159            let mut builder = WebSocketBuilder::new(&tag.url);
160            (self.cfg_fn)(&mut builder);
161
162            // Establish WebSocket connection.
163            let websocket = builder.connect().await?;
164
165            // Adapt WebSocket IO.
166            let (tx, rx) = websocket.into_split();
167            let write = SinkWriter::new(ThreadBound::new(tx));
168            let read =
169                StreamReader::new(ThreadBound::new(rx.map(|res| res.map(|msg| Bytes::from(msg.to_vec())))));
170
171            Ok(IoBox::new(read, write).into())
172        })
173        .await
174    }
175}