// hakuban 0.7.2
//
// Data-object sharing library
// Documentation
use std::{sync::{Arc, Mutex, MutexGuard}, task::Waker};

use super::{core::ObjectCore, remote::RemoteObjectExposeContractSharedMutable, DataBytes, DataSynchronized, DataVersion, ObjectObserveContractShared, ObjectState};
use crate::{tag::{remote::RemoteTagExposeContractSharedMutable, TagObserveContractShared}, ObjectDescriptor};

/// A [Stream](futures_util::stream::Stream) emitting [ObjectState]s.
///
/// The stream ends when either the contract which emitted this ObjectStateStream gets terminated, or the hakuban network decides to drop the object.
/// The hakuban network only keeps alive those objects which have at least one non-terminated [ObjectObserveContract](crate::ObjectObserveContract) or [ObjectExposeContract](crate::ObjectExposeContract).
///
/// # Cloning
/// ObjectStateStream can be cloned but, as with contracts, cloning doesn't create an entirely separate entity.
///
/// Using multiple clones as streams at the same time will make the clones race for the [ObjectState]s.
#[derive(Clone)]
pub struct ObjectStateStream {
	// Shared by every clone of the stream; the guard's `Drop` implementation therefore runs only
	// once the last clone has gone away.
	pub(crate) drop_guard: Arc<ObjectStateStreamDropGuard>,
}

/// Cleanup handle held, inside an `Arc`, by an [ObjectStateStream] and all of its clones.
///
/// Its `Drop` implementation removes the stream from the object core, wakes any stored wakers
/// and tells the contract that the stream is gone.
pub(crate) struct ObjectStateStreamDropGuard {
	/// Core of the object whose state the stream reports.
	pub(crate) object_core: Arc<ObjectCore>,
	/// Mutable stream state; `link_to_object` hands a clone of this handle to the object core.
	pub(crate) shared: Arc<Mutex<ObjectStateStreamSharedMutable>>,
	/// Identifier drawn from the core's `object_state_streams_sequence` counter, used when the
	/// stream is added to and removed from the core.
	id: usize,
}

/// Mutable state of an [ObjectStateStream], kept behind a `Mutex`.
pub(crate) struct ObjectStateStreamSharedMutable {
	/// Contract the stream belongs to. Set at construction, cleared by `contract_terminated`
	/// and taken by the drop guard; once it is `None` the stream yields no further states.
	contract: Option<ObserveContractMutable>,
	/// Set to `true` by the drop guard's `Drop` implementation. Not read anywhere in this file.
	drop_guard_dropped: bool,
	/// `true` at construction; flipped by `initialize_activate` / `deinitialize_deactivate`.
	/// A stream that is checked while this is `false` ends.
	initialized_and_active: bool,
	/// Version bookkeeping:
	/// `None` means nothing has been reported yet,
	/// `Some(Some(version))` is the version most recently handed out,
	/// `Some(None)` marks the stream as ended.
	last_reported_version: Option<Option<DataVersion>>,
	/// Synchronization value most recently handed out (`Some(Some(..))`), or `None` while nothing
	/// has been reported yet.
	last_reported_synchronized: Option<Option<DataSynchronized>>,
	/// Wakers stored by polls that returned `Pending`; drained and woken by `wake`.
	waker: Vec<Waker>,
}


/// Handle to whichever kind of contract an [ObjectStateStream] was created for.
///
/// When the stream's drop guard is dropped, `object_state_stream_dropped` is called on the
/// contained value (after locking it, for the two `Remote*` variants).
pub(crate) enum ObserveContractMutable {
	/// Shared part of an object observe contract.
	Object(Arc<ObjectObserveContractShared>),
	/// Shared part of a tag observe contract.
	Tag(Arc<TagObserveContractShared>),
	/// Mutex-protected state of a remote object expose contract.
	RemoteObject(Arc<Mutex<RemoteObjectExposeContractSharedMutable>>),
	/// Mutex-protected state of a remote tag expose contract.
	RemoteTag(Arc<Mutex<RemoteTagExposeContractSharedMutable>>),
}

