hakuban 0.7.2

Data-object sharing library
Documentation
#![allow(clippy::mutex_atomic)] //because I want the value AND the locking. actually, rethink this (TODO)

use std::{collections::{HashMap, HashSet}, hash::{Hash, Hasher}, sync::{atomic::{AtomicU32, Ordering}, Arc, Mutex}};

use super::core::TagCore;
use crate::{channel::MpscSender, contract::Contract, exchange::{core::ExchangeCore, local::LocalExchangeShared}, expose_contract::ExposeContract, object::{core::ObjectCore, state_sink::{ExposeContractMutable, ObjectStateSink, ObjectStateSinkSharedMutable}, ObjectStateStream, ObjectStateStreamSharedMutable, ObserveContractMutable}, observe_contract::ObserveContract, ObjectDescriptor, TagDescriptor};

pub(crate) struct RemoteTag {
	tag_interface: Arc<RemoteTagInterface>,
}


struct RemoteTagInterface {
	exchange_core: Arc<ExchangeCore>,
	tag_core: Arc<TagCore>,
	cost: AtomicU32,
	mutable: Arc<Mutex<RemoteTagExposeContractSharedMutable>>,
	object_changes: Option<MpscSender<HashSet<ObjectDescriptor>>>,
}

pub(crate) struct RemoteTagExposeContractSharedMutable {
	expose: bool,
	observe: bool,
	pub object_state_sinks: HashMap<ObjectDescriptor, (bool, Arc<ObjectCore>, Option<Arc<Mutex<ObjectStateSinkSharedMutable>>>)>,
	pub object_state_stream: HashMap<ObjectDescriptor, (bool, Arc<ObjectCore>, Option<Arc<Mutex<ObjectStateStreamSharedMutable>>>)>,
}


//LockCheck:  scope=RemoteTag
impl RemoteTag {
	//LockCheck:  call=LocalExchange::tag_acquire  call=RemoteTagInterface::new
	pub(crate) fn new(
		local_exchange: Arc<LocalExchangeShared>, exchange_core: Arc<ExchangeCore>, descriptor: &TagDescriptor,
		changes: Option<MpscSender<HashSet<ObjectDescriptor>>>,
	) -> RemoteTag {
		let tag_core = local_exchange.tag_acquire(descriptor);
		RemoteTag { tag_interface: RemoteTagInterface::new(tag_core, exchange_core, 0, changes) }
	}

	//LockCheck:  lock=RemoteTagInterface::mutable  lock=ObjectStateSink::mutable  call=ObjectStateSink::contract_terminated  unlock=ObjectStateSink::mutable  unlock=RemoteTagInterface::mutable  call=TagCore::link_tag_expose_contract  call=TagCore::unlink_tag_expose_contract
	pub(crate) fn expose(&self, value: bool, cost: u32) {
		self.tag_interface.cost.store(cost, Ordering::Relaxed);
		let mut shared_mutable = self.tag_interface.mutable.lock().unwrap();
		if shared_mutable.expose != value {
			shared_mutable.expose = value;
			let dyn_self: Arc<dyn ExposeContract> = self.tag_interface.clone();
			if value {
				drop(shared_mutable);
				self.tag_interface.tag_core.link_tag_expose_contract(dyn_self);
			} else {
				for (_, _, object_state_sink) in shared_mutable.object_state_stream.values() {
					if let Some(object_state_sink) = object_state_sink {
						object_state_sink.lock().unwrap().contract_terminated();
					}
				}
				drop(shared_mutable);
				self.tag_interface.tag_core.unlink_tag_expose_contract(&dyn_self);
			}
			//TODO: do we need to notify anybody?
		}
	}

