hakuban 0.7.2

Data-object sharing library
Documentation
#![allow(clippy::mutex_atomic)] //because I want the value AND the locking. I think :)

use std::{collections::{hash_map::Entry, HashSet}, hash::{Hash, Hasher}, sync::{atomic::{AtomicU32, Ordering}, Arc, Mutex}};

use log::trace;

use super::{core::ObjectCore, state_sink::{ObjectStateSink, ObjectStateSinkSharedMutable}, ObjectStateStream, ObjectStateStreamSharedMutable, ObserveContractMutable};
use crate::{channel::MpscSender, contract::Contract, diff, exchange::{core::ExchangeCore, local::LocalExchangeShared}, expose_contract::ExposeContract, object::state_sink::ExposeContractMutable, observe_contract::ObserveContract, DataBytes, DataVersion, ObjectDescriptor};

/// Handle created by `RemoteObject::new` for one (remote exchange, object descriptor) pair.
///
/// All state lives in the shared `RemoteObjectInterface`; dropping the handle calls
/// `RemoteObjectInterface::destroy` (see the `Drop` impl in this file).
pub(crate) struct RemoteObject {
	// Shared interface; clones of this Arc are handed to the object core as expose/observe contracts.
	object_interface: Arc<RemoteObjectInterface>,
}

/// State behind a `RemoteObject` handle.
///
/// This type implements `Contract`, `ObserveContract` and `ExposeContract` in this file, so it is
/// what gets linked into the `ObjectCore` when the remote side exposes or observes the object.
pub(super) struct RemoteObjectInterface {
	// Exchange core returned by `Contract::exchange_core`; its `load` counter is adjusted in `assign` / `unassign`.
	remote_exchange_core: Arc<ExchangeCore>,
	// Core of the object this interface is attached to.
	object_core: Arc<ObjectCore>,
	// Cost most recently passed to `RemoteObject::expose`; read with relaxed ordering by `composition_cost()`.
	composition_cost: AtomicU32,
	// Flags and sink/stream bookkeeping. The Arc is also cloned into sinks and streams created from this interface.
	mutable: Arc<Mutex<RemoteObjectExposeContractSharedMutable>>,
	// Value returned by `ObjectCore::link_remote_interface`; `None` when no change sender was given to `new`.
	// This is a different value from the one returned by `Contract::id()`.
	id: Option<usize>,
}


/// Mutable part of a `RemoteObjectInterface`, always accessed through its `mutable` mutex.
pub(crate) struct RemoteObjectExposeContractSharedMutable {
	// Last value passed to `RemoteObject::expose`.
	expose: bool,
	// Last value passed to `RemoteObject::observe`.
	observe: bool,
	// Shared part of the sink handed out by `accept_object_state_sink`, while that sink is outstanding.
	object_state_sink: Option<Arc<Mutex<ObjectStateSinkSharedMutable>>>,
	// Set by `ExposeContract::assign`, cleared by `ExposeContract::unassign`.
	assigned: bool,
	// Shared part of the stream handed out by `get_object_state_stream`, while that stream is outstanding.
	object_state_stream: Option<Arc<Mutex<ObjectStateStreamSharedMutable>>>,
	// Set by `ObserveContract::initialize_activate`, cleared by `deinitialize_deactivate`.
	live: bool,
}

//LockCheck:  scope=RemoteObject
impl RemoteObject {
	/// Creates the handle for `descriptor` on behalf of `remote_exchange_core`.
	///
	/// When `instantiate` is true the object core is obtained with `object_acquire` and a handle is
	/// always returned. Otherwise `object_maybe_acquire` is used, and `None` is returned if it
	/// yields no core. `changes`, when present, is forwarded to `RemoteObjectInterface::new`.
	//LockCheck:  call=LocalExchange::object_acquire  call=RemoteObjectInterface::new
	pub(crate) fn new(
		instantiate: bool, local_exchange: Arc<LocalExchangeShared>, remote_exchange_core: Arc<ExchangeCore>, descriptor: &ObjectDescriptor,
		changes: Option<MpscSender<HashSet<ObjectDescriptor>>>,
	) -> Option<RemoteObject> {
		// Both paths end in the same construction; only the way the core is obtained differs.
		let acquired_core = if instantiate { Some(local_exchange.object_acquire(descriptor)) } else { local_exchange.object_maybe_acquire(descriptor) };
		acquired_core.map(|object_core| RemoteObject { object_interface: RemoteObjectInterface::new(object_core, remote_exchange_core, changes) })
	}