//LockCheck:  scope=ObjectStateStream
impl ObjectStateStream {
	/// Creates a stream for `object_core` that belongs to `contract`.
	///
	/// The stream starts out active with nothing reported yet, and receives the next id from the
	/// core's `object_state_streams_sequence` counter. The core is not told about the stream here;
	/// that happens in [`link_to_object`](Self::link_to_object).
	//LockCheck:
	pub(crate) fn new(object_core: Arc<ObjectCore>, contract: ObserveContractMutable) -> ObjectStateStream {
		let id = object_core.object_state_streams_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
		let initial = ObjectStateStreamSharedMutable {
			contract: Some(contract),
			drop_guard_dropped: false,
			initialized_and_active: true,
			last_reported_version: None,
			last_reported_synchronized: None,
			waker: Vec::new(),
		};
		let guard = ObjectStateStreamDropGuard { object_core, shared: Arc::new(Mutex::new(initial)), id };
		ObjectStateStream { drop_guard: Arc::new(guard) }
	}

	/// Passes this stream's id and a handle to its shared state to the object core.
	//LockCheck:  call=ObjectCore::add_object_state_stream
	pub(crate) fn link_to_object(&self) {
		let guard = &self.drop_guard;
		guard.object_core.add_object_state_stream(guard.id, guard.shared.clone());
	}

	/// Descriptor of the object this stream reports on.
	pub fn descriptor(&self) -> &ObjectDescriptor {
		&self.drop_guard.object_core.descriptor
	}

	/// Returns the object's current state, or `None` once the stream has ended.
	///
	/// This goes through the same bookkeeping as polling the stream: a state returned here is
	/// recorded as reported, so the stream will not yield it again unless its version or
	/// synchronization value changes.
	//LockCheck:  call=check_out_state  unlock=mutable
	pub fn current(&self) -> Option<ObjectState<DataBytes>> {
		let (state, _changed, _guard) = self.check_out_state(&self.drop_guard.object_core)?;
		Some(state)
	}

	/// Fetches the object's state and compares it with what this stream reported last.
	///
	/// Returns `None`, and permanently marks the stream as ended, when the contract is gone, the
	/// stream has already ended, or the stream is not active. Otherwise returns the state, a flag
	/// telling whether its version or synchronization value differs from the last reported ones
	/// (always `true` on the first call), and the still-held lock on the shared state.
	///
	/// # Panics
	/// Panics if the result of `ObjectCore::state_get` cannot be unwrapped, or if the shared-state
	/// mutex is poisoned.
	//LockCheck:  call=ObjectCore::state_get  lock=mutable
	fn check_out_state(&self, object_core: &Arc<ObjectCore>) -> Option<(ObjectState<DataBytes>, bool, MutexGuard<ObjectStateStreamSharedMutable>)> {
		// The object's state is read before this stream's own lock is taken.
		let state = object_core.state_get().unwrap();
		let mut guard = self.drop_guard.shared.lock().unwrap();

		let ended = guard.contract.is_none() || guard.last_reported_version == Some(None) || !guard.initialized_and_active;
		if ended {
			// `Some(None)` is the "stream has ended" marker; once set, every later call lands here.
			guard.last_reported_version = Some(None);
			return None;
		}

		// A missing record (nothing reported yet) flattens to `None` and so never equals the live value.
		let changed = guard.last_reported_version.as_ref().and_then(|version| version.as_ref()) != Some(&state.version)
			|| guard.last_reported_synchronized.as_ref().and_then(|synchronized| synchronized.as_ref()) != Some(&state.synchronized);
		if changed {
			guard.last_reported_version = Some(Some(state.version.clone()));
			guard.last_reported_synchronized = Some(Some(state.synchronized.clone()));
		}
		Some((state, changed, guard))
	}
}