	//LockCheck:  lock=RemoteTagInterface::mutable  lock=ObjectStateStream::mutable  call=ObjectStateStream::contract_terminated  unlock=ObjectStateStream::mutable  unlock=RemoteTagInterface::mutable  call=TagCore::link_tag_observe_contract  call=TagCore::unlink_tag_observe_contract
	pub(crate) fn observe(&self, value: bool) {
		let mut shared_mutable = self.tag_interface.mutable.lock().unwrap();
		if shared_mutable.observe != value {
			shared_mutable.observe = value;
			let dyn_self: Arc<dyn ObserveContract> = self.tag_interface.clone();
			if value {
				drop(shared_mutable);
				self.tag_interface.tag_core.link_tag_observe_contract(dyn_self);
			} else {
				for (_, _, object_state_stream) in shared_mutable.object_state_stream.values() {
					if let Some(object_state_stream) = object_state_stream {
						object_state_stream.lock().unwrap().contract_terminated();
					}
				}
				drop(shared_mutable);
				self.tag_interface.tag_core.unlink_tag_observe_contract(&dyn_self);
			}
			//TODO: do we need to notify anybody?
		}
	}

	pub(crate) fn minimum_cost_in_other_exchanges(&self) -> Option<u32> {
		self.tag_interface.tag_core.minimum_cost_excluding_exchange(&self.tag_interface.exchange_core)
	}

	//LockCheck:  lock=RemoteTagInterface::mutable  unlock=RemoteTagInterface::mutable
	pub(crate) fn is_observed_by_this_exchange(&self) -> bool {
		self.tag_interface.mutable.lock().unwrap().observe
	}

	//LockCheck:  lock=RemoteTagInterface::mutable  unlock=RemoteTagInterface::mutable
	pub(crate) fn is_exposed_by_this_exchange(&self) -> bool {
		self.tag_interface.mutable.lock().unwrap().expose
	}

	//LockCheck:  call=TagCore::has_observer_other_than
	pub(crate) fn is_observed_by_other_exchange(&self) -> bool {
		self.tag_interface.tag_core.has_observer_other_than(&self.tag_interface.exchange_core)
	}

	//LockCheck:  call=TagCore::has_exposer_other_than
	pub(crate) fn is_exposed_by_other_exchange(&self) -> bool {
		self.tag_interface.tag_core.has_exposer_other_than(&self.tag_interface.exchange_core)
	}

	//LockCheck:  lock=TagCoreInterface::mutable  call=ObjectStateSink::new  unlock=TagCoreInterface::mutable
	pub(crate) fn accept_object_state_sink(&self, object_descriptor: &ObjectDescriptor) -> Option<ObjectStateSink> {
		let mut shared_mutable = self.tag_interface.mutable.lock().unwrap();
		if let Some((assigned, object_core, object_state_sink)) = shared_mutable.object_state_sinks.get_mut(object_descriptor) {
			if object_state_sink.is_some() {
				panic!("Double accept")
			}
			if *assigned {
				let new_object_state_sink = ObjectStateSink::new(object_core.clone(), ExposeContractMutable::RemoteTag(self.tag_interface.mutable.clone()));
				*object_state_sink = Some(new_object_state_sink.drop_guard.shared.clone());
				Some(new_object_state_sink)
			} else {
				None
			}
		} else {
			None
		}
	}

	//LockCheck:  lock=TagCoreInterface::mutable  call=ObjectStateStream::new  unlock=TagCoreInterface::mutable  call=ObjectStateStream::link_to_object
	pub(crate) fn get_object_state_stream(&self, object_descriptor: &ObjectDescriptor) -> Option<ObjectStateStream> {
		let mut shared_mutable = self.tag_interface.mutable.lock().unwrap();

		let ret = if let Some((live, object_core, object_state_stream)) = shared_mutable.object_state_stream.get_mut(object_descriptor) {
			if object_state_stream.is_some() {
				panic!("Double object_state_stream checkout")
			}
			if *live {
				let new_object_state_stream =
					ObjectStateStream::new(object_core.clone(), ObserveContractMutable::RemoteTag(self.tag_interface.mutable.clone()));
				*object_state_stream = Some(new_object_state_stream.drop_guard.shared.clone());
				Some(new_object_state_stream)
			} else {
				None
			}
		} else {
			None
		};
		drop(shared_mutable);
		if let Some(stream) = &ret {
			stream.link_to_object();
		}
		ret
	}
}

