hakuban 0.7.2

Data-object sharing library
Documentation
use std::{sync::{Arc, Mutex}, task::Waker};

use super::{core::ObjectCore, ObjectStateStream, ObjectStateStreamSharedMutable, ObserveContractMutable};
use crate::{contract::Contract, exchange::core::ExchangeCore, observe_contract::ObserveContract, LocalExchange, ObjectDescriptor};

//TODO: flatten into 2 structs (or one, Arc-ed)

/// Represents a wish, a __contract__ to observe an object
///
/// ObjectObserveContract is a [Stream](futures_util::stream::Stream), emitting [ObjectStateStream] objects.
///
/// ObjectStateStream will only get emitted if/when [LocalExchange] gets to know at least one [ObjectState](crate::ObjectState) of the object the contract pertains to.
/// ObjectObserveContract will not emit more than one [ObjectStateStream] at a time. New [ObjectStateStream] will get emitted only when previous one gets dropped.
///
/// The [Stream](futures_util::stream::Stream) will end when contract gets [ObjectObserveContract::terminate]d.
///
/// # Cloning
/// ObjectObserveContract can be cloned, but cloning does not create a new contract. Think of it as creating a paper copy of an on-paper contract.
/// It's a new sheet of paper, but not a new deal/agreement. Clones are basically references to a logical agreement.
///
/// Accordingly, terminating any of the clones (or the original), renders all of them terminated.
/// Using multiple clones as streams at the same time will have the clones race for the [ObjectStateStream](crate::ObjectStateStream)s.
#[derive(Clone)]
pub struct ObjectObserveContract {
	// Shared by every clone of this handle; the guard's Drop impl runs once the last clone is gone.
	drop_guard: Arc<ObjectObserveContractDropGuard>,
}

// Sits behind the Arc held by every ObjectObserveContract clone.
// Its Drop impl calls `contract_dropped` on the shared state.
struct ObjectObserveContractDropGuard {
	shared: Arc<ObjectObserveContractShared>,
}

// State of a single object-observe contract.
// A clone of the Arc holding it is passed to `ObjectCore::link_observe_contract` on creation
// and to every ObjectStateStream created by `check_out_object_state_stream`.
pub(crate) struct ObjectObserveContractShared {
	// Exchange whose `object_changed` is called when the contract terminates.
	exchange: LocalExchange,
	// Core of the observed object; source of the descriptor.
	object_core: Arc<ObjectCore>,
	// Everything that changes during the contract's life, behind a single lock.
	mutable: Mutex<ObjectObserveContractSharedMutable>,
}

// Lock-protected part of ObjectObserveContractShared.
struct ObjectObserveContractSharedMutable {
	// Set once by `terminate`; afterwards check-out reports the end of the stream.
	terminated: bool,
	// True while a new ObjectStateStream may be handed out; cleared on check-out.
	object_state_stream_ready_for_check_out: bool,
	// Shared state of the currently checked-out ObjectStateStream, if any.
	object_state_stream: Option<Arc<Mutex<ObjectStateStreamSharedMutable>>>,
	// Set by `initialize_activate`; never reset within this file.
	initialized_and_active: bool,
	// Wakers of tasks whose poll returned Pending; drained and woken by `wake`.
	waker: Vec<Waker>,
}


//LockCheck:  scope=ObjectObserveContract
impl ObjectObserveContract {
	/// Creates the contract for `object_core` and links its shared state to that core.
	//LockCheck:  call=ObjectCore::link_observe_contract
	pub(super) fn new(exchange: LocalExchange, object_core: Arc<ObjectCore>) -> ObjectObserveContract {
		// A fresh contract is neither terminated nor activated, and has no stream on offer.
		let initial_state = ObjectObserveContractSharedMutable {
			terminated: false,
			object_state_stream_ready_for_check_out: false,
			object_state_stream: None,
			initialized_and_active: false,
			waker: Vec::new(),
		};
		let shared = Arc::new(ObjectObserveContractShared { exchange, object_core, mutable: Mutex::new(initial_state) });
		shared.object_core.link_observe_contract(shared.clone());
		let drop_guard = Arc::new(ObjectObserveContractDropGuard { shared });
		ObjectObserveContract { drop_guard }
	}