	/// Records the composition `cost` and switches the expose contract on or off.
	///
	/// Nothing beyond the cost store happens when `value` equals the current `expose` flag.
	/// On a change, the interface is linked to (or unlinked from) the object core as an
	/// `ExposeContract`, and the local exchange is told the object changed. When switching off,
	/// an outstanding state sink is detached and told its contract terminated.
	///
	/// NOTE(review): `cost` is stored even when `value` is unchanged, while `assign` / `unassign`
	/// read the current cost when adjusting the exchange load. A cost change while assigned may
	/// therefore leave the load unbalanced — confirm whether callers can do this.
	//LockCheck:  lock=RemoteObjectInterface::mutable  lock=ObjectStateSink::mutable  call=ObjectStateSink::contract_terminated  unlock=ObjectStateSink::mutable  unlock=RemoteObjectInterface::mutable  call=ObjectCore::link_expose_contract  call=ObjectCore::unlink_expose_contract  call=LocalExchange::object_changed
	pub(crate) fn expose(&self, value: bool, cost: u32) {
		let interface = &self.object_interface;
		interface.composition_cost.store(cost, Ordering::Relaxed);

		let mut state = interface.mutable.lock().unwrap();
		if state.expose == value {
			return;
		}
		state.expose = value;

		let contract: Arc<dyn ExposeContract> = interface.clone();
		if value {
			// The interface lock is released before calling into the object core.
			drop(state);
			interface.object_core.link_expose_contract(contract);
		} else {
			// The sink is notified while the interface lock is still held.
			if let Some(sink) = state.object_state_sink.take() {
				sink.lock().unwrap().contract_terminated();
			}
			drop(state);
			interface.object_core.unlink_expose_contract(&contract);
		}
		interface.object_core.local_exchange.object_changed(interface.object_core.descriptor.clone());
	}

	/// Switches the observe contract on or off.
	///
	/// Does nothing when `value` equals the current `observe` flag. On a change, the interface is
	/// linked to (or unlinked from) the object core as an `ObserveContract`, and the local exchange
	/// is told the object changed. When switching off, an outstanding state stream is detached and
	/// told its contract terminated.
	//LockCheck:  lock=RemoteObjectInterface::mutable  lock=ObjectStateStream::mutable  call=ObjectStateStream::contract_terminated  unlock=ObjectStateStream::mutable  unlock=RemoteObjectInterface::mutable  call=ObjectCore::link_observe_contract  call=ObjectCore::unlink_observe_contract  call=LocalExchange::object_changed
	pub(crate) fn observe(&self, value: bool) {
		let interface = &self.object_interface;

		let mut state = interface.mutable.lock().unwrap();
		if state.observe == value {
			return;
		}
		state.observe = value;

		let contract: Arc<dyn ObserveContract> = interface.clone();
		if value {
			// The interface lock is released before calling into the object core.
			drop(state);
			interface.object_core.link_observe_contract(contract);
		} else {
			// The stream is notified while the interface lock is still held.
			if let Some(stream) = state.object_state_stream.take() {
				stream.lock().unwrap().contract_terminated();
			}
			drop(state);
			interface.object_core.unlink_observe_contract(&contract);
		}
		interface.object_core.local_exchange.object_changed(interface.object_core.descriptor.clone());
		//MISSING-TEST: notification was missing and no test detected it
		//self.object_interface.notify();
	}

	/// Forwards to `ObjectCore::is_observed_by_exchange`, passing this handle's remote exchange core.
	//LockCheck:  call=ObjectCore::is_observed_by_exchange
	pub(crate) fn is_observed_by_this_exchange(&self) -> bool {
		let interface = &self.object_interface;
		interface.object_core.is_observed_by_exchange(&interface.remote_exchange_core)
	}

	/// Forwards to `ObjectCore::is_exposed_by_exchange`, passing this handle's remote exchange core.
	//LockCheck:  call=ObjectCore::is_exposed_by_exchange
	pub(crate) fn is_exposed_by_this_exchange(&self) -> bool {
		let interface = &self.object_interface;
		interface.object_core.is_exposed_by_exchange(&interface.remote_exchange_core)
	}

