hakuban 0.7.2

Data-object sharing library
Documentation
use std::{sync::{Arc, Mutex}, task::{Poll, Waker}};

use futures::Sink;

use super::{core::ObjectCore, remote::RemoteObjectExposeContractSharedMutable, ObjectExposeContractShared, ObjectState};
use crate::{tag::{remote::RemoteTagExposeContractSharedMutable, TagExposeContractShared}, DataBytes, ObjectDescriptor};


/// Currently empty struct, to hold sink parameters in the future
#[derive(Eq, PartialEq, Clone)]
pub struct ObjectStateSinkParams {}

/// A [Sink](futures_util::sink::Sink), accepting [ObjectState]s
///
/// Basic function of the ObjectStateSink is passing new states of objects to the [LocalExchange](crate::LocalExchange).
///
/// The ObjectStateSink, besides being a Sink, also implements a [Stream](futures_util::stream::Stream), carrying [ObjectStateSinkParams] objects.
/// The Stream will end when, either, contract which emitted this ObjectStateSink gets terminated, or when the contract stops being the contract selected for exposing the object.
/// Hakuban network will, under normal conditions, try to pick only one expose contract per object, to keep that object exposed/up-to-date.
///
/// # Cloning
/// ObjectStateSink can be cloned, but cloning, like with contracts, doesn't create entirely separate entity.
///
/// Using mutiple clones as streams at the same time will have the clones race for the [ObjectStateSinkParams].
#[derive(Clone)]
pub struct ObjectStateSink {
	pub(crate) drop_guard: Arc<ObjectStateSinkDropGuard>,
}

//TODO: depub fields when not needed by remote interface
pub(crate) struct ObjectStateSinkDropGuard {
	pub(crate) object_core: Arc<ObjectCore>,
	pub(crate) shared: Arc<Mutex<ObjectStateSinkSharedMutable>>,
	id: usize,
}


pub(crate) struct ObjectStateSinkSharedMutable {
	contract: Option<ExposeContractMutable>,
	drop_guard_dropped: bool,
	assigned: bool,
	last_reported_params: Option<Option<ObjectStateSinkParams>>,
	waker: Vec<Waker>,
}

pub(crate) enum ExposeContractMutable {
	Object(Arc<ObjectExposeContractShared>),
	Tag(Arc<TagExposeContractShared>),
	RemoteObject(Arc<Mutex<RemoteObjectExposeContractSharedMutable>>),
	RemoteTag(Arc<Mutex<RemoteTagExposeContractSharedMutable>>),
}


//LockCheck:  scope=ObjectStateSink
impl ObjectStateSink {
	//LockCheck:
	pub(crate) fn new(object_core: Arc<ObjectCore>, contract: ExposeContractMutable) -> ObjectStateSink {
		ObjectStateSink {
			drop_guard: Arc::new(ObjectStateSinkDropGuard {
				id: object_core.object_state_sinks_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
				object_core,
				shared: Arc::new(Mutex::new(ObjectStateSinkSharedMutable {
					contract: Some(contract),
					last_reported_params: None,
					waker: Vec::new(),
					drop_guard_dropped: false,
					assigned: true,
				})),
			}),
		}
	}

	pub fn descriptor(&self) -> &ObjectDescriptor {
		&self.drop_guard.object_core.descriptor
	}

	//LockCheck:  lock=mutable  call=check_out_object_state_sink_params  unlock=mutable
	pub fn current(&self) -> Option<ObjectStateSinkParams> {
		let mut shared_mutable = self.drop_guard.shared.lock().unwrap();
		shared_mutable.check_out_object_state_sink_params(&self.drop_guard.object_core).map(|(state, _changed)| state)
	}

	//LockCheck:  call=ObjectCore::state_set  call=LocalExchange::object_changed
	pub(crate) fn state_set(&self, object_state: ObjectState<DataBytes>) {
		self.drop_guard.object_core.state_set(object_state, self.drop_guard.id);
		self.drop_guard.object_core.local_exchange.object_changed(self.drop_guard.object_core.descriptor.clone());
	}

	pub fn desynchronize(&self) {
		self.drop_guard.desynchronize();
	}
}

//LockCheck:  scope=ObjectStateSink
impl ObjectStateSinkDropGuard {
	//LockCheck:  call=ObjectCore::desynchronize  call=LocalExchange::object_changed
	pub fn desynchronize(&self) {
		self.object_core.desynchronize(self.id);
		self.object_core.local_exchange.object_changed(self.object_core.descriptor.clone());
	}
}