	/// Descriptor of the object this contract pertains to.
	pub fn descriptor(&self) -> &ObjectDescriptor {
		let core = &self.drop_guard.shared.object_core;
		&core.descriptor
	}

	/// Non-waiting counterpart of polling the stream.
	///
	/// Returns a new [ObjectStateStream] if one can be checked out right now.
	/// Returns `None` both when nothing is available yet and when the contract has been terminated.
	//LockCheck:  lock=mutable  call=ObjectObserveContractMutable::check_out_object_state_stream  unlock=mutable  call=ObjectStateStream::link_to_object
	pub fn ready(&mut self) -> Option<ObjectStateStream> {
		let shared = &self.drop_guard.shared;
		// The lock is held only for the check-out itself; the guard is gone before linking.
		let checked_out = {
			let mut state = shared.mutable.lock().unwrap();
			state.check_out_object_state_stream(shared).flatten()
		};
		if let Some(stream) = checked_out.as_ref() {
			stream.link_to_object();
		}
		checked_out
	}

	/// Terminates the contract. Affects all clones, since they share one state.
	pub fn terminate(&mut self) {
		let shared = &self.drop_guard.shared;
		shared.terminate();
	}
}

//LockCheck:  scope=ObjectObserveContract
// Runs when the last ObjectObserveContract clone releases the guard.
impl Drop for ObjectObserveContractDropGuard {
	//LockCheck:  call=contract_dropped
	fn drop(&mut self) {
		// Delegates to the shared state, which terminates the contract and then
		// asserts that no other strong reference to it remains.
		self.shared.contract_dropped();
	}
}

//LockCheck:  scope=ObjectObserveContract
impl ObjectObserveContractShared {
	// Called from the drop guard. Terminates the contract, then requires that
	// the caller holds the only remaining strong reference to the shared state.
	//LockCheck:  call=terminate
	fn contract_dropped(self: &Arc<Self>) {
		self.terminate();
		let remaining_owners = Arc::strong_count(self);
		assert_eq!(remaining_owners, 1);
	}

	// Bookkeeping for a checked-out stream going away.
	// Panics if no checked-out stream is recorded.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn object_state_stream_dropped(self: &Arc<Self>) {
		let mut state = self.mutable.lock().unwrap();
		drop(state.object_state_stream.take().unwrap());
		// Only an activated contract offers a replacement stream.
		let may_offer_again = state.initialized_and_active;
		state.object_state_stream_ready_for_check_out |= may_offer_again;
		state.wake();
	}

	// Terminates the contract once; later calls do nothing.
	//LockCheck:  lock=mutable  call=ObjectObserveContractMutable::terminate  unlock=mutable  call=ObjectCore::unlink_observe_contract  call=LocalExchange::object_changed
	fn terminate(self: &Arc<Self>) {
		let mut state = self.mutable.lock().unwrap();
		if state.terminated {
			return;
		}
		state.terminate();
		// Release our own lock before calling out to the object core and the exchange.
		drop(state);
		let as_observe_contract: Arc<dyn ObserveContract> = self.clone();
		self.object_core.unlink_observe_contract(&as_observe_contract);
		self.exchange.shared.object_changed(self.object_core.descriptor.clone());
	}
}


impl Contract for ObjectObserveContractShared {
	// The address of the shared state serves as the contract's identifier.
	fn id(&self) -> usize {
		let address: *const Self = self;
		address as usize
	}

	// Panics (via `unwrap`) when the exchange carries no exchange core.
	fn exchange_core(&self) -> &Arc<ExchangeCore> {
		let exchange_core = self.exchange.shared.exchange_core.as_ref();
		exchange_core.unwrap()
	}
}



//LockCheck:  scope=ObjectObserveContractMutable
impl ObjectObserveContractSharedMutable {
	fn wake(&mut self) {
		for waker in self.waker.drain(..) {
			waker.wake();
		}
	}