	/// Forwards to `ObjectCore::is_assigned_to_exchange_other_than`, passing this handle's remote exchange core.
	//LockCheck:  call=ObjectCore::is_assigned_to_exchange_other_than
	pub(crate) fn is_assigned_to_other_exchange(&self) -> bool {
		let interface = &self.object_interface;
		interface.object_core.is_assigned_to_exchange_other_than(&interface.remote_exchange_core)
	}

	/// Forwards to `ObjectCore::has_observer_other_than`, passing this handle's remote exchange core and `check_tags` unchanged.
	//LockCheck:  call=ObjectCore::has_observer_other_than
	pub(crate) fn is_observed_by_other_exchange(&self, check_tags: bool) -> bool {
		let interface = &self.object_interface;
		interface.object_core.has_observer_other_than(&interface.remote_exchange_core, check_tags)
	}

	/// Forwards to `ObjectCore::has_exposer_other_than`, passing this handle's remote exchange core and `check_tags` unchanged.
	//LockCheck:  call=ObjectCore::has_exposer_other_than
	pub(crate) fn is_exposed_by_other_exchange(&self, check_tags: bool) -> bool {
		let interface = &self.object_interface;
		interface.object_core.has_exposer_other_than(&interface.remote_exchange_core, check_tags)
	}

	/// Hands out the state sink for this object if the expose contract is currently assigned.
	///
	/// Returns `None` when the interface is not assigned. Otherwise a new `ObjectStateSink` is
	/// created and its shared part is remembered so later calls can reach it.
	///
	/// # Panics
	/// Panics with "Double accept" if a sink is already outstanding.
	//LockCheck:  lock=RemoteObjectInterface::mutable  call=ObjectStateSink::new  unlock=RemoteObjectInterface::mutable
	pub(crate) fn accept_object_state_sink(&self) -> Option<ObjectStateSink> {
		let interface = &self.object_interface;
		let mut state = interface.mutable.lock().unwrap();
		assert!(state.object_state_sink.is_none(), "Double accept");
		if !state.assigned {
			return None;
		}
		// The sink is constructed while the interface lock is held.
		let sink = ObjectStateSink::new(interface.object_core.clone(), ExposeContractMutable::RemoteObject(interface.mutable.clone()));
		state.object_state_sink = Some(sink.drop_guard.shared.clone());
		Some(sink)
	}

	/// Hands out the state stream for this object if the observe contract is currently live.
	///
	/// Returns `None` when the interface is not live. Otherwise a new `ObjectStateStream` is
	/// created, its shared part is remembered, and it is linked to the object after the interface
	/// lock has been released.
	///
	/// # Panics
	/// Panics with "Double object_state_stream checkout" if a stream is already outstanding.
	//LockCheck:  lock=RemoteObjectInterface::mutable  call=ObjectStateStream::new  unlock=RemoteObjectInterface::mutable  call=ObjectStateStream::link_to_object
	pub(crate) fn get_object_state_stream(&self) -> Option<ObjectStateStream> {
		let interface = &self.object_interface;
		let stream = {
			let mut state = interface.mutable.lock().unwrap();
			assert!(state.object_state_stream.is_none(), "Double object_state_stream checkout");
			if !state.live {
				return None;
			}
			let created = ObjectStateStream::new(interface.object_core.clone(), ObserveContractMutable::RemoteObject(interface.mutable.clone()));
			state.object_state_stream = Some(created.drop_guard.shared.clone());
			created
		};
		// The guard went out of scope above, so linking happens without the interface lock.
		stream.link_to_object();
		Some(stream)
	}

	pub(crate) fn cache_diff(&self, from_version: Option<Vec<i64>>, to_version: Vec<i64>, diff: Arc<Vec<u8>>) {
		let mut diffs = self.object_interface.object_core.diffs.lock().unwrap();
		let diff_key = (from_version, to_version);
		if let std::collections::hash_map::Entry::Vacant(entry) = diffs.entry(diff_key) {
			trace!("Memorizing diff submitted by remote.");
			entry.insert(Arc::new(Mutex::new(Some(diff))));
			while diffs.len() > 1 {
				let key = diffs.keys().min().unwrap().clone();
				diffs.remove(&key);
			}
		};
	}

