aggligator_transport_websocket_web/
lib.rs1#![warn(missing_docs)]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/surban/aggligator/master/.misc/aggligator.png",
5 html_favicon_url = "https://raw.githubusercontent.com/surban/aggligator/master/.misc/aggligator.png",
6 issue_tracker_base_url = "https://github.com/surban/aggligator/issues/"
7)]
8
// Abort compilation with a clear message when the crate is built for a target
// whose `target_family` is not `wasm`.
#[cfg(not(target_family = "wasm"))]
compile_error!("aggligator-transport-websocket-web requires a WebAssembly target");
13
14use async_trait::async_trait;
15use bytes::Bytes;
16use futures::{future, StreamExt};
17use std::{
18 any::Any,
19 cmp::Ordering,
20 collections::HashSet,
21 fmt,
22 hash::{Hash, Hasher},
23 io::{Error, ErrorKind, Result},
24 sync::Arc,
25};
26use threadporter::{thread_bound, ThreadBound};
27use tokio::sync::watch;
28use tokio_util::io::{SinkWriter, StreamReader};
29
30#[doc(no_inline)]
31pub use websocket_web::{Interface, WebSocketBuilder};
32
33use aggligator::{
34 control::Direction,
35 io::{IoBox, StreamBox},
36 transport::{ConnectingTransport, LinkTag, LinkTagBox},
37};
38
/// Transport name reported by the link tag and the connector defined in this file.
static NAME: &str = "websocket";
40
/// Link tag describing an outgoing WebSocket link.
///
/// Equality, ordering and hashing are derived and therefore depend only on
/// the URL.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct OutgoingWebSocketLinkTag {
    /// URL of the WebSocket endpoint to connect to.
    pub url: String,
}
47
48impl fmt::Display for OutgoingWebSocketLinkTag {
49 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
50 write!(f, "{}", &self.url)
51 }
52}
53
54impl LinkTag for OutgoingWebSocketLinkTag {
55 fn transport_name(&self) -> &str {
56 NAME
57 }
58
59 fn direction(&self) -> Direction {
60 Direction::Outgoing
61 }
62
63 fn user_data(&self) -> Vec<u8> {
64 "web".into()
65 }
66
67 fn as_any(&self) -> &dyn Any {
68 self
69 }
70
71 fn box_clone(&self) -> LinkTagBox {
72 Box::new(self.clone())
73 }
74
75 fn dyn_cmp(&self, other: &dyn LinkTag) -> Ordering {
76 let other = other.as_any().downcast_ref::<Self>().unwrap();
77 Ord::cmp(self, other)
78 }
79
80 fn dyn_hash(&self, mut state: &mut dyn Hasher) {
81 Hash::hash(self, &mut state)
82 }
83}
84
85#[derive(Clone)]
89pub struct WebSocketConnector {
90 urls: Vec<String>,
91 cfg_fn: Arc<dyn Fn(&mut WebSocketBuilder) + Send + Sync + 'static>,
92}
93
94impl fmt::Debug for WebSocketConnector {
95 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
96 f.debug_struct("WebSocketConnector").field("urls", &self.urls).finish()
97 }
98}
99
100impl fmt::Display for WebSocketConnector {
101 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102 let urls: Vec<_> = self.urls.iter().map(|url| url.to_string()).collect();
103 if self.urls.len() > 1 {
104 write!(f, "[{}]", urls.join(", "))
105 } else {
106 write!(f, "{}", &urls[0])
107 }
108 }
109}
110
111impl WebSocketConnector {
112 pub async fn new(urls: impl IntoIterator<Item = impl AsRef<str>>) -> Result<Self> {
118 let urls = urls.into_iter().map(|url| url.as_ref().to_string()).collect::<Vec<_>>();
119
120 if urls.is_empty() {
121 return Err(Error::new(ErrorKind::InvalidInput, "at least one URL is required"));
122 }
123
124 Ok(Self { urls, cfg_fn: Arc::new(|_| ()) })
125 }
126
127 pub fn set_cfg(&mut self, cfg_fn: impl Fn(&mut WebSocketBuilder) + Send + Sync + 'static) {
130 self.cfg_fn = Arc::new(cfg_fn);
131 }
132}
133
134#[async_trait]
135impl ConnectingTransport for WebSocketConnector {
136 fn name(&self) -> &str {
137 NAME
138 }
139
140 async fn link_tags(&self, tx: watch::Sender<HashSet<LinkTagBox>>) -> Result<()> {
141 let mut tags: HashSet<LinkTagBox> = HashSet::new();
142 for url in &self.urls {
143 tags.insert(Box::new(OutgoingWebSocketLinkTag { url: url.clone() }));
144 }
145
146 tx.send_replace(tags);
147
148 future::pending().await
149 }
150
151 async fn connect(&self, tag: &dyn LinkTag) -> Result<StreamBox> {
152 let tag: &OutgoingWebSocketLinkTag = tag.as_any().downcast_ref().unwrap();
153
154 thread_bound(async {
158 let mut builder = WebSocketBuilder::new(&tag.url);
160 (self.cfg_fn)(&mut builder);
161
162 let websocket = builder.connect().await?;
164
165 let (tx, rx) = websocket.into_split();
167 let write = SinkWriter::new(ThreadBound::new(tx));
168 let read =
169 StreamReader::new(ThreadBound::new(rx.map(|res| res.map(|msg| Bytes::from(msg.to_vec())))));
170
171 Ok(IoBox::new(read, write).into())
172 })
173 .await
174 }
175}