//LockCheck:  scope=ObjectStateStream
impl Drop for ObjectStateStreamDropGuard {
	/// Tears the stream down: removes it from the object core, flags the shared state as dropped,
	/// wakes every stored waker, and finally informs the contract (if one is still attached).
	//LockCheck:  call=ObjectCore::remove_object_state_stream  lock=mutable  unlock=mutable  call=ObjectObserveContract::object_state_stream_dropped  call=TagObserveContract::object_state_stream_dropped  lock=RemoteObjectInterface::mutable call=RemoteObjectInterface::object_state_stream_dropped unlock=RemoteObjectInterface::mutable  lock=RemoteTagInterface::mutable  call=RemoteTagInterface::object_state_stream_dropped  unlock=RemoteTagInterface::mutable
	fn drop(&mut self) {
		self.object_core.remove_object_state_stream(self.id);

		// The stream's own lock is confined to this block, so it is already released when the
		// contract is notified below.
		let detached_contract = {
			let mut state = self.shared.lock().unwrap();
			state.drop_guard_dropped = true;
			state.wake();
			state.contract.take()
		};

		if let Some(contract) = detached_contract {
			match contract {
				ObserveContractMutable::Object(object_contract) => object_contract.object_state_stream_dropped(),
				ObserveContractMutable::Tag(tag_contract) => tag_contract.object_state_stream_dropped(&self.object_core),
				ObserveContractMutable::RemoteObject(remote_interface) => remote_interface.lock().unwrap().object_state_stream_dropped(),
				ObserveContractMutable::RemoteTag(remote_interface) => remote_interface.lock().unwrap().object_state_stream_dropped(&self.object_core),
			}
		}
	}
}



//LockCheck:  scope=ObjectStateStream
impl ObjectStateStreamSharedMutable {
	/// Wakes all stored wakers and, unless the stream has already ended, marks it active.
	///
	/// Returns `true` when the stream was switched to active, `false` when it had already ended.
	///
	/// # Panics
	/// Panics if the stream has not ended and is already active.
	//LockCheck:
	pub(crate) fn initialize_activate(&mut self) -> bool {
		self.wake();
		// `Some(None)` marks an ended stream; such a stream is never reactivated.
		if matches!(self.last_reported_version, Some(None)) {
			return false;
		}
		assert!(!self.initialized_and_active);
		self.initialized_and_active = true;
		true
	}

	/// Wakes all stored wakers and, unless the stream has already ended, marks it inactive.
	///
	/// Returns `true` when the stream was switched to inactive, `false` when it had already ended.
	///
	/// # Panics
	/// Panics if the stream has not ended and is not currently active.
	//LockCheck:
	pub(crate) fn deinitialize_deactivate(&mut self) -> bool {
		self.wake();
		if matches!(self.last_reported_version, Some(None)) {
			return false;
		}
		assert!(self.initialized_and_active);
		self.initialized_and_active = false;
		true
	}

	/// Signals that the object changed by waking all stored wakers.
	//LockCheck:
	pub fn object_changed(&mut self) {
		self.wake();
	}

	/// Detaches the contract and wakes all stored wakers.
	//LockCheck:
	pub(crate) fn contract_terminated(&mut self) {
		drop(self.contract.take());
		self.wake();
	}

	/// Empties the waker list, waking each stored waker once.
	fn wake(&mut self) {
		self.waker.drain(..).for_each(Waker::wake);
	}
}



//TODO: is it really?
// NOTE(review): the struct's only field is an `Arc`, and `Arc<T>` is `Unpin` for every `T`, so
// `ObjectStateStream` would be `Unpin` through the auto trait even without this explicit impl.
impl Unpin for ObjectStateStream {}


//LockCheck:  scope=ObjectStateStream
impl futures::Stream for ObjectStateStream {
	type Item = ObjectState<DataBytes>;

	/// Yields the object's state whenever its version or synchronization value differs from what
	/// this stream (or one of its clones) reported last.
	///
	/// * `Ready(Some(state))` - the state changed since the last report (or nothing was reported yet).
	/// * `Pending` - nothing new; the task's waker is stored and woken on the next change,
	///   activation change, contract termination, or when the stream is dropped.
	/// * `Ready(None)` - the stream has ended.
	//LockCheck:  call=check_out_state  unlock=mutable
	fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
		match self.check_out_state(&self.drop_guard.object_core) {
			Some((state, true, _shared_mutable)) => std::task::Poll::Ready(Some(state)),
			Some((_state, false, mut shared_mutable)) => {
				// A task may poll repeatedly without any state change in between (e.g. from a
				// `select!` loop). Storing a fresh clone on every such poll would let the list grow
				// without bound and cause redundant wake-ups, so skip wakers that an already
				// stored one is known to cover.
				let current = cx.waker();
				let already_registered = shared_mutable.waker.iter().any(|registered| registered.will_wake(current));
				if !already_registered {
					shared_mutable.waker.push(current.clone());
				}
				std::task::Poll::Pending
			}
			None => std::task::Poll::Ready(None),
		}
	}
}