use futures::SinkExt;
use futures::StreamExt;
use gloo_net::websocket::Message;
use gloo_net::websocket::WebSocketError::ConnectionClose;
use gloo_net::websocket::WebSocketError::ConnectionError;
use gloo_net::websocket::WebSocketError::MessageSendError;
use gloo_net::websocket::futures::WebSocket;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::RwLock;
use wasm_bindgen_futures::spawn_local;
use crate::prelude::*;
#[derive(Block)]
pub struct WasmWsSink<T>
where
T: CpuSample,
{
#[input]
input: slab::Reader<T>,
data_sender: mpsc::Sender<Vec<u8>>,
data_storage: Vec<u8>,
iterations_per_send: usize,
ws_error: Arc<RwLock<bool>>,
_p: PhantomData<T>,
}
impl<T> WasmWsSink<T>
where
T: CpuSample,
{
pub fn new(url: String, iterations_per_send: usize) -> Self {
let (sender, mut receiver) = mpsc::channel::<Vec<u8>>(1);
let ws_error = Arc::new(RwLock::new(false));
let ws_error_clone = ws_error.clone();
spawn_local(async move {
if let Ok(mut conn) = WebSocket::open(&url) {
while let Some(v) = receiver.next().await {
if let Err(error) = conn.send(Message::Bytes(v)).await {
match error {
ConnectionError => {
*ws_error_clone.write().expect("Lock is poisoned") = true;
}
ConnectionClose(_close_event) => {
*ws_error_clone.write().expect("Lock is poisoned") = true;
}
MessageSendError(_) => {
*ws_error_clone.write().expect("Lock is poisoned") = true;
}
_ => {
panic!("New gloo_net websocket error");
}
}
}
}
} else {
*ws_error_clone.write().expect("Lock is poisoned") = true;
}
});
WasmWsSink {
input: slab::Reader::default(),
data_sender: sender,
data_storage: Vec::new(),
iterations_per_send,
ws_error,
_p: PhantomData,
}
}
}
#[doc(hidden)]
impl<T> Kernel for WasmWsSink<T>
where
T: CpuSample,
{
async fn work(
&mut self,
io: &mut WorkIo,
_mio: &mut MessageOutputs,
_meta: &mut BlockMeta,
) -> Result<()> {
if *self.ws_error.read().expect("Lock is poisoned") {
anyhow::bail!("WebSocket Error");
}
let i = self.input.slice();
let i_len = i.len();
if i.is_empty() {
return Ok(());
}
let items_to_process_per_run = 2048;
if items_to_process_per_run <= i_len {
let len_bytes = items_to_process_per_run * std::mem::size_of::<T>();
let s = unsafe { std::slice::from_raw_parts(i.as_ptr() as *const u8, len_bytes) };
self.data_storage.extend_from_slice(s);
self.input.consume(items_to_process_per_run);
if self.data_storage.len()
>= items_to_process_per_run * self.iterations_per_send * std::mem::size_of::<T>()
{
let mut movable_vector = Vec::with_capacity(
items_to_process_per_run * self.iterations_per_send * std::mem::size_of::<T>(),
);
std::mem::swap(&mut self.data_storage, &mut movable_vector);
self.data_sender
.send(movable_vector)
.await
.expect("Receiver has been dropped, we cannot gracefully recover");
}
}
if (i_len - items_to_process_per_run) >= items_to_process_per_run {
io.call_again = true;
}
if self.input.finished() {
io.finished = true;
}
Ok(())
}
}