use std::{sync::{Arc, Mutex}, task::{Poll, Waker}};
use futures::Sink;
use super::{core::ObjectCore, remote::RemoteObjectExposeContractSharedMutable, ObjectExposeContractShared, ObjectState};
use crate::{tag::{remote::RemoteTagExposeContractSharedMutable, TagExposeContractShared}, DataBytes, ObjectDescriptor};
/// Parameters yielded by the [`ObjectStateSink`] stream.
///
/// The struct currently has no fields, so every value compares equal to every
/// other value. `Debug` is derived so the value can be logged and used in
/// `assert_eq!`-style checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectStateSinkParams {}
/// Handle used to publish the state of an object and to observe the sink's
/// [`ObjectStateSinkParams`] as a stream.
///
/// Cloning the handle clones the inner `Arc`, so all clones share the same
/// [`ObjectStateSinkDropGuard`]; the guard's `Drop` implementation runs once the
/// last reference to it is released.
#[derive(Clone)]
pub struct ObjectStateSink {
/// Shared guard holding the object core, the sink id and the mutable state.
pub(crate) drop_guard: Arc<ObjectStateSinkDropGuard>,
}
/// State shared by every clone of an [`ObjectStateSink`].
///
/// Its `Drop` implementation desynchronizes the sink from the object core and
/// notifies the expose contract that the sink is gone.
pub(crate) struct ObjectStateSinkDropGuard {
/// Core of the object this sink sets state on.
pub(crate) object_core: Arc<ObjectCore>,
/// Mutex-protected mutable state of the sink.
pub(crate) shared: Arc<Mutex<ObjectStateSinkSharedMutable>>,
/// Identifier taken from `object_core.object_state_sinks_sequence` in
/// `ObjectStateSink::new`; passed to `object_core.state_set` and
/// `object_core.desynchronize`.
id: usize,
}
/// Mutable part of an object state sink, kept behind a `Mutex`.
pub(crate) struct ObjectStateSinkSharedMutable {
/// Expose contract the sink was created for. Cleared by `contract_terminated`
/// and taken out when the drop guard is dropped.
contract: Option<ExposeContractMutable>,
/// Set to `true` when the `ObjectStateSinkDropGuard` is dropped.
drop_guard_dropped: bool,
/// Toggled by `assign` / `unassign`; starts as `true` in `ObjectStateSink::new`.
assigned: bool,
/// What was last handed out by `check_out_object_state_sink_params`:
/// `None` – nothing reported yet; `Some(Some(p))` – `p` was reported;
/// `Some(None)` – the end was reported, after which no params are reported again.
last_reported_params: Option<Option<ObjectStateSinkParams>>,
/// Wakers registered by pending `poll_next` calls; drained by `wake`.
waker: Vec<Waker>,
}
/// The expose contract an [`ObjectStateSink`] belongs to.
///
/// When the sink's drop guard is dropped, the held contract is notified through
/// its `object_state_sink_dropped` method.
pub(crate) enum ExposeContractMutable {
/// Shared state of an object expose contract; notified without arguments.
Object(Arc<ObjectExposeContractShared>),
/// Shared state of a tag expose contract; notified with the sink's object core.
Tag(Arc<TagExposeContractShared>),
/// Mutex-protected remote object contract state; locked before being notified.
RemoteObject(Arc<Mutex<RemoteObjectExposeContractSharedMutable>>),
/// Mutex-protected remote tag contract state; locked before being notified
/// with the sink's object core.
RemoteTag(Arc<Mutex<RemoteTagExposeContractSharedMutable>>),
}
impl ObjectStateSink {
pub(crate) fn new(object_core: Arc<ObjectCore>, contract: ExposeContractMutable) -> ObjectStateSink {
ObjectStateSink {
drop_guard: Arc::new(ObjectStateSinkDropGuard {
id: object_core.object_state_sinks_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
object_core,
shared: Arc::new(Mutex::new(ObjectStateSinkSharedMutable {
contract: Some(contract),
last_reported_params: None,
waker: Vec::new(),
drop_guard_dropped: false,
assigned: true,
})),
}),
}
}
pub fn descriptor(&self) -> &ObjectDescriptor {
&self.drop_guard.object_core.descriptor
}
pub fn current(&self) -> Option<ObjectStateSinkParams> {
let mut shared_mutable = self.drop_guard.shared.lock().unwrap();
shared_mutable.check_out_object_state_sink_params(&self.drop_guard.object_core).map(|(state, _changed)| state)
}
pub(crate) fn state_set(&self, object_state: ObjectState<DataBytes>) {
self.drop_guard.object_core.state_set(object_state, self.drop_guard.id);
self.drop_guard.object_core.local_exchange.object_changed(self.drop_guard.object_core.descriptor.clone());
}
pub fn desynchronize(&self) {
self.drop_guard.desynchronize();
}
}
impl ObjectStateSinkDropGuard {
    /// Calls `desynchronize` on the object core with this sink's id, then
    /// notifies the local exchange that the object changed.
    pub fn desynchronize(&self) {
        let core = &self.object_core;
        core.desynchronize(self.id);

        let descriptor = core.descriptor.clone();
        core.local_exchange.object_changed(descriptor);
    }
}
impl Drop for ObjectStateSinkDropGuard {
    /// Tears the sink down: desynchronizes it, marks the shared state as
    /// dropped, wakes pending stream pollers and finally notifies the expose
    /// contract (if one is still attached) that the sink is gone.
    fn drop(&mut self) {
        self.desynchronize();

        // Take the contract out while holding the lock, but release the lock
        // before calling into the contract.
        let released = {
            let mut state = self.shared.lock().unwrap();
            state.drop_guard_dropped = true;
            state.wake();
            state.contract.take()
        };

        if let Some(contract) = released {
            match contract {
                ExposeContractMutable::Object(object_contract) => {
                    object_contract.object_state_sink_dropped()
                }
                ExposeContractMutable::Tag(tag_contract) => {
                    tag_contract.object_state_sink_dropped(&self.object_core)
                }
                ExposeContractMutable::RemoteObject(remote) => {
                    remote.lock().unwrap().object_state_sink_dropped()
                }
                ExposeContractMutable::RemoteTag(remote) => {
                    remote.lock().unwrap().object_state_sink_dropped(&self.object_core)
                }
            }
        }
    }
}
impl ObjectStateSinkSharedMutable {
pub(crate) fn assign(&mut self) -> bool {
self.wake();
if self.last_reported_params == Some(None) {
false
} else {
assert!(!self.assigned);
self.assigned = true;
true
}
}
pub(crate) fn unassign(&mut self) -> bool {
self.wake();
if self.last_reported_params == Some(None) {
false
} else {
assert!(self.assigned);
self.assigned = false;
true
}
}
pub(crate) fn contract_terminated(&mut self) {
self.contract = None;
self.wake();
}
fn wake(&mut self) {
for waker in self.waker.drain(..) {
waker.wake();
}
}
fn check_out_object_state_sink_params(&mut self, _object_core: &Arc<ObjectCore>) -> Option<(ObjectStateSinkParams, bool)> {
if self.contract.is_none() || self.last_reported_params == Some(None) || !self.assigned {
self.last_reported_params = Some(None);
return None;
};
let object_state_sink_params = ObjectStateSinkParams {};
let changed = self.last_reported_params.is_none() || self.last_reported_params.as_ref().unwrap().as_ref() != Some(&object_state_sink_params);
if changed {
self.last_reported_params = Some(Some(object_state_sink_params));
}
Some((ObjectStateSinkParams {}, changed))
}
}
impl futures::Stream for ObjectStateSink {
type Item = ObjectStateSinkParams;
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
let mut shared_mutable = self.drop_guard.shared.lock().unwrap();
match shared_mutable.check_out_object_state_sink_params(&self.drop_guard.object_core) {
Some((object_state_sink_params, true)) => std::task::Poll::Ready(Some(object_state_sink_params)),
Some((_object_state_sink_params, false)) => {
shared_mutable.waker.push(cx.waker().clone());
std::task::Poll::Pending
}
None => std::task::Poll::Ready(None),
}
}
}
/// Sink accepting anything convertible into `ObjectState<DataBytes>`.
///
/// The sink is always ready, `start_send` forwards the converted item to
/// `state_set`, and flushing/closing do nothing. No method returns an error.
impl<T> Sink<T> for ObjectStateSink
where
    T: Into<ObjectState<DataBytes>>,
{
    type Error = ();

    fn poll_ready(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.state_set(item.into());
        Ok(())
    }

    fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}