#![allow(clippy::not_unsafe_ptr_arg_deref)]
use std::pin::Pin;
use futures::{channel::mpsc, FutureExt, Sink, SinkExt, Stream, StreamExt};
use log::trace;
use wasm_bindgen::prelude::*;
use super::{WasmExchange, WasmFuture, WasmResult, WasmResultStatus};
use crate::connection::{
multiplexer::{multiplex, MultiplexedChannelEvent},
upstream::UpstreamConnectionControl,
};
/// Upstream connection state exposed to JavaScript through raw pointers.
///
/// Instances are heap-allocated by `hakuban_upstream_connection_new` and
/// released by `hakuban_upstream_connection_drop`.
pub struct WasmUpstreamConnection {
// Never read in this file (hence the underscore); presumably held so the
// upstream connection lives exactly as long as this struct — TODO confirm.
_upstream_control: UpstreamConnectionControl,
// Receives messages decoded from bytes that arrived from the network.
message_sink: Pin<Box<dyn Sink<MultiplexedChannelEvent, Error = mpsc::SendError> + Send + Sync>>,
// Yields messages that should be serialized and sent to the network.
message_stream: Pin<Box<dyn Stream<Item = MultiplexedChannelEvent> + Send + Sync>>,
}
/// Creates an upstream connection for the given exchange and returns an owning
/// raw pointer to it.
///
/// The returned pointer must eventually be passed to
/// `hakuban_upstream_connection_drop`, otherwise the connection is leaked.
///
/// `exchange_pointer` must point to a live `WasmExchange`; `local_name` and
/// `local_address` are forwarded unchanged to `UpstreamConnectionControl::new`.
///
/// # Panics
///
/// Panics if `exchange_pointer` is null.
#[wasm_bindgen]
pub fn hakuban_upstream_connection_new(exchange_pointer: *mut WasmExchange, local_name: String, local_address: String) -> *mut WasmUpstreamConnection {
    trace!("Constructing upstream connection");
    // SAFETY: the caller must pass a pointer to a live `WasmExchange`. Only a
    // shared reference is taken because the exchange is merely read here, so no
    // exclusivity over a possibly shared object is asserted.
    let exchange = unsafe { exchange_pointer.as_ref() }.expect("hakuban_upstream_connection_new: exchange pointer must not be null");
    let (upstream_control, incoming_channels, outgoing_channels) = UpstreamConnectionControl::new(&exchange.exchange, local_name, local_address, vec![]);
    let (message_sink, message_stream) = multiplex(true, incoming_channels, outgoing_channels, upstream_control.termination().clone());
    let wasm_upstream_connection = WasmUpstreamConnection {
        _upstream_control: upstream_control,
        message_sink: Box::pin(message_sink),
        message_stream: Box::pin(message_stream),
    };
    // Ownership is handed to the caller as a raw pointer.
    Box::into_raw(Box::new(wasm_upstream_connection))
}
/// Destroys an upstream connection previously created by
/// `hakuban_upstream_connection_new`.
///
/// The pointer must not be used after this call. Passing a null pointer is a
/// no-op (it is only logged), instead of being undefined behaviour.
#[wasm_bindgen]
pub fn hakuban_upstream_connection_drop(wasm_upstream_connection: *mut WasmUpstreamConnection) {
    // `Box::from_raw` on a null pointer is undefined behaviour, so bail out early.
    if wasm_upstream_connection.is_null() {
        trace!("Ignoring drop of null upstream connection pointer");
        return;
    }
    // SAFETY: the pointer is non-null; the caller must guarantee it came from
    // `Box::into_raw` in `hakuban_upstream_connection_new` and has not been
    // dropped already.
    let wasm_upstream_connection = unsafe { Box::from_raw(wasm_upstream_connection) };
    trace!("Dropping upstream connection");
    drop(wasm_upstream_connection);
}
/// Returns a future resolving to the next message that should be sent to the
/// network.
///
/// The future resolves to `WasmResult::pointer(message)` when the connection's
/// message stream yields a message, and to `WasmResult::end_of_stream()` once
/// the stream is exhausted.
///
/// The future borrows the connection's message stream, so the connection must
/// stay alive (not be dropped) until the future has completed or been released.
///
/// # Panics
///
/// Panics if `wasm_upstream_connection` is null.
#[wasm_bindgen]
pub fn hakuban_upstream_connection_next_message_to_network(wasm_upstream_connection: *mut WasmUpstreamConnection) -> *mut WasmFuture {
    // SAFETY: the caller must pass a pointer to a live connection obtained from
    // `hakuban_upstream_connection_new`.
    let connection =
        unsafe { wasm_upstream_connection.as_mut() }.expect("hakuban_upstream_connection_next_message_to_network: connection pointer must not be null");
    let next_message = connection.message_stream.next().map(|item| match item {
        Some(message) => {
            trace!("outgoing message: {:?}", message);
            WasmResult::pointer(message)
        }
        // Built only when the stream has actually ended, rather than eagerly
        // for every delivered message.
        None => WasmResult::end_of_stream(),
    });
    Box::into_raw(Box::new(WasmFuture::new(next_message)))
}
/// Serializes a message into bytes using bincode's standard configuration.
///
/// This function takes ownership of the message: the pointer is reclaimed with
/// `Box::from_raw` and the message is freed before returning, so the pointer
/// must not be used (or serialized) again afterwards.
///
/// # Panics
///
/// Panics if `message_pointer` is null or if encoding fails.
#[wasm_bindgen]
pub fn hakuban_message_serialize(message_pointer: *mut MultiplexedChannelEvent) -> Box<[u8]> {
    // Fail loudly instead of invoking undefined behaviour on a null pointer.
    assert!(!message_pointer.is_null(), "hakuban_message_serialize: message pointer must not be null");
    // SAFETY: the pointer is non-null; the caller must guarantee it is a
    // uniquely owned, boxed message that has not been consumed yet
    // (presumably the pointer carried by a `WasmResult::pointer` — TODO confirm).
    let message = unsafe { Box::from_raw(message_pointer) };
    bincode::encode_to_vec(message, bincode::config::standard())
        .expect("hakuban_message_serialize: encoding a message should not fail")
        .into_boxed_slice()
}
#[wasm_bindgen]
pub fn hakuban_upstream_connection_send_message_from_network(
wasm_upstream_connection: *mut WasmUpstreamConnection,
message_data: Box<[u8]>,
) -> *mut WasmFuture {
let wasm_upstream_connection = unsafe { wasm_upstream_connection.as_mut().unwrap() };
let (message, _decoded_bytes_count) = bincode::decode_from_slice::<MultiplexedChannelEvent, _>(&message_data, bincode::config::standard()).unwrap();
trace!("incoming message: {:?}", message);
Box::into_raw(Box::new(WasmFuture::new(wasm_upstream_connection.message_sink.send(message).map(|result| match result {
Ok(()) => WasmResult::ok(),
Err(error) => WasmResult::error(WasmResultStatus::ConnectionTerminated, Some(format!("{:?}", error))),
}))))
}