use std::{sync::{Arc, Mutex}, task::Waker};
use super::{core::ObjectCore, ObjectStateStream, ObjectStateStreamSharedMutable, ObserveContractMutable};
use crate::{contract::Contract, exchange::core::ExchangeCore, observe_contract::ObserveContract, LocalExchange, ObjectDescriptor};
#[derive(Clone)]
pub struct ObjectObserveContract {
drop_guard: Arc<ObjectObserveContractDropGuard>,
}
struct ObjectObserveContractDropGuard {
shared: Arc<ObjectObserveContractShared>,
}
pub(crate) struct ObjectObserveContractShared {
exchange: LocalExchange,
object_core: Arc<ObjectCore>,
mutable: Mutex<ObjectObserveContractSharedMutable>,
}
struct ObjectObserveContractSharedMutable {
terminated: bool,
object_state_stream_ready_for_check_out: bool,
object_state_stream: Option<Arc<Mutex<ObjectStateStreamSharedMutable>>>,
initialized_and_active: bool,
waker: Vec<Waker>,
}
impl ObjectObserveContract {
pub(super) fn new(exchange: LocalExchange, object_core: Arc<ObjectCore>) -> ObjectObserveContract {
let shared = Arc::new(ObjectObserveContractShared {
exchange,
object_core,
mutable: Mutex::new(ObjectObserveContractSharedMutable {
terminated: false,
waker: Vec::new(),
object_state_stream_ready_for_check_out: false,
object_state_stream: None,
initialized_and_active: false,
}),
});
shared.object_core.link_observe_contract(shared.clone());
ObjectObserveContract { drop_guard: Arc::new(ObjectObserveContractDropGuard { shared }) }
}
pub fn descriptor(&self) -> &ObjectDescriptor {
&self.drop_guard.shared.object_core.descriptor
}
pub fn ready(&mut self) -> Option<ObjectStateStream> {
let mut shared_mutable = self.drop_guard.shared.mutable.lock().unwrap();
let ret = shared_mutable.check_out_object_state_stream(&self.drop_guard.shared).flatten();
drop(shared_mutable);
if let Some(stream) = &ret {
stream.link_to_object();
}
ret
}
pub fn terminate(&mut self) {
self.drop_guard.shared.terminate();
}
}
impl Drop for ObjectObserveContractDropGuard {
fn drop(&mut self) {
self.shared.contract_dropped();
}
}
impl ObjectObserveContractShared {
fn contract_dropped(self: &Arc<Self>) {
self.terminate();
assert_eq!(Arc::strong_count(self), 1);
}
pub(crate) fn object_state_stream_dropped(self: &Arc<Self>) {
let mut mutable = self.mutable.lock().unwrap();
mutable.object_state_stream.take().unwrap();
if mutable.initialized_and_active {
mutable.object_state_stream_ready_for_check_out = true;
};
mutable.wake();
}
fn terminate(self: &Arc<Self>) {
let mut mutable = self.mutable.lock().unwrap();
if !mutable.terminated {
mutable.terminate();
drop(mutable);
let dyn_self: Arc<dyn ObserveContract> = self.clone();
self.object_core.unlink_observe_contract(&dyn_self);
self.exchange.shared.object_changed(self.object_core.descriptor.clone());
drop(dyn_self);
}
}
}
impl Contract for ObjectObserveContractShared {
fn id(&self) -> usize {
(self as *const Self) as usize
}
fn exchange_core(&self) -> &Arc<ExchangeCore> {
self.exchange.shared.exchange_core.as_ref().unwrap()
}
}
impl ObjectObserveContractSharedMutable {
fn wake(&mut self) {
for waker in self.waker.drain(..) {
waker.wake();
}
}
fn check_out_object_state_stream(&mut self, shared: &Arc<ObjectObserveContractShared>) -> Option<Option<ObjectStateStream>> {
if self.terminated {
Some(None)
} else if self.object_state_stream_ready_for_check_out {
self.object_state_stream_ready_for_check_out = false;
let object_state_stream = ObjectStateStream::new(shared.object_core.clone(), ObserveContractMutable::Object(shared.clone()));
self.object_state_stream = Some(object_state_stream.drop_guard.shared.clone());
Some(Some(object_state_stream))
} else {
None
}
}
fn terminate(&mut self) {
if !self.terminated {
self.terminated = true;
self.wake();
if let Some(object_state_stream) = self.object_state_stream.take() {
object_state_stream.lock().unwrap().contract_terminated();
};
}
}
}
impl ObserveContract for ObjectObserveContractShared {
fn initialize_activate(&self, _object_core: &Arc<ObjectCore>) {
let mut mutable = self.mutable.lock().unwrap();
if mutable.initialized_and_active {
panic!("Double activate");
};
mutable.initialized_and_active = true;
if let Some(_object_state_stream) = &mutable.object_state_stream {
panic!();
} else {
mutable.object_state_stream_ready_for_check_out = true;
};
mutable.wake();
}
fn deinitialize_deactivate(&self, _object_core: &Arc<ObjectCore>) {
unreachable!();
}
fn object(&self) -> bool {
true
}
}
impl Unpin for ObjectObserveContract {}
impl futures::Stream for ObjectObserveContract {
type Item = ObjectStateStream;
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
let mut shared_mutable = self.drop_guard.shared.mutable.lock().unwrap();
match shared_mutable.check_out_object_state_stream(&self.drop_guard.shared) {
Some(object_state_stream) => {
drop(shared_mutable);
if let Some(stream) = &object_state_stream {
stream.link_to_object();
}
std::task::Poll::Ready(object_state_stream)
}
None => {
shared_mutable.waker.push(cx.waker().clone());
std::task::Poll::Pending
}
}
}
}
impl std::fmt::Debug for ObjectObserveContractShared {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str(&format!("ObjectObserveContract:{}", self.object_core.descriptor))
}
}