futuresdr/blocks/
websocket_sink.rs1use async_io::Async;
2use async_tungstenite::WebSocketStream;
3use async_tungstenite::tungstenite::Message;
4use futures::Stream;
5use futures::future;
6use futures::future::Either;
7use futures::sink::Sink;
8use futures::sink::SinkExt;
9use std::marker::PhantomData;
10use std::mem::size_of;
11use std::net::SocketAddr;
12use std::net::TcpListener;
13use std::net::TcpStream;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::Context;
17use std::task::Poll;
18
19use crate::prelude::*;
20
21pub enum WebsocketSinkMode {
23 Blocking,
25 FixedBlocking(usize),
27 FixedDropping(usize),
29}
30
31#[derive(Block)]
33pub struct WebsocketSink<T: CpuSample, I: CpuBufferReader<Item = T> = DefaultCpuReader<T>> {
34 #[input]
35 input: I,
36 port: u32,
37 listener: Option<Arc<Async<TcpListener>>>,
38 conn: Option<WsStream>,
39 mode: WebsocketSinkMode,
40 _p: PhantomData<T>,
41}
42
43impl<T, I> WebsocketSink<T, I>
44where
45 T: CpuSample,
46 I: CpuBufferReader<Item = T>,
47{
48 pub fn new(port: u32, mode: WebsocketSinkMode) -> Self {
50 Self {
51 input: I::default(),
52 port,
53 listener: None,
54 conn: None,
55 mode,
56 _p: PhantomData,
57 }
58 }
59}
60
61#[doc(hidden)]
62impl<T, I> Kernel for WebsocketSink<T, I>
63where
64 T: CpuSample,
65 I: CpuBufferReader<Item = T>,
66{
67 async fn work(
68 &mut self,
69 io: &mut WorkIo,
70 _mio: &mut MessageOutputs,
71 _meta: &mut BlockMeta,
72 ) -> Result<()> {
73 if self.input.finished() {
74 io.finished = true;
75 }
76
77 let i = self.input.slice();
78 let i_len = i.len();
79
80 if let Some(ref mut conn) = self.conn {
81 if i.is_empty() {
82 return Ok(());
83 }
84
85 let mut v = Vec::new();
86
87 match &self.mode {
88 WebsocketSinkMode::Blocking => {
89 v.extend_from_slice(i);
90 self.input.consume(i_len);
91 }
92 WebsocketSinkMode::FixedBlocking(block_size) => {
93 if *block_size <= i_len {
94 v.extend_from_slice(&i[0..*block_size]);
95 self.input.consume(*block_size);
96 }
97 }
98 WebsocketSinkMode::FixedDropping(block_size) => {
99 let n = i_len / block_size;
100 if n != 0 {
101 v.extend_from_slice(&i[((n - 1) * block_size)..(n * block_size)]);
102 self.input.consume(n * block_size);
103 }
104 }
105 }
106
107 if !v.is_empty() {
108 let acc = Box::pin(
109 self.listener
110 .as_ref()
111 .ok_or_else(|| Error::RuntimeError("no listener".to_string()))?
112 .accept(),
113 );
114
115 let len = v.len() * size_of::<T>();
116 let cap = v.capacity() * size_of::<T>();
117 let ptr = v.as_ptr() as *mut u8;
118
119 std::mem::forget(v);
121
122 let v = unsafe { Vec::from_raw_parts(ptr, len, cap) };
123 let send = conn.send(Message::Binary(v.into()));
124
125 match future::select(acc, send).await {
126 Either::Left((a, _)) => {
127 if let Ok((stream, _)) = a {
128 self.conn = Some(WsStream {
129 inner: async_tungstenite::accept_async(stream).await?,
130 });
131 }
132 }
133 Either::Right((s, _)) => {
134 if s.is_err() {
135 debug!("websocket: client disconnected");
136 self.conn = None;
137 }
138 }
139 }
140 }
141 } else {
142 match self
143 .listener
144 .as_ref()
145 .ok_or_else(|| Error::RuntimeError("no listener".to_string()))?
146 .get_ref()
147 .accept()
148 {
149 Ok((stream, socket)) => {
150 debug!("Websocket Accepted client: {}", socket);
151 self.conn = Some(WsStream {
152 inner: async_tungstenite::accept_async(Async::new(stream)?).await?,
153 });
154 io.call_again = true;
155 }
156 _ => {
157 if let WebsocketSinkMode::FixedDropping(block_size) = &self.mode {
158 let n = i_len / block_size;
159 self.input.consume(n * block_size);
160 }
161
162 let l = self.listener.as_ref().unwrap().clone();
163 io.block_on(async move {
164 l.readable().await.unwrap();
165 });
166 }
167 }
168 }
169
170 Ok(())
171 }
172
173 async fn init(&mut self, _mio: &mut MessageOutputs, _meta: &mut BlockMeta) -> Result<()> {
174 self.listener = Some(Arc::new(Async::<TcpListener>::bind(
175 format!("0.0.0.0:{}", self.port).parse::<SocketAddr>()?,
176 )?));
177 Ok(())
178 }
179}
180
181pub struct WebsocketSinkBuilder<T> {
183 port: u32,
184 mode: WebsocketSinkMode,
185 _p: PhantomData<T>,
186}
187
188impl<T: CpuSample> WebsocketSinkBuilder<T> {
189 pub fn new(port: u32) -> WebsocketSinkBuilder<T> {
191 WebsocketSinkBuilder {
192 port,
193 mode: WebsocketSinkMode::Blocking,
194 _p: PhantomData,
195 }
196 }
197
198 #[must_use]
200 pub fn mode(mut self, mode: WebsocketSinkMode) -> WebsocketSinkBuilder<T> {
201 self.mode = mode;
202 self
203 }
204
205 pub fn build(self) -> WebsocketSink<T> {
207 WebsocketSink::<T>::new(self.port, self.mode)
208 }
209}
210
211struct WsStream {
212 inner: WebSocketStream<Async<TcpStream>>,
213}
214
215impl Sink<Message> for WsStream {
216 type Error = async_tungstenite::tungstenite::Error;
217
218 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
219 Pin::new(&mut self.inner).poll_ready(cx)
220 }
221
222 fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
223 Pin::new(&mut self.inner).start_send(item)
224 }
225
226 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
227 Pin::new(&mut self.inner).poll_flush(cx)
228 }
229
230 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
231 Pin::new(&mut self.inner).poll_close(cx)
232 }
233}
234
235impl Stream for WsStream {
236 type Item = async_tungstenite::tungstenite::Result<Message>;
237
238 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
239 Pin::new(&mut self.inner).poll_next(cx)
240 }
241}