#![allow(clippy::not_unsafe_ptr_arg_deref)]
use std::sync::Arc;
use futures::{stream::StreamExt, FutureExt, SinkExt};
use log::trace;
use super::{ffi_future::FFIFuture, FFIObjectDescriptor, FFIObjectState, FFIResult};
use crate::object::state_sink::{ObjectStateSink, ObjectStateSinkParams};
pub struct FFIObjectStateSink {
sink: ObjectStateSink,
send_future: Arc<FFIFuture>,
next_future: Arc<FFIFuture>,
}
impl FFIObjectStateSink {
pub(super) fn new(sink: ObjectStateSink) -> FFIObjectStateSink {
FFIObjectStateSink { sink, send_future: FFIFuture::empty(), next_future: FFIFuture::empty() }
}
}
#[no_mangle]
pub extern "C" fn hakuban_object_state_sink_drop(object_state_sink_pointer: *mut FFIObjectStateSink) {
let object_state_sink = unsafe { Box::from_raw(object_state_sink_pointer) };
object_state_sink.send_future.close();
object_state_sink.next_future.close();
trace!("ssdr {:?}", object_state_sink.sink.descriptor());
drop(object_state_sink);
}
#[no_mangle]
pub extern "C" fn hakuban_object_state_sink_send(
object_state_sink_pointer: *mut FFIObjectStateSink,
object_state_pointer: *mut FFIObjectState,
) -> *mut Arc<FFIFuture> {
let object_state_sink = unsafe { object_state_sink_pointer.as_mut().unwrap() };
let ffi_object_state: &FFIObjectState = unsafe { object_state_pointer.as_mut().unwrap() };
object_state_sink.send_future = FFIFuture::new(object_state_sink.sink.send(ffi_object_state.object_state.clone()).map(|result| match result {
Ok(()) => FFIResult::ok(),
Err(()) => panic!(),
}));
Box::into_raw(Box::new(object_state_sink.send_future.clone()))
}
#[no_mangle]
pub extern "C" fn hakuban_object_state_sink_next(object_state_sink_pointer: *mut FFIObjectStateSink) -> *mut Arc<FFIFuture> {
let object_state_sink = unsafe { object_state_sink_pointer.as_mut().unwrap() };
object_state_sink.next_future =
FFIFuture::new(object_state_sink.sink.next().map(|item| item.map(FFIResult::pointer).unwrap_or(FFIResult::end_of_stream())));
Box::into_raw(Box::new(object_state_sink.next_future.clone()))
}
#[no_mangle]
pub extern "C" fn hakuban_object_state_sink_descriptor(object_state_sink_pointer: *mut FFIObjectStateSink) -> *mut FFIObjectDescriptor {
let object_state_sink = unsafe { object_state_sink_pointer.as_mut().unwrap() };
Box::into_raw(Box::new(object_state_sink.sink.descriptor().into()))
}
#[no_mangle]
pub extern "C" fn hakuban_object_state_sink_params_drop(object_state_sink_params_pointer: *mut ObjectStateSinkParams) {
drop(unsafe { Box::from_raw(object_state_sink_params_pointer) });
}
#[no_mangle]
pub extern "C" fn hakuban_object_state_sink_params_clone(object_state_sink_params_pointer: *mut ObjectStateSinkParams) -> *mut ObjectStateSinkParams {
let object_state_sink_params: &mut ObjectStateSinkParams = unsafe { object_state_sink_params_pointer.as_mut().unwrap() };
Box::into_raw(Box::new(object_state_sink_params.clone()))
}