	/// Forwards to `ObjectCore::minimum_cost_excluding_exchange`, excluding this handle's remote exchange core.
	pub(crate) fn minimum_cost_in_other_exchanges(&self) -> Option<u32> {
		let interface = &self.object_interface;
		interface.object_core.minimum_cost_excluding_exchange(&interface.remote_exchange_core)
	}

	/// Returns the diff leading from `old_data` (at `old_version`) to `new_data` (at `new_version`), computing it when the cache slot for that version pair is still empty.
	///
	/// The result is stored in the slot of `object_core.diffs` keyed by `(old_version, new_version)`.
	/// When `old_version` is `None` the diff is computed against an empty byte slice.
	/// As written, this function always returns `Some`.
	///
	/// # Panics
	/// - if `old_version` is `Some` but `old_data` is `None` and the diff has to be computed;
	/// - if `diff::State::diff` or `diff::patch` return an error;
	/// - if applying the freshly computed diff to `old_data` does not reproduce `new_data`;
	/// - if any of the involved mutexes is poisoned.
	pub(crate) fn get_diff(
		&self, old_version: Option<&DataVersion>, old_data: Option<&DataBytes>, new_version: &[i64], new_data: &DataBytes,
	) -> Option<DataBytes> {
		// Fetch the cache slot for this version pair, creating an empty one if needed.
		// The map lock is released at the end of this block, before the slot itself is locked.
		let diff_mutex = {
			let mut diffs = self.object_interface.object_core.diffs.lock().unwrap();

			let ret = match diffs.entry((old_version.cloned(), new_version.into())) {
				Entry::Occupied(entry) => entry.get().clone(),
				Entry::Vacant(entry) => entry.insert(Arc::new(Mutex::new(None))).clone(),
			};

			// Keep a single entry in the map by evicting the smallest keys. The slot obtained above stays usable
			// through `ret` even if its own key is the one evicted.
			while diffs.len() > 1 {
				let key = diffs.keys().min().unwrap().clone();
				diffs.remove(&key);
			}

			ret
		};
		// The slot lock is held for the whole computation below.
		let mut diff_mutex_lock = diff_mutex.lock().unwrap();

		Some(
			diff_mutex_lock
				.get_or_insert_with(|| {
					// Slot was empty: compute the diff and the diff state that belongs to the new data.
					let (diff, new_state) = if let Some(old_version) = old_version {
						// Fetch (or create empty) the cached diff state for the old version; map lock released at block end.
						let state_mutex = {
							let mut diff_states = self.object_interface.object_core.diff_states.lock().unwrap();
							let ret = match diff_states.entry(old_version.clone()) {
								Entry::Occupied(entry) => entry.get().clone(),
								Entry::Vacant(entry) => entry.insert(Arc::new(Mutex::new(None))).clone(),
							};
							// At most two diff states are kept; smallest keys go first.
							while diff_states.len() > 2 {
								let key = diff_states.keys().min().unwrap().clone();
								diff_states.remove(&key);
							}
							ret
						};
						let mut state_mutex_lock = state_mutex.lock().unwrap();
						// No cached state for the old version: rebuild it by diffing the old data against an empty slice.
						// NOTE(review): the meaning of the `8, 6` arguments is defined by `diff::State::diff`, not visible here.
						let state = state_mutex_lock.get_or_insert_with(|| diff::State::new().diff(&[], old_data.unwrap(), 8, 6).unwrap().1);
						let ret = state.diff(old_data.unwrap(), new_data, 8, 6).unwrap();
						// Self-check: applying the diff to the old data must reproduce the new data exactly (length, then bytes).
						let patched = diff::patch(old_data.unwrap(), &ret.0).unwrap();
						if patched.len() != new_data.len() {
							panic!()
						};
						patched.iter().zip(new_data.iter()).for_each(|(a, b)| {
							if a != b {
								panic!()
							}
						});
						ret
					} else {
						// No old version: diff the new data against an empty slice using a fresh state.
						let state = diff::State::new();
						state.diff(&[], new_data, 8, 6).unwrap()
					};
					// Remember the state for the new version unless one is already cached, then trim to two entries.
					let mut diff_states = self.object_interface.object_core.diff_states.lock().unwrap();
					if !diff_states.contains_key(new_version) {
						diff_states.insert(new_version.into(), Arc::new(Mutex::new(Some(new_state))));
					}
					while diff_states.len() > 2 {
						let key = diff_states.keys().min().unwrap().clone();
						diff_states.remove(&key);
					}
					Arc::new(diff)
				})
				.clone(),
		)
	}
}


