use std::{sync::{Arc, Mutex, MutexGuard}, task::Waker};
use super::{core::ObjectCore, remote::RemoteObjectExposeContractSharedMutable, DataBytes, DataSynchronized, DataVersion, ObjectObserveContractShared, ObjectState};
use crate::{tag::{remote::RemoteTagExposeContractSharedMutable, TagObserveContractShared}, ObjectDescriptor};
#[derive(Clone)]
pub struct ObjectStateStream {
pub(crate) drop_guard: Arc<ObjectStateStreamDropGuard>,
}
pub(crate) struct ObjectStateStreamDropGuard {
pub(crate) object_core: Arc<ObjectCore>,
pub(crate) shared: Arc<Mutex<ObjectStateStreamSharedMutable>>,
id: usize,
}
pub(crate) struct ObjectStateStreamSharedMutable {
contract: Option<ObserveContractMutable>,
drop_guard_dropped: bool,
initialized_and_active: bool,
last_reported_version: Option<Option<DataVersion>>,
last_reported_synchronized: Option<Option<DataSynchronized>>,
waker: Vec<Waker>,
}
pub(crate) enum ObserveContractMutable {
Object(Arc<ObjectObserveContractShared>),
Tag(Arc<TagObserveContractShared>),
RemoteObject(Arc<Mutex<RemoteObjectExposeContractSharedMutable>>),
RemoteTag(Arc<Mutex<RemoteTagExposeContractSharedMutable>>),
}
impl ObjectStateStream {
pub(crate) fn new(object_core: Arc<ObjectCore>, contract: ObserveContractMutable) -> ObjectStateStream {
let id = object_core.object_state_streams_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let shared = Arc::new(Mutex::new(ObjectStateStreamSharedMutable {
contract: Some(contract),
waker: Vec::new(),
drop_guard_dropped: false,
initialized_and_active: true,
last_reported_version: None,
last_reported_synchronized: None,
}));
ObjectStateStream { drop_guard: Arc::new(ObjectStateStreamDropGuard { id, shared, object_core }) }
}
pub(crate) fn link_to_object(&self) {
self.drop_guard.object_core.add_object_state_stream(self.drop_guard.id, self.drop_guard.shared.clone());
}
pub fn descriptor(&self) -> &ObjectDescriptor {
&self.drop_guard.object_core.descriptor
}
pub fn current(&self) -> Option<ObjectState<DataBytes>> {
self.check_out_state(&self.drop_guard.object_core).map(|(state, _changed, _guard)| state)
}
fn check_out_state(&self, object_core: &Arc<ObjectCore>) -> Option<(ObjectState<DataBytes>, bool, MutexGuard<ObjectStateStreamSharedMutable>)> {
let state = object_core.state_get().unwrap();
let mut shared_mutable = self.drop_guard.shared.lock().unwrap();
if shared_mutable.contract.is_none() || shared_mutable.last_reported_version == Some(None) || !shared_mutable.initialized_and_active {
shared_mutable.last_reported_version = Some(None);
return None;
};
let changed = shared_mutable.last_reported_version.is_none()
|| shared_mutable.last_reported_version.as_ref().unwrap().as_ref() != Some(&state.version)
|| shared_mutable.last_reported_synchronized.is_none()
|| shared_mutable.last_reported_synchronized.as_ref().unwrap().as_ref() != Some(&state.synchronized);
if changed {
shared_mutable.last_reported_version = Some(Some(state.version.clone()));
shared_mutable.last_reported_synchronized = Some(Some(state.synchronized.clone()));
}
Some((state, changed, shared_mutable))
}
}
impl Drop for ObjectStateStreamDropGuard {
fn drop(&mut self) {
self.object_core.remove_object_state_stream(self.id);
let contract = {
let mut shared_mutable = self.shared.lock().unwrap();
shared_mutable.drop_guard_dropped = true;
shared_mutable.wake();
shared_mutable.contract.take()
};
match contract {
Some(ObserveContractMutable::Object(object_contract)) => object_contract.object_state_stream_dropped(),
Some(ObserveContractMutable::Tag(tag_contract)) => tag_contract.object_state_stream_dropped(&self.object_core),
Some(ObserveContractMutable::RemoteObject(remote_interface)) => remote_interface.lock().unwrap().object_state_stream_dropped(),
Some(ObserveContractMutable::RemoteTag(remote_interface)) => remote_interface.lock().unwrap().object_state_stream_dropped(&self.object_core),
None => (),
}
}
}
impl ObjectStateStreamSharedMutable {
pub(crate) fn initialize_activate(&mut self) -> bool {
self.wake();
if self.last_reported_version == Some(None) {
false
} else {
assert!(!self.initialized_and_active);
self.initialized_and_active = true;
true
}
}
pub(crate) fn deinitialize_deactivate(&mut self) -> bool {
self.wake();
if self.last_reported_version == Some(None) {
false
} else {
assert!(self.initialized_and_active);
self.initialized_and_active = false;
true
}
}
pub fn object_changed(&mut self) {
self.wake();
}
pub(crate) fn contract_terminated(&mut self) {
self.contract = None;
self.wake();
}
fn wake(&mut self) {
for waker in self.waker.drain(..) {
waker.wake();
}
}
}
impl Unpin for ObjectStateStream {}
impl futures::Stream for ObjectStateStream {
type Item = ObjectState<DataBytes>;
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
match self.check_out_state(&self.drop_guard.object_core) {
Some((state, true, _shared_mutable)) => std::task::Poll::Ready(Some(state)),
Some((_state, false, mut shared_mutable)) => {
shared_mutable.waker.push(cx.waker().clone());
std::task::Poll::Pending
}
None => std::task::Poll::Ready(None),
}
}
}