	//LockCheck:  call=ObjectStateStream::new
	fn check_out_object_state_stream(&mut self, shared: &Arc<ObjectObserveContractShared>) -> Option<Option<ObjectStateStream>> {
		if self.terminated {
			Some(None)
		} else if self.object_state_stream_ready_for_check_out {
			self.object_state_stream_ready_for_check_out = false;
			let object_state_stream = ObjectStateStream::new(shared.object_core.clone(), ObserveContractMutable::Object(shared.clone()));
			self.object_state_stream = Some(object_state_stream.drop_guard.shared.clone());
			Some(Some(object_state_stream))
		} else {
			None
		}
	}

	//LockCheck:  lock=ObjectStateStream::mutable  call=ObjectStateStream::contract_terminated  unlock=ObjectStateStream::mutable
	fn terminate(&mut self) {
		if !self.terminated {
			self.terminated = true;
			self.wake();
			if let Some(object_state_stream) = self.object_state_stream.take() {
				object_state_stream.lock().unwrap().contract_terminated();
			};
		}
	}
}

//LockCheck:  scope=ObjectObserveContract
impl ObserveContract for ObjectObserveContractShared {
	// Activates the contract: from now on a stream may be checked out, and
	// tasks already waiting for one are woken.
	// Panics on a second activation, or if a stream is already checked out.
	//LockCheck:  lock=mutable  unlock=mutable
	fn initialize_activate(&self, _object_core: &Arc<ObjectCore>) {
		let mut state = self.mutable.lock().unwrap();

		if state.initialized_and_active {
			panic!("Double activate");
		}
		state.initialized_and_active = true;

		// A stream recorded as checked out at this point is treated as a fatal error.
		if state.object_state_stream.is_some() {
			panic!();
		}
		state.object_state_stream_ready_for_check_out = true;
		state.wake();
	}

	// Never expected to be called for this contract type.
	//LockCheck:
	fn deinitialize_deactivate(&self, _object_core: &Arc<ObjectCore>) {
		unreachable!();
	}

	// Always true for this contract type.
	fn object(&self) -> bool {
		true
	}
}



//TODO: is it really?
// NOTE(review): the struct's only field is an Arc, and Arc<T> is Unpin, so the auto trait
// should already apply and this explicit impl looks redundant — confirm before removing.
impl Unpin for ObjectObserveContract {}


//LockCheck:  scope=ObjectObserveContract
impl futures::Stream for ObjectObserveContract {
	type Item = ObjectStateStream;

	/// Yields the next [ObjectStateStream] once one can be checked out.
	///
	/// * `Ready(Some(stream))` - a stream was checked out; it is linked to its object after the contract lock is released.
	/// * `Ready(None)` - the contract has been terminated, the stream of streams ends.
	/// * `Pending` - nothing to hand out yet; the task's waker is registered for a later wake-up.
	//LockCheck:  lock=mutable  call=ObjectObserveContractMutable::check_out_object_state_stream  unlock=mutable  call=ObjectStateStream::link_to_object
	fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
		let mut shared_mutable = self.drop_guard.shared.mutable.lock().unwrap();
		match shared_mutable.check_out_object_state_stream(&self.drop_guard.shared) {
			Some(object_state_stream) => {
				drop(shared_mutable);
				if let Some(stream) = &object_state_stream {
					stream.link_to_object();
				}
				std::task::Poll::Ready(object_state_stream)
			}
			None => {
				// Register the waker only if no stored waker already wakes this task.
				// Without this check every repeated Pending poll adds another clone, so the list
				// grows without bound and a single event triggers redundant wake-ups.
				// Clones polled from other tasks still get their own entry.
				let current = cx.waker();
				if !shared_mutable.waker.iter().any(|registered| registered.will_wake(current)) {
					shared_mutable.waker.push(current.clone());
				}
				std::task::Poll::Pending
			}
		}
	}
}



impl std::fmt::Debug for ObjectObserveContractShared {
	// Renders as `ObjectObserveContract:<descriptor>`, using the descriptor's Display output.
	// Writes straight into the formatter: no temporary String is allocated, and a formatting
	// error is returned to the caller instead of panicking inside `format!`.
	fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		write!(formatter, "ObjectObserveContract:{}", self.object_core.descriptor)
	}
}