hakuban 0.8.5

Data-object sharing library
Documentation
#![allow(clippy::not_unsafe_ptr_arg_deref)]

use std::pin::Pin;

use futures::{channel::mpsc, FutureExt, Sink, SinkExt, Stream, StreamExt};
use log::trace;
use wasm_bindgen::prelude::*;

use super::{WasmExchange, WasmFuture, WasmResult, WasmResultStatus};
use crate::connection::{
	multiplexer::{multiplex, MultiplexedChannelEvent},
	upstream::UpstreamConnectionControl,
};

pub struct WasmUpstreamConnection {
	_upstream_control: UpstreamConnectionControl,
	message_sink: Pin<Box<dyn Sink<MultiplexedChannelEvent, Error = mpsc::SendError> + Send + Sync>>,
	message_stream: Pin<Box<dyn Stream<Item = MultiplexedChannelEvent> + Send + Sync>>,
}

//TODO: check if all of this is leaking horribly
#[wasm_bindgen]
pub fn hakuban_upstream_connection_new(exchange_pointer: *mut WasmExchange, local_name: String, local_address: String) -> *mut WasmUpstreamConnection {
	trace!("Constructing upstream connection");
	let exchange = unsafe { exchange_pointer.as_mut().unwrap() };
	let (upstream_control, incoming_channels, outgoing_channels) = UpstreamConnectionControl::new(&exchange.exchange, local_name, local_address, vec![]);
	let (message_sink, message_stream) = multiplex(true, incoming_channels, outgoing_channels, upstream_control.termination().clone());
	let wasm_upstream_connection =
		WasmUpstreamConnection { _upstream_control: upstream_control, message_sink: Box::pin(message_sink), message_stream: Box::pin(message_stream) };
	Box::into_raw(Box::new(wasm_upstream_connection))
}

#[wasm_bindgen]
pub fn hakuban_upstream_connection_drop(wasm_upstream_connection: *mut WasmUpstreamConnection) {
	let wasm_upstream_connection = unsafe { Box::from_raw(wasm_upstream_connection) };
	trace!("Dropping upstream connection");
	drop(wasm_upstream_connection);
}

#[wasm_bindgen]
pub fn hakuban_upstream_connection_next_message_to_network(wasm_upstream_connection: *mut WasmUpstreamConnection) -> *mut WasmFuture {
	let wasm_upstream_connection = unsafe { wasm_upstream_connection.as_mut().unwrap() };
	Box::into_raw(Box::new(WasmFuture::new(wasm_upstream_connection.message_stream.next().map(|item| {
		item.map(|message| {
			trace!("outgoing message: {:?}", message);
			WasmResult::pointer(message)
		})
		.unwrap_or(WasmResult::end_of_stream())
	}))))
}

#[wasm_bindgen]
pub fn hakuban_message_serialize(message_pointer: *mut MultiplexedChannelEvent) -> Box<[u8]> {
	let message = unsafe { Box::from_raw(message_pointer) };
	bincode::encode_to_vec(message, bincode::config::standard()).unwrap().into_boxed_slice()
}

#[wasm_bindgen]
pub fn hakuban_upstream_connection_send_message_from_network(
	wasm_upstream_connection: *mut WasmUpstreamConnection,
	message_data: Box<[u8]>,
) -> *mut WasmFuture {
	let wasm_upstream_connection = unsafe { wasm_upstream_connection.as_mut().unwrap() };
	let (message, _decoded_bytes_count) = bincode::decode_from_slice::<MultiplexedChannelEvent, _>(&message_data, bincode::config::standard()).unwrap();
	trace!("incoming message: {:?}", message);
	Box::into_raw(Box::new(WasmFuture::new(wasm_upstream_connection.message_sink.send(message).map(|result| match result {
		Ok(()) => WasmResult::ok(),
		Err(error) => WasmResult::error(WasmResultStatus::ConnectionTerminated, Some(format!("{:?}", error))),
	}))))
}