use std::{
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
},
task::Poll,
};
use futures::{task::AtomicWaker, Sink};
use super::{core::Object, expose_contract::ObjectExposeContractId, state::ObjectState};
#[cfg(feature = "downstream")]
use crate::connection::downstream::DownstreamConnectionId;
use crate::{connection::upstream::UpstreamConnectionId, tag::TagExposeContractId, DataBytes, ObjectDescriptor};
#[derive(PartialEq, Eq, Hash, Copy, Clone, Ord, PartialOrd)]
pub(crate) struct StateSinkId(pub u64);
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub(crate) enum ObjectStateSinkOwner {
ObjectExposeContract(ObjectExposeContractId),
TagExposeContract(TagExposeContractId),
UpstreamConnection(UpstreamConnectionId),
#[cfg(feature = "downstream")]
DownstreamConnection(DownstreamConnectionId),
}
#[derive(Eq, PartialEq, Clone)]
pub struct ObjectStateSinkParams {}
pub struct ObjectStateSink {
pub(crate) id: StateSinkId,
owner: ObjectStateSinkOwner,
pub(crate) object_core: Arc<Object>,
last_reported_params: Option<Option<ObjectStateSinkParams>>,
shared: Arc<ObjectStateSinkShared>,
notify_object_on_drop: bool,
}
pub(crate) struct ObjectStateSinkInlet {
id: StateSinkId,
shared: Arc<ObjectStateSinkShared>,
}
struct ObjectStateSinkShared {
closed: AtomicBool,
waker: AtomicWaker,
}
impl ObjectStateSink {
pub(crate) fn new(object_core: Arc<Object>, id: StateSinkId, owner: ObjectStateSinkOwner) -> (ObjectStateSink, ObjectStateSinkInlet) {
let shared = Arc::new(ObjectStateSinkShared { closed: AtomicBool::new(false), waker: AtomicWaker::new() });
(ObjectStateSink { id, owner, object_core, shared: shared.clone(), last_reported_params: None, notify_object_on_drop: true }, ObjectStateSinkInlet {
id,
shared,
})
}
#[cfg(feature = "downstream")]
pub(crate) fn drop_without_notification(mut self) {
self.notify_object_on_drop = false;
drop(self);
}
pub fn descriptor(&self) -> &ObjectDescriptor {
&self.object_core.descriptor
}
}
impl Drop for ObjectStateSink {
fn drop(&mut self) {
if self.notify_object_on_drop {
self.object_core.desynchronize(self.id);
self.object_core.state_sink_dropped(self.id, &self.owner);
}
}
}
impl ObjectStateSinkInlet {
pub(crate) fn id(&self) -> StateSinkId {
self.id
}
pub(crate) fn close(&self) {
self.shared.closed.store(true, Relaxed);
self.shared.waker.wake();
}
}
impl futures::Stream for ObjectStateSink {
type Item = ObjectStateSinkParams;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
fn return_candidate(this: &mut std::pin::Pin<&mut ObjectStateSink>) -> std::task::Poll<Option<ObjectStateSinkParams>> {
if this.shared.closed.load(Relaxed) {
return std::task::Poll::Ready(None);
};
let object_state_sink_params = ObjectStateSinkParams {};
let changed = this.last_reported_params.is_none() || this.last_reported_params.as_ref().unwrap().as_ref() != Some(&object_state_sink_params);
if changed {
this.last_reported_params = Some(Some(object_state_sink_params.clone()));
std::task::Poll::Ready(Some(object_state_sink_params))
} else {
std::task::Poll::Pending
}
}
if let Poll::Ready(value) = return_candidate(&mut self) {
Poll::Ready(value)
} else {
self.shared.waker.register(cx.waker());
return_candidate(&mut self)
}
}
}
impl<T: Into<ObjectState<DataBytes>>> Sink<T> for ObjectStateSink {
type Error = ();
fn poll_ready(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let object_state = item.into();
self.object_core.state_set(object_state, self.id);
Ok(())
}
fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}