//LockCheck:  scope=RemoteTag
impl Drop for RemoteTag {
	//LockCheck:  call=RemoteTagInterface::destroy
	fn drop(&mut self) {
		self.tag_interface.destroy();
	}
}

//LockCheck:  scope=RemoteTagInterface
impl RemoteTagInterface {
	fn id(&self) -> usize {
		(self as *const Self) as usize
	}

	//LockCheck:
	pub(super) fn new(
		tag_core: Arc<TagCore>, exchange_core: Arc<ExchangeCore>, cost: u32, changes: Option<MpscSender<HashSet<ObjectDescriptor>>>,
	) -> Arc<RemoteTagInterface> {
		Arc::new(RemoteTagInterface {
			exchange_core,
			tag_core,
			cost: AtomicU32::new(cost),
			mutable: Arc::new(Mutex::new(RemoteTagExposeContractSharedMutable {
				object_state_sinks: HashMap::new(),
				object_state_stream: HashMap::new(),
				expose: false,
				observe: false,
			})),
			object_changes: changes,
		})
	}

	//LockCheck:  lock=mutable  unlock=mutable  call=TagCore::unlink_tag_observe_contract  call=TagCore::unlink_tag_expose_contract
	pub(super) fn destroy(self: &Arc<Self>) {
		let (observe, expose) = {
			let shared_mutable = self.mutable.lock().unwrap();
			(shared_mutable.observe, shared_mutable.expose)
		};
		if observe {
			let dyn_self: Arc<dyn ObserveContract> = self.clone();
			self.tag_core.unlink_tag_observe_contract(&dyn_self);
		};
		if expose {
			let dyn_self: Arc<dyn ExposeContract> = self.clone();
			self.tag_core.unlink_tag_expose_contract(&dyn_self);
		};
		assert_eq!(Arc::strong_count(self), 1);
	}

	pub(super) fn object_notify(&self, object_core: &Arc<ObjectCore>) {
		if let Some(changes) = &self.object_changes {
			changes.send(object_core.descriptor.clone());
		}
	}
}


impl Hash for RemoteTagInterface {
	fn hash<H: Hasher>(&self, state: &mut H) {
		self.id().hash(state);
	}
}

impl PartialEq for RemoteTagInterface {
	fn eq(&self, other: &Self) -> bool {
		self.id() == other.id()
	}
}

impl Eq for RemoteTagInterface {}


impl std::fmt::Debug for RemoteTagInterface {
	fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		formatter.write_str(&format!("RemoteTagInterface:{}", self.tag_core.descriptor))
	}
}


impl Contract for RemoteTagInterface {
	fn id(&self) -> usize {
		(self as *const Self) as usize
	}

	fn exchange_core(&self) -> &Arc<ExchangeCore> {
		&self.exchange_core
	}
}

//LockCheck:  scope=RemoteTagInterface
impl ObserveContract for RemoteTagInterface {
	//LockCheck:  lock=mutable  lock=ObjectStateStream::mutable  call=ObjectStateStream::initialize_activate  unlock=ObjectStateStream::mutable  unlock=mutable
	fn initialize_activate(&self, object_core: &Arc<ObjectCore>) {
		let mut mutable = self.mutable.lock().unwrap();

		if let Some((live, _object_core, object_state_stream)) = mutable.object_state_stream.get_mut(&object_core.descriptor) {
			if *live {
				panic!("Double activate");
			};
			*live = true;
			if let Some(object_state_stream) = object_state_stream {
				object_state_stream.lock().unwrap().initialize_activate();
			}
		} else {
			mutable.object_state_stream.insert(object_core.descriptor.clone(), (true, object_core.clone(), None));
		}

		self.object_notify(object_core);
	}