//LockCheck:  scope=RemoteObject
impl Drop for RemoteObject {
	// Dropping the handle tears the interface down through `RemoteObjectInterface::destroy`, which unlinks it
	// from the object core and asserts that this handle holds the only remaining strong reference.
	//LockCheck:  call=RemoteObjectInterface::destroy
	fn drop(&mut self) {
		self.object_interface.destroy();
	}
}


//LockCheck:  scope=RemoteObjectInterface
impl RemoteObjectInterface {
	/// Builds a new interface for `object_core` on behalf of `exchange`.
	///
	/// When `changes` is given it is registered with `ObjectCore::link_remote_interface` and the
	/// returned value is kept in `id`, which `destroy` later uses to unlink. All flags start
	/// `false`, no sink or stream is outstanding, and the composition cost starts at 0.
	//LockCheck:  call=ObjectCore::link_remote_interface
	pub(super) fn new(
		object_core: Arc<ObjectCore>, exchange: Arc<ExchangeCore>, changes: Option<MpscSender<HashSet<ObjectDescriptor>>>,
	) -> Arc<RemoteObjectInterface> {
		let id = changes.map(|sender| object_core.link_remote_interface(sender));
		let initial_state = RemoteObjectExposeContractSharedMutable {
			expose: false,
			observe: false,
			object_state_sink: None,
			assigned: false,
			object_state_stream: None,
			live: false,
		};
		let interface = RemoteObjectInterface {
			remote_exchange_core: exchange,
			object_core,
			composition_cost: AtomicU32::new(0),
			mutable: Arc::new(Mutex::new(initial_state)),
			id,
		};
		Arc::new(interface)
	}
}


impl Hash for RemoteObjectInterface {
	fn hash<H: Hasher>(&self, state: &mut H) {
		self.id().hash(state);
	}
}

impl PartialEq for RemoteObjectInterface {
	fn eq(&self, other: &Self) -> bool {
		self.id() == other.id()
	}
}

// Equality compares `Contract::id` values (see the `PartialEq` impl), which is reflexive, so `Eq` holds.
impl Eq for RemoteObjectInterface {}

//LockCheck:  scope=RemoteObjectInterface
impl RemoteObjectInterface {
	/// Detaches this interface from its object core.
	///
	/// Steps, in order: unlink the change sender registration (if `id` is set); read the
	/// `observe` / `expose` flags under the lock; unlink the observe contract and then the expose
	/// contract as flagged, reporting an object change to the local exchange after each unlink.
	///
	/// # Panics
	/// Panics if the caller's `Arc` is not the only strong reference left once unlinking is done.
	//LockCheck:  call=ObjectCore::unlink_remote_interface  lock=mutable  unlock=mutable  call=ObjectCore::unlink_observe_contract  call=ObjectCore::unlink_expose_contract  call=LocalExchange::object_changed
	pub(super) fn destroy(self: &Arc<Self>) {
		let core = &self.object_core;
		if let Some(interface_id) = self.id {
			core.unlink_remote_interface(interface_id);
		}

		// Copy the flags out so the lock is not held while calling into the object core.
		let state = self.mutable.lock().unwrap();
		let was_observing = state.observe;
		let was_exposing = state.expose;
		drop(state);

		if was_observing {
			let contract: Arc<dyn ObserveContract> = self.clone();
			core.unlink_observe_contract(&contract);
			core.local_exchange.object_changed(core.descriptor.clone());
		}
		if was_exposing {
			let contract: Arc<dyn ExposeContract> = self.clone();
			core.unlink_expose_contract(&contract);
			core.local_exchange.object_changed(core.descriptor.clone());
		}
		assert_eq!(Arc::strong_count(self), 1);
	}
}

impl std::fmt::Debug for RemoteObjectInterface {
	/// Writes `RemoteObjectInterface:` followed by the `Display` form of the object's descriptor.
	fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		// `write!` streams straight into the formatter, so no temporary `String` is allocated as `format!` would.
		write!(formatter, "RemoteObjectInterface:{}", self.object_core.descriptor)
	}
}



impl Contract for RemoteObjectInterface {
	/// Identity of this interface: the address of `self` as an integer.
	/// This is unrelated to the `id` field.
	fn id(&self) -> usize {
		let address: *const Self = self;
		address as usize
	}

