futuresdr/blocks/
websocket_sink.rs

1use async_io::Async;
2use async_tungstenite::WebSocketStream;
3use async_tungstenite::tungstenite::Message;
4use futures::Stream;
5use futures::future;
6use futures::future::Either;
7use futures::sink::Sink;
8use futures::sink::SinkExt;
9use std::marker::PhantomData;
10use std::mem::size_of;
11use std::net::SocketAddr;
12use std::net::TcpListener;
13use std::net::TcpStream;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::Context;
17use std::task::Poll;
18
19use crate::prelude::*;
20
/// Operation mode for [WebsocketSink].
///
/// The mode selects how many samples from the input buffer are packed into
/// each WebSocket message and whether samples may be discarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebsocketSinkMode {
    /// Backpressure. Send everything that is available in the input buffer as one message.
    Blocking,
    /// Send samples in fixed-size chunks (size given in samples). Nothing is dropped;
    /// a chunk is only sent once enough samples are available.
    FixedBlocking(usize),
    /// Send samples in fixed-size chunks (size given in samples). If multiple complete
    /// chunks are available in the input buffer, only the last one is sent and the
    /// earlier ones are dropped.
    FixedDropping(usize),
}
30
/// Push samples in a WebSocket.
///
/// The block listens on a TCP port, accepts a WebSocket client and forwards
/// the samples from its input to that client as binary messages. How the
/// input is split into messages is controlled by [WebsocketSinkMode].
#[derive(Block)]
pub struct WebsocketSink<T: CpuSample, I: CpuBufferReader<Item = T> = DefaultCpuReader<T>> {
    // Stream input the samples are read from.
    #[input]
    input: I,
    // TCP port the listener is bound to (on 0.0.0.0) during `init`.
    port: u32,
    // Listening socket; `None` until `init` has run.
    listener: Option<Arc<Async<TcpListener>>>,
    // Currently connected WebSocket client, if any.
    conn: Option<WsStream>,
    // Chunking / dropping policy applied in `work`.
    mode: WebsocketSinkMode,
    // Ties the sample type `T` to the block without storing a value of it.
    _p: PhantomData<T>,
}
42
43impl<T, I> WebsocketSink<T, I>
44where
45    T: CpuSample,
46    I: CpuBufferReader<Item = T>,
47{
48    /// Create WebsocketSink block
49    pub fn new(port: u32, mode: WebsocketSinkMode) -> Self {
50        Self {
51            input: I::default(),
52            port,
53            listener: None,
54            conn: None,
55            mode,
56            _p: PhantomData,
57        }
58    }
59}
60
61#[doc(hidden)]
62impl<T, I> Kernel for WebsocketSink<T, I>
63where
64    T: CpuSample,
65    I: CpuBufferReader<Item = T>,
66{
67    async fn work(
68        &mut self,
69        io: &mut WorkIo,
70        _mio: &mut MessageOutputs,
71        _meta: &mut BlockMeta,
72    ) -> Result<()> {
73        if self.input.finished() {
74            io.finished = true;
75        }
76
77        let i = self.input.slice();
78        let i_len = i.len();
79
80        if let Some(ref mut conn) = self.conn {
81            if i.is_empty() {
82                return Ok(());
83            }
84
85            let mut v = Vec::new();
86
87            match &self.mode {
88                WebsocketSinkMode::Blocking => {
89                    v.extend_from_slice(i);
90                    self.input.consume(i_len);
91                }
92                WebsocketSinkMode::FixedBlocking(block_size) => {
93                    if *block_size <= i_len {
94                        v.extend_from_slice(&i[0..*block_size]);
95                        self.input.consume(*block_size);
96                    }
97                }
98                WebsocketSinkMode::FixedDropping(block_size) => {
99                    let n = i_len / block_size;
100                    if n != 0 {
101                        v.extend_from_slice(&i[((n - 1) * block_size)..(n * block_size)]);
102                        self.input.consume(n * block_size);
103                    }
104                }
105            }
106
107            if !v.is_empty() {
108                let acc = Box::pin(
109                    self.listener
110                        .as_ref()
111                        .ok_or_else(|| Error::RuntimeError("no listener".to_string()))?
112                        .accept(),
113                );
114
115                let len = v.len() * size_of::<T>();
116                let cap = v.capacity() * size_of::<T>();
117                let ptr = v.as_ptr() as *mut u8;
118
119                // prevent original Vec from dropping
120                std::mem::forget(v);
121
122                let v = unsafe { Vec::from_raw_parts(ptr, len, cap) };
123                let send = conn.send(Message::Binary(v.into()));
124
125                match future::select(acc, send).await {
126                    Either::Left((a, _)) => {
127                        if let Ok((stream, _)) = a {
128                            self.conn = Some(WsStream {
129                                inner: async_tungstenite::accept_async(stream).await?,
130                            });
131                        }
132                    }
133                    Either::Right((s, _)) => {
134                        if s.is_err() {
135                            debug!("websocket: client disconnected");
136                            self.conn = None;
137                        }
138                    }
139                }
140            }
141        } else {
142            match self
143                .listener
144                .as_ref()
145                .ok_or_else(|| Error::RuntimeError("no listener".to_string()))?
146                .get_ref()
147                .accept()
148            {
149                Ok((stream, socket)) => {
150                    debug!("Websocket Accepted client: {}", socket);
151                    self.conn = Some(WsStream {
152                        inner: async_tungstenite::accept_async(Async::new(stream)?).await?,
153                    });
154                    io.call_again = true;
155                }
156                _ => {
157                    if let WebsocketSinkMode::FixedDropping(block_size) = &self.mode {
158                        let n = i_len / block_size;
159                        self.input.consume(n * block_size);
160                    }
161
162                    let l = self.listener.as_ref().unwrap().clone();
163                    io.block_on(async move {
164                        l.readable().await.unwrap();
165                    });
166                }
167            }
168        }
169
170        Ok(())
171    }
172
173    async fn init(&mut self, _mio: &mut MessageOutputs, _meta: &mut BlockMeta) -> Result<()> {
174        self.listener = Some(Arc::new(Async::<TcpListener>::bind(
175            format!("0.0.0.0:{}", self.port).parse::<SocketAddr>()?,
176        )?));
177        Ok(())
178    }
179}
180
/// Build a [WebsocketSink].
///
/// Collects the port and the operation mode and creates the block with the
/// default input reader type.
pub struct WebsocketSinkBuilder<T> {
    // TCP port handed to the block.
    port: u32,
    // Operation mode handed to the block; `Blocking` unless overridden.
    mode: WebsocketSinkMode,
    // Ties the sample type `T` to the builder without storing a value of it.
    _p: PhantomData<T>,
}
187
188impl<T: CpuSample> WebsocketSinkBuilder<T> {
189    /// Create WebsocketSink builder
190    pub fn new(port: u32) -> WebsocketSinkBuilder<T> {
191        WebsocketSinkBuilder {
192            port,
193            mode: WebsocketSinkMode::Blocking,
194            _p: PhantomData,
195        }
196    }
197
198    /// Set mode
199    #[must_use]
200    pub fn mode(mut self, mode: WebsocketSinkMode) -> WebsocketSinkBuilder<T> {
201        self.mode = mode;
202        self
203    }
204
205    /// Build WebsocketSink
206    pub fn build(self) -> WebsocketSink<T> {
207        WebsocketSink::<T>::new(self.port, self.mode)
208    }
209}
210
/// Wrapper around an accepted WebSocket connection over an async TCP stream.
///
/// Its `Sink` and `Stream` implementations delegate to the wrapped stream.
struct WsStream {
    // The underlying WebSocket connection.
    inner: WebSocketStream<Async<TcpStream>>,
}
214
215impl Sink<Message> for WsStream {
216    type Error = async_tungstenite::tungstenite::Error;
217
218    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
219        Pin::new(&mut self.inner).poll_ready(cx)
220    }
221
222    fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
223        Pin::new(&mut self.inner).start_send(item)
224    }
225
226    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
227        Pin::new(&mut self.inner).poll_flush(cx)
228    }
229
230    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
231        Pin::new(&mut self.inner).poll_close(cx)
232    }
233}
234
235impl Stream for WsStream {
236    type Item = async_tungstenite::tungstenite::Result<Message>;
237
238    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
239        Pin::new(&mut self.inner).poll_next(cx)
240    }
241}