	//LockCheck:  lock=mutable  lock=ObjectStateStream::mutable  call=ObjectStateStream::deinitialize_deactivate  unlock=ObjectStateStream::mutable  unlock=mutable
	fn deinitialize_deactivate(&self, object_core: &Arc<ObjectCore>) {
		let mut mutable = self.mutable.lock().unwrap();

		if let Some((live, _object_core, object_state_stream)) = mutable.object_state_stream.get_mut(&object_core.descriptor) {
			if !*live {
				panic!("Double deactivate (#1)");
			};
			*live = false;
			if let Some(object_state_stream) = object_state_stream {
				object_state_stream.lock().unwrap().deinitialize_deactivate();
			} else {
				mutable.object_state_stream.remove(&object_core.descriptor);
			}
		} else {
			panic!("Double deactivate (#2)");
		}

		self.object_notify(object_core);
	}

	fn object(&self) -> bool {
		false
	}
}

//LockCheck:  scope=RemoteTagInterface
impl ExposeContract for RemoteTagInterface {
	//LockCheck:  lock=mutable  lock=ObjectStateSink::mutable  call=ObjectStateSink::assign  unlock=ObjectStateSink::mutable  unlock=mutable
	fn assign(&self, object_core: &Arc<ObjectCore>) -> bool {
		let mut mutable = self.mutable.lock().unwrap();

		if let Some((assigned, _object_core, object_state_sink)) = mutable.object_state_sinks.get_mut(&object_core.descriptor) {
			if *assigned {
				panic!("Double assign");
			};
			*assigned = true;
			if let Some(object_state_sink) = object_state_sink {
				object_state_sink.lock().unwrap().assign();
			}
		} else {
			mutable.object_state_sinks.insert(object_core.descriptor.clone(), (true, object_core.clone(), None));
		}

		self.object_notify(object_core);
		self.exchange_core().load.fetch_add(self.composition_cost(), Ordering::Relaxed);

		true
	}

	//LockCheck:  lock=mutable  lock=ObjectStateSink::mutable  call=ObjectStateSink::unassign  unlock=ObjectStateSink::mutable  unlock=mutable
	fn unassign(&self, object_core: &Arc<ObjectCore>) -> bool {
		let mut mutable = self.mutable.lock().unwrap();

		if let Some((assigned, _object_core, object_state_sink)) = mutable.object_state_sinks.get_mut(&object_core.descriptor) {
			if !*assigned {
				panic!("Double unassign (#1)");
			};
			*assigned = false;
			if let Some(object_state_sink) = object_state_sink {
				object_state_sink.lock().unwrap().unassign();
			} else {
				mutable.object_state_sinks.remove(&object_core.descriptor);
			}
		} else {
			panic!("Double unassign (#2)");
		}

		self.object_notify(object_core);
		self.exchange_core().load.fetch_sub(self.composition_cost(), Ordering::Relaxed);

		true
	}

	fn composition_cost(&self) -> u32 {
		self.cost.load(Ordering::Relaxed)
	}

	fn object(&self) -> bool {
		false
	}
}


//LockCheck:  scope=RemoteTagInterface
impl RemoteTagExposeContractSharedMutable {
	//LockCheck:
	pub(crate) fn object_state_sink_dropped(&mut self, object_core: &Arc<ObjectCore>) {
		if let Some((assigned, _object_core, object_state_sink)) = self.object_state_sinks.get_mut(&object_core.descriptor) {
			*object_state_sink = None;
			if *assigned {
			} else {
				self.object_state_sinks.remove(&object_core.descriptor);
			}
		//mutable.wake();
		} else {
			panic!("Drop of unknown object_state_sink");
		}
	}

	//LockCheck:
	pub(crate) fn object_state_stream_dropped(&mut self, object_core: &Arc<ObjectCore>) {
		if let Some((live, _object_core, object_state_stream)) = self.object_state_stream.get_mut(&object_core.descriptor) {
			*object_state_stream = None;
			if *live {
			} else {
				self.object_state_stream.remove(&object_core.descriptor);
			}
		//mutable.wake();
		} else {
			panic!("Drop of unknown object_state_stream");
		}
	}
}