//LockCheck:  scope=ObjectStateSink
impl Drop for ObjectStateSinkDropGuard {
	//LockCheck:  call=desynchronize  lock=mutable  unlock=mutable  call=ObjectExposeContract::object_state_sink_dropped  call=TagExposeContract::object_state_sink_dropped  lock=RemoteObjectInterface::mutable call=RemoteObjectInterface::object_state_sink_dropped unlock=RemoteObjectInterface::mutable  lock=RemoteTagInterface::mutable  call=RemoteTagInterface::object_state_sink_dropped  unlock=RemoteTagInterface::mutable
	fn drop(&mut self) {
		//TODO: remove from object_state_sink_params_streams
		self.desynchronize(); //TODO: think if the order is right
		let contract = {
			let mut shared_mutable = self.shared.lock().unwrap();
			shared_mutable.drop_guard_dropped = true;
			shared_mutable.wake();
			shared_mutable.contract.take()
		};
		match contract {
			Some(ExposeContractMutable::Object(object_contract)) => object_contract.object_state_sink_dropped(),
			Some(ExposeContractMutable::Tag(tag_contract)) => tag_contract.object_state_sink_dropped(&self.object_core),
			Some(ExposeContractMutable::RemoteObject(remote_interface)) => remote_interface.lock().unwrap().object_state_sink_dropped(),
			Some(ExposeContractMutable::RemoteTag(remote_interface)) => remote_interface.lock().unwrap().object_state_sink_dropped(&self.object_core),
			None => {}
		}
	}
}

//LockCheck:  scope=ObjectStateSink
impl ObjectStateSinkSharedMutable {
	//LockCheck:
	pub(crate) fn assign(&mut self) -> bool {
		self.wake();
		if self.last_reported_params == Some(None) {
			false
		} else {
			assert!(!self.assigned);
			self.assigned = true;
			true
		}
	}

	//LockCheck:
	pub(crate) fn unassign(&mut self) -> bool {
		self.wake();
		if self.last_reported_params == Some(None) {
			false
		} else {
			assert!(self.assigned);
			self.assigned = false;
			true
		}
	}

	//LockCheck:
	pub(crate) fn contract_terminated(&mut self) {
		self.contract = None;
		self.wake();
	}

	fn wake(&mut self) {
		for waker in self.waker.drain(..) {
			waker.wake();
		}
	}

	//LockCheck:
	fn check_out_object_state_sink_params(&mut self, _object_core: &Arc<ObjectCore>) -> Option<(ObjectStateSinkParams, bool)> {
		if self.contract.is_none() || self.last_reported_params == Some(None) || !self.assigned {
			self.last_reported_params = Some(None);
			return None;
		};
		//TODO: get assigmnent_params from core
		let object_state_sink_params = ObjectStateSinkParams {};
		let changed = self.last_reported_params.is_none() || self.last_reported_params.as_ref().unwrap().as_ref() != Some(&object_state_sink_params);
		if changed {
			self.last_reported_params = Some(Some(object_state_sink_params));
		}
		Some((ObjectStateSinkParams {}, changed))
	}
}


//LockCheck:  scope=ObjectStateSink
impl futures::Stream for ObjectStateSink {
	type Item = ObjectStateSinkParams;

	//LockCheck:  lock=mutable  call=check_out_object_state_sink_params  unlock=mutable
	fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
		let mut shared_mutable = self.drop_guard.shared.lock().unwrap();
		match shared_mutable.check_out_object_state_sink_params(&self.drop_guard.object_core) {
			Some((object_state_sink_params, true)) => std::task::Poll::Ready(Some(object_state_sink_params)),
			Some((_object_state_sink_params, false)) => {
				shared_mutable.waker.push(cx.waker().clone());
				std::task::Poll::Pending
			}
			None => std::task::Poll::Ready(None),
		}
	}
}



//LockCheck:  scope=ObjectStateSink
impl<T: Into<ObjectState<DataBytes>>> Sink<T> for ObjectStateSink {
	type Error = ();

	fn poll_ready(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	//LockCheck:  call=state_set
	fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
		let object_state = item.into();
		self.state_set(object_state);
		Ok(())
	}

	fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	fn poll_close(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}
}