	/// The exchange core this interface was created for.
	fn exchange_core(&self) -> &Arc<ExchangeCore> {
		let Self { remote_exchange_core, .. } = self;
		remote_exchange_core
	}
}


//LockCheck:  scope=RemoteObjectInterface
impl ObserveContract for RemoteObjectInterface {
	/// Marks the interface as live, which lets `get_object_state_stream` hand out a stream.
	//LockCheck:  lock=mutable  unlock=mutable
	fn initialize_activate(&self, _object_core: &Arc<ObjectCore>) {
		self.mutable.lock().unwrap().live = true;
		//self.notify();
	}

	/// Clears the live flag again.
	//LockCheck:  lock=mutable  unlock=mutable
	fn deinitialize_deactivate(&self, _object_core: &Arc<ObjectCore>) {
		self.mutable.lock().unwrap().live = false;
		//self.notify();
	}

	/// Always `true` for this implementation.
	fn object(&self) -> bool {
		true
	}
}


//LockCheck:  scope=RemoteObjectInterface
impl ExposeContract for RemoteObjectInterface {
	/// Cost most recently stored by `RemoteObject::expose` (relaxed load).
	fn composition_cost(&self) -> u32 {
		let cost = &self.composition_cost;
		cost.load(Ordering::Relaxed)
	}

	/// Marks the interface as assigned, forwards the assignment to an outstanding sink and
	/// adds the current composition cost to the exchange's load. Always returns `true`.
	///
	/// # Panics
	/// Panics with "Double assign" if the interface is already assigned.
	//LockCheck:  lock=mutable  lock=ObjectStateSink::mutable  call=ObjectStateSink::assign  unlock=ObjectStateSink::mutable  unlock=mutable
	fn assign(&self, _object_core: &Arc<ObjectCore>) -> bool {
		let mut state = self.mutable.lock().unwrap();
		assert!(!state.assigned, "Double assign");
		state.assigned = true;

		if let Some(sink) = state.object_state_sink.as_ref() {
			sink.lock().unwrap().assign();
		}

		//self.notify();
		// The load is updated while the interface lock is still held.
		let cost = self.composition_cost.load(Ordering::Relaxed);
		self.remote_exchange_core.load.fetch_add(cost, Ordering::Relaxed);

		true
	}

	/// Clears the assigned flag, forwards the unassignment to an outstanding sink and
	/// subtracts the current composition cost from the exchange's load. Always returns `true`.
	///
	/// # Panics
	/// Panics with "Double unassign (#1)" if the interface is not assigned.
	//LockCheck:  lock=mutable  lock=ObjectStateSink::mutable  call=ObjectStateSink::unassign  unlock=ObjectStateSink::mutable  unlock=mutable
	fn unassign(&self, _object_core: &Arc<ObjectCore>) -> bool {
		let mut state = self.mutable.lock().unwrap();
		assert!(state.assigned, "Double unassign (#1)");
		state.assigned = false;

		if let Some(sink) = state.object_state_sink.as_ref() {
			sink.lock().unwrap().unassign();
		}

		//self.notify();
		// The load is updated while the interface lock is still held.
		let cost = self.composition_cost.load(Ordering::Relaxed);
		self.remote_exchange_core.load.fetch_sub(cost, Ordering::Relaxed);

		true
	}

	/// Always `true` for this implementation.
	fn object(&self) -> bool {
		true
	}
}

//LockCheck:  scope=RemoteObjectInterface
impl RemoteObjectExposeContractSharedMutable {
	/// Forgets the outstanding sink's shared part.
	///
	/// # Panics
	/// Panics with "Double return" if no sink is recorded.
	//only mutable part of the sink is dropped here, there is no destructor call
	//LockCheck:
	pub(crate) fn object_state_sink_dropped(&mut self) {
		match self.object_state_sink.take() {
			Some(sink_shared) => drop(sink_shared),
			None => panic!("Double return"),
		}
		//TODO: notify?
	}

	/// Forgets the outstanding stream's shared part.
	///
	/// # Panics
	/// Panics with "Double return" if no stream is recorded.
	//LockCheck:
	pub(crate) fn object_state_stream_dropped(&mut self) {
		match self.object_state_stream.take() {
			Some(stream_shared) => drop(stream_shared),
			None => panic!("Double return"),
		}
		//TODO: notify?
	}
}