use std::{
hash::Hash,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
},
};
use futures::task::AtomicWaker;
use super::{core::Object, DataBytes, DataSynchronized, DataVersion, ObjectObserveContractId, ObjectState};
#[cfg(feature = "downstream")]
use crate::connection::downstream::DownstreamConnectionId;
use crate::{connection::upstream::UpstreamConnectionId, tag::TagObserveContractId, ObjectDescriptor};
/// Numeric identifier of a state stream.
///
/// The same id is stored in both halves created by `ObjectStateStream::new`
/// (the stream and its inlet) and is handed to the owning object when the
/// stream is dropped.
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
pub(crate) struct StateStreamId(pub u64);
/// Identifies the entity on whose behalf an `ObjectStateStream` was created.
///
/// The owner is stored in the stream and passed by reference to
/// `Object::state_stream_dropped` when the stream is dropped.
#[derive(Clone)]
pub(crate) enum ObjectStateStreamOwner {
/// Stream owned by an object observe contract with the given id.
ObjectObserveContract(ObjectObserveContractId),
/// Stream owned by a tag observe contract with the given id.
TagObserveContract(TagObserveContractId),
/// Stream owned by an upstream connection with the given id.
UpstreamConnection(UpstreamConnectionId),
/// Stream owned by a downstream connection with the given id
/// (only available with the `downstream` feature).
#[cfg(feature = "downstream")]
DownstreamConnection(DownstreamConnectionId),
}
/// Consumer half of an object state stream.
///
/// Implements `futures::Stream`, yielding the object's state each time its
/// version or synchronized marker differs from the one last yielded. The
/// producer half is `ObjectStateStreamInlet`, which can wake or close the stream.
pub struct ObjectStateStream {
// Identifier shared with the paired inlet; reported to `object_core` on drop.
id: StateStreamId,
// Who requested this stream; reported to `object_core` on drop.
owner: ObjectStateStreamOwner,
// Object whose state is polled and which is notified when the stream is dropped.
pub(crate) object_core: Arc<Object>,
// Version of the last yielded state. `None` until the first item is yielded;
// the code in this file only ever stores `None` or `Some(Some(..))` here.
last_reported_version: Option<Option<DataVersion>>,
// Synchronized marker of the last yielded state; same conventions as
// `last_reported_version`.
last_reported_synchronized: Option<Option<DataSynchronized>>,
// Close flag and waker shared with the paired inlet.
shared: Arc<ObjectStateStreamShared>,
// When `true` (the default), `Drop` calls `Object::state_stream_dropped`.
notify_object_on_drop: bool,
}
/// Producer half of an object state stream.
///
/// Holds the same id and shared state as the paired `ObjectStateStream` and is
/// used to wake the task polling the stream or to mark the stream as closed.
pub(crate) struct ObjectStateStreamInlet {
// Identifier shared with the paired stream.
id: StateStreamId,
// Close flag and waker shared with the paired stream.
shared: Arc<ObjectStateStreamShared>,
}
/// State shared between an `ObjectStateStream` and its `ObjectStateStreamInlet`.
struct ObjectStateStreamShared {
// Set to `true` by the inlet's `close`; once observed, the stream yields `None`.
closed: AtomicBool,
// Waker of the task that last polled the stream; woken by the inlet.
waker: AtomicWaker,
}
impl ObjectStateStream {
    /// Creates a connected stream/inlet pair for `object_core`.
    ///
    /// Both halves carry the same `id` and point at the same shared state
    /// (close flag + waker). The stream starts open, has reported nothing yet,
    /// and will notify `object_core` when it is dropped.
    pub(crate) fn new(object_core: Arc<Object>, id: StateStreamId, owner: ObjectStateStreamOwner) -> (ObjectStateStream, ObjectStateStreamInlet) {
        let inlet_shared = Arc::new(ObjectStateStreamShared {
            waker: AtomicWaker::new(),
            closed: AtomicBool::new(false),
        });
        let stream = Self {
            id,
            owner,
            object_core,
            last_reported_version: None,
            last_reported_synchronized: None,
            shared: Arc::clone(&inlet_shared),
            notify_object_on_drop: true,
        };
        let inlet = ObjectStateStreamInlet { id, shared: inlet_shared };
        (stream, inlet)
    }

    /// Consumes the stream without calling `Object::state_stream_dropped`.
    #[cfg(feature = "downstream")]
    pub(crate) fn drop_without_notification(mut self) {
        // Clear the flag first; `self` is then dropped at the end of this
        // scope and the `Drop` impl sees the cleared flag.
        self.notify_object_on_drop = false;
    }

    /// Returns the descriptor of the object this stream observes.
    pub fn descriptor(&self) -> &ObjectDescriptor {
        let core: &Object = &self.object_core;
        &core.descriptor
    }
}
impl Drop for ObjectStateStream {
    /// Reports the stream's id and owner to the observed object via
    /// `Object::state_stream_dropped`, unless notification has been disabled
    /// through `notify_object_on_drop`.
    fn drop(&mut self) {
        if !self.notify_object_on_drop {
            return;
        }
        let stream_id = self.id;
        self.object_core.state_stream_dropped(stream_id, &self.owner);
    }
}
impl ObjectStateStreamInlet {
pub(crate) fn id(&self) -> StateStreamId {
self.id
}
pub(crate) fn wake(&self) {
self.shared.waker.wake();
}
pub(crate) fn close(&self) {
self.shared.closed.store(true, Relaxed);
self.shared.waker.wake();
}
}
impl futures::Stream for ObjectStateStream {
    type Item = ObjectState<DataBytes>;

    /// Polls for the next object state to report.
    ///
    /// Returns:
    /// - `Ready(None)` once the paired inlet has closed the stream;
    /// - `Pending` while the object has no state, or while the current state
    ///   has the same version and synchronized marker as the last yielded one;
    /// - `Ready(Some(state))` otherwise, recording the yielded state's version
    ///   and synchronized marker for the next comparison.
    fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
        // Register the waker before inspecting any state, as recommended by
        // the `AtomicWaker` documentation, so a `wake`/`close` that happens
        // after the checks below still reschedules this task.
        self.shared.waker.register(cx.waker());

        if self.shared.closed.load(Relaxed) {
            return std::task::Poll::Ready(None);
        }

        let Some(state) = self.object_core.state_get() else {
            return std::task::Poll::Pending;
        };

        // Nothing new to report only if a previous report exists and both the
        // version and the synchronized marker match it. Pattern matching
        // replaces the former `is_none()` + `unwrap()` combination, so there is
        // no panic path here.
        let unchanged = matches!(&self.last_reported_version, Some(Some(version)) if *version == state.version)
            && matches!(&self.last_reported_synchronized, Some(Some(synchronized)) if *synchronized == state.synchronized);
        if unchanged {
            return std::task::Poll::Pending;
        }

        self.last_reported_version = Some(Some(state.version.clone()));
        self.last_reported_synchronized = Some(Some(state.synchronized.clone()));
        std::task::Poll::Ready(Some(state))
    }
}