hakuban 0.7.2

Data-object sharing library
Documentation
use std::{cmp::max, collections::{HashMap, HashSet}, hash::{Hash, Hasher}, sync::{atomic::AtomicUsize, Arc, Mutex, RwLock}};

use instant::Instant;
use log::trace;

use super::{DataBytes, DataSynchronized, DataVersion, ObjectState, ObjectStateStreamSharedMutable};
use crate::{channel::MpscSender, descriptor::ObjectDescriptor, diff, exchange::{core::ExchangeCore, LocalExchangeShared}, expose_contract::ExposeContract, observe_contract::ObserveContract, tag::core::TagCore};



// Key type of the `diffs` cache kept in `ObjectCore`.
// Presumably (version the diff starts from, if any; version the diff leads to) — TODO confirm against the diff code.
type VersionFromTo = (Option<DataVersion>, DataVersion);


// Object Core is responsible for:
// - determining if the object should be active (== be updated by some exposer)
// - (un-)assigning the object to some Contract, if the object is active
// - maintaining synchronization state (based on input from ObjectStateSinks)
// - keeping the latest version of the object data, and notifying ObjectStateStreams of changes
// - receiving desired expose parameters from Observe Contracts, summarizing them, and delivering them to ObjectStateSinks (notifying in case of changes)
// - being a central point for storage of object-related cached data (diffs)

// Changes propagation:
// - object_state_sink: exposers (object expose contract, tag expose contract, remote object interface, remote tag interface?)
// - state change: observers
// - liveness: observers

// Notifications:
// - local exchange gets creation notification from builder (easier to avoid deadlocks that way)
// - destruction doesn't trigger any notification (the action which triggered destruction should have triggered a notification already)

//TODO: depub everything, only allow functions
//TODO: consider splitting mutable into links and state again
/// Shared core of a single object: its identity (`descriptor`, `tags`, owning local exchange),
/// the lock-protected mutable part and the diff caches.
///
/// Equality and hashing use `descriptor` only. Dropping the core calls `object_release`
/// on `local_exchange`.
pub(crate) struct ObjectCore {
	/// Local exchange that is told (through `object_release`) when this core is dropped.
	pub local_exchange: Arc<LocalExchangeShared>,
	/// Identity of the object; the only field used by `Hash` and `PartialEq`.
	pub descriptor: ObjectDescriptor,
	/// Tag cores given to `new`; each one gets `object_insert` called with this core on creation.
	tags: Vec<Arc<TagCore>>,
	/// Links (contracts, streams, remote interfaces) and state, guarded by one read-write lock.
	mutable: RwLock<ObjectCoreMutable>,
	/// Starts at 1. Presumably the id source for object state sinks — not used in this file section.
	pub(super) object_state_sinks_sequence: AtomicUsize,
	/// Starts at 1. Presumably the id source for object state streams — not used in this file section.
	pub(super) object_state_streams_sequence: AtomicUsize,
	// cache
	/// Cached diffs keyed by `VersionFromTo`; only the entry count is read here (in `Debug`).
	pub(super) diffs: Mutex<HashMap<VersionFromTo, Arc<Mutex<Option<DataBytes>>>>>,
	/// Cached diff states keyed by data version; only the entry count is read here (in `Debug`).
	pub(super) diff_states: Mutex<HashMap<DataVersion, Arc<Mutex<Option<diff::State>>>>>,
	// to be removed
	/// Starts at 1; `link_remote_interface` takes ids for remote interfaces from it.
	pub(super) remote_interfaces_sequence: AtomicUsize,
}

/// Mutable part of an `ObjectCore`; lives inside `ObjectCore::mutable` and is only touched
/// while that lock is held.
pub(crate) struct ObjectCoreMutable {
	/// Linked observe contracts grouped by their exchange; a group is removed once it becomes empty.
	observe_contracts: HashMap<Arc<ExchangeCore>, HashSet<Arc<dyn ObserveContract>>>,
	/// Number of linked observe contracts whose `object()` returned true.
	object_observe_contract_count: usize,
	/// Linked expose contracts grouped by their exchange; a group is removed once it becomes empty.
	expose_contracts: HashMap<Arc<ExchangeCore>, HashSet<Arc<dyn ExposeContract>>>,
	/// Number of linked expose contracts whose `object()` returned true.
	object_expose_contract_count: usize,
	//TODO: move to separate mutex, maybe
	/// Streams, keyed by stream id, that get `object_changed()` called after a state change.
	object_state_streams: HashMap<usize, Arc<Mutex<ObjectStateStreamSharedMutable>>>,
	/// Ids of the object state sinks whose latest report was `DataSynchronized::Now`.
	synchronized_object_state_sinks: HashSet<usize>,
	// active == should_be_produced
	/// Result of the last `recalculate_active` run.
	active: bool,
	/// `active && state.is_some()` as of the last `recalculate_live` run.
	live: bool,
	/// Expose contract whose `assign` succeeded for this object and which has not been unassigned since.
	assigned_expose_contract: Option<Arc<dyn ExposeContract>>,
	/// Latest accepted object state; `None` until the first `state_set`.
	state: Option<ObjectState<DataBytes>>,
	// to be removed
	/// Senders, keyed by interface id, that receive this object's descriptor on link, unlink and state changes.
	remote_interfaces: HashMap<usize, MpscSender<HashSet<ObjectDescriptor>>>,
}


//LockCheck:  scope=ObjectCoreMutable
impl ObjectCoreMutable {
	/// Recomputes `active` from the linked contracts and keeps the expose-contract
	/// assignment in step with it: assigns one when the object is active and unassigned,
	/// unassigns when it is inactive and still assigned.
	///
	/// Returns `true` when `active` changed.
	//LockCheck:  call=expose_contract_assign  call=expose_contract_unassign
	fn recalculate_active(&mut self, object_core: &Arc<ObjectCore>) -> bool {
		let was_active = self.active;

		// Demand: a direct (object) observer, or a direct exposer together with any observer.
		let has_demand = self.object_observe_contract_count > 0 || (self.object_expose_contract_count > 0 && !self.observe_contracts.is_empty());

		// Exception: all observers are on a single non-local exchange and that exchange has its own exposer.
		let now_active = if !has_demand {
			false
		} else {
			match self.observe_contracts.keys().next() {
				Some(sole_exchange) if self.observe_contracts.len() == 1 => sole_exchange.local || !self.expose_contracts.contains_key(sole_exchange),
				_ => true,
			}
		};
		self.active = now_active;

		if self.active {
			if self.assigned_expose_contract.is_none() {
				self.expose_contract_assign(object_core);
			}
		} else if let Some(assigned_contract) = self.assigned_expose_contract.clone() {
			self.expose_contract_unassign(object_core.clone(), assigned_contract);
		}

		was_active != self.active
	}

	/// Recomputes `live` (`active` and a state is present). When the value flips, every linked
	/// observe contract gets `initialize_activate` (became live) or `deinitialize_deactivate`
	/// (stopped being live).
	//LockCheck:  call=ObjectObserveContract::initialize_activate  call=TagObserveContract::initialize_activate  call=RemoteObjectInterface::initialize_activate  call=RemoteTagInterface::initialize_activate  call=ObjectObserveContract::deinitialize_deactivate  call=TagObserveContract::deinitialize_deactivate  call=RemoteObjectInterface::deinitialize_deactivate  call=RemoteTagInterface::deinitialize_deactivate
	fn recalculate_live(&mut self, object_core: &Arc<ObjectCore>) {
		let was_live = self.live;
		let now_live = self.active && self.state.is_some();
		self.live = now_live;

		if was_live == now_live {
			return;
		}

		for contract in self.observe_contracts.values().flatten() {
			if now_live {
				contract.initialize_activate(object_core);
			} else {
				contract.deinitialize_deactivate(object_core);
			}
		}
	}

	/// Picks the best linked expose contract and tries to assign the object to it, repeating
	/// the selection until an `assign` call succeeds or no candidate is left.
	///
	/// Candidates are contracts whose `expected_proportional_load_with()` returns a load.
	/// They are ranked by `(!upstream, load)`, lowest first, and must rank strictly below
	/// `(true, u32::MAX)`. On a tie the first one encountered wins.
	//LockCheck:  call=ObjectExposeContract::assign  call=TagExposeContract::assign  call=RemoteObjectInterface::assign  call=RemoteTagInterface::assign
	fn expose_contract_assign(&mut self, object_core: &Arc<ObjectCore>) {
		//TODO if all observe contracts are on a single exchange, and that exchange also has an expose contract - strongly prefer composing there
		// (downstream, expected load) a candidate has to beat to be considered at all
		let worst_acceptable_score = (true, u32::MAX);

		loop {
			let selected_expose_contract: Option<Arc<dyn ExposeContract>> = self
				.expose_contracts
				.values()
				.flatten()
				.filter_map(|candidate| candidate.expected_proportional_load_with().map(|load| ((!candidate.exchange_core().upstream, load), candidate)))
				.filter(|(score, _)| *score < worst_acceptable_score)
				.min_by_key(|(score, _)| *score)
				.map(|(_, candidate)| Arc::clone(candidate));

			//TODO: unassign before assignment?
			match selected_expose_contract {
				None => break,
				Some(contract) => {
					if contract.assign(object_core) {
						self.assigned_expose_contract = Some(contract);
						break;
					}
				}
			}
		}
	}

	/// Offers `candidate_contract` as the exposer of this object.
	///
	/// With no contract assigned, the candidate is taken if its `assign` succeeds. Otherwise the
	/// candidate replaces the current contract only when the current `(!upstream, load without)`
	/// score is strictly greater than the candidate's and the candidate's `assign` succeeds; the
	/// previous contract is then unassigned.
	//LockCheck:   call=ObjectExposeContract::assign  call=TagExposeContract::assign  call=RemoteObjectInterface::assign  call=RemoteTagInterface::assign  call=expose_contract_unassign
	fn potentially_reassign_to(&mut self, object_core: &Arc<ObjectCore>, candidate_contract: Arc<dyn ExposeContract>) {
		let score_of = |contract: &Arc<dyn ExposeContract>| (!contract.exchange_core().upstream, contract.expected_proportional_load_without());

		//TODO: something is wrong here
		let candidate_is_preferred = match &self.assigned_expose_contract {
			Some(current_expose_contract) => score_of(current_expose_contract) > score_of(&candidate_contract),
			None => true,
		};

		if !candidate_is_preferred || !candidate_contract.assign(object_core) {
			return;
		}

		// The candidate is assigned; release the contract it replaces, if there was one.
		if let Some(replaced_contract) = self.assigned_expose_contract.clone() {
			self.expose_contract_unassign(object_core.clone(), replaced_contract);
		}
		self.assigned_expose_contract = Some(candidate_contract);
	}

	/// Calls `unassign` on `contract_to_unassign` for this object and then clears
	/// `assigned_expose_contract`, whichever contract it currently holds.
	//TODO: no Arc, this is stupid
	//LockCheck:  call=ObjectExposeContract::unassign  call=TagExposeContract::unassign  call=RemoteObjectInterface::unassign  call=RemoteTagInterface::unassign
	fn expose_contract_unassign(&mut self, object_core: Arc<ObjectCore>, contract_to_unassign: Arc<dyn ExposeContract>) {
		// The contract is told first; the slot is emptied only after that call returns.
		contract_to_unassign.unassign(&object_core);
		self.assigned_expose_contract = None;
	}

	/// Records the synchronization report of one object state sink and returns the
	/// synchronization value the object as a whole should carry.
	///
	/// A sink reporting `Now` is added to the synchronized set and the result is `Now`.
	/// A sink reporting `LastAt(t)` is removed from the set; the result stays `Now` while other
	/// sinks remain, otherwise it is `LastAt` of the later of `t` and the stored state's
	/// `LastAt` timestamp (just `t` when there is none). `self.state` is not modified.
	//TODO: &Synchronized ?
	//LockCheck:
	fn synchronize(&mut self, object_state_sink_synchronized: DataSynchronized, object_state_sink_id: usize) -> DataSynchronized {
		//TODO: check if sync timestamp is not in the future
		let new_timestamp = match object_state_sink_synchronized {
			DataSynchronized::Now => {
				self.synchronized_object_state_sinks.insert(object_state_sink_id);
				return DataSynchronized::Now;
			}
			DataSynchronized::LastAt(timestamp) => timestamp,
		};

		self.synchronized_object_state_sinks.remove(&object_state_sink_id);
		if !self.synchronized_object_state_sinks.is_empty() {
			return DataSynchronized::Now;
		}

		match self.state.as_ref().map(|state| &state.synchronized) {
			Some(DataSynchronized::LastAt(old_timestamp)) => DataSynchronized::LastAt(max(new_timestamp, *old_timestamp)),
			_ => DataSynchronized::LastAt(new_timestamp),
		}
	}

	/// Removes one object state sink from the synchronized set and returns the synchronization
	/// value the object should now carry: `Now` while other synchronized sinks remain, otherwise
	/// `LastAt` of the later of the current instant and the stored state's `LastAt` timestamp.
	/// `self.state` is not modified.
	//TODO: &Synchronized
	//LockCheck:
	fn desynchronize(&mut self, object_state_sink_id: usize) -> DataSynchronized {
		self.synchronized_object_state_sinks.remove(&object_state_sink_id);
		if !self.synchronized_object_state_sinks.is_empty() {
			return DataSynchronized::Now;
		}

		let now = Instant::now();
		let last_synchronized_at = match self.state.as_ref().map(|state| &state.synchronized) {
			Some(DataSynchronized::LastAt(old_timestamp)) => max(now, *old_timestamp),
			_ => now,
		};
		DataSynchronized::LastAt(last_synchronized_at)
	}

	/// Feeds a state reported by one object state sink into the stored state.
	///
	/// The sink's synchronization report is always recorded through `synchronize`. The stored
	/// state is then replaced when there was none or when the reported version is greater;
	/// otherwise only its `synchronized` value is refreshed, if that changed.
	///
	/// Returns `(state_changed, state_initialized)`.
	//TODO: why is only version a &
	//LockCheck:  call=synchronize
	pub(crate) fn state_set(&mut self, object_state: ObjectState<DataBytes>, object_state_sink_id: usize) -> (bool, bool) {
		let old_synchronized = self.state.as_ref().map(|state| state.synchronized.clone());
		let new_synchronized = self.synchronize(object_state.synchronized.clone(), object_state_sink_id);

		match self.state.take() {
			// first state ever
			None => {
				self.state = Some(object_state);
				(true, true)
			}
			// newer version replaces the stored one
			Some(current_state) if object_state.version > current_state.version => {
				self.state = Some(object_state);
				(true, false)
			}
			// same or older version: only the synchronization marker may move
			Some(current_state) if old_synchronized.as_ref() != Some(&new_synchronized) => {
				self.state = Some(current_state.with_synchronized(new_synchronized));
				(true, false)
			}
			Some(current_state) => {
				self.state = Some(current_state);
				(false, false)
			}
		}
	}
}

/// An object core hashes exactly like its descriptor.
impl Hash for ObjectCore {
	fn hash<H: Hasher>(&self, state: &mut H) {
		Hash::hash(&self.descriptor, state)
	}
}

/// Two object cores are equal when their descriptors are equal.
impl PartialEq for ObjectCore {
	fn eq(&self, other: &Self) -> bool {
		PartialEq::eq(&self.descriptor, &other.descriptor)
	}
}

impl Eq for ObjectCore {}

//LockCheck:  scope=ObjectCore
impl ObjectCore {
	/// Creates an object core with no links, no state, inactive and not live, with all id
	/// sequences starting at 1, and registers it with every given tag through `object_insert`.
	//LockCheck:  call=TagCore::object_insert
	pub(crate) fn new(local_exchange: Arc<LocalExchangeShared>, descriptor: ObjectDescriptor, tags: Vec<Arc<TagCore>>) -> Arc<ObjectCore> {
		let initial_mutable = ObjectCoreMutable {
			observe_contracts: HashMap::new(),
			object_observe_contract_count: 0,
			expose_contracts: HashMap::new(),
			object_expose_contract_count: 0,
			object_state_streams: HashMap::new(),
			synchronized_object_state_sinks: HashSet::new(),
			active: false,
			live: false,
			assigned_expose_contract: None,
			state: None,
			remote_interfaces: HashMap::new(),
		};

		let object_core = Arc::new(ObjectCore {
			local_exchange,
			descriptor,
			tags,
			mutable: RwLock::new(initial_mutable),
			object_state_sinks_sequence: AtomicUsize::new(1),
			object_state_streams_sequence: AtomicUsize::new(1),
			diffs: Mutex::new(HashMap::new()),
			diff_states: Mutex::new(HashMap::new()),
			remote_interfaces_sequence: AtomicUsize::new(1),
		});

		for tag in &object_core.tags {
			tag.object_insert(&object_core);
		}

		object_core
	}

	/// Registers a notification sender of a remote interface and returns the id it was stored
	/// under; pass that id to `unlink_remote_interface` to remove it again.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn link_remote_interface(self: &Arc<Self>, notification_sender: MpscSender<HashSet<ObjectDescriptor>>) -> usize {
		let interface_id = self.remote_interfaces_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
		self.mutable.write().unwrap().remote_interfaces.insert(interface_id, notification_sender);
		interface_id
	}

	/// Removes the remote interface sender stored under `id`; unknown ids are ignored.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn unlink_remote_interface(self: &Arc<Self>, id: usize) {
		self.mutable.write().unwrap().remote_interfaces.remove(&id);
	}

	/// Links an observe contract to this object.
	///
	/// If the object is already live the contract gets `initialize_activate` right away. The
	/// contract is then stored under its exchange, `active` and `live` are recalculated, and the
	/// object's descriptor is sent to every remote interface.
	//LockCheck:  lock=mutable  call=ObjectObserveContract::initialize_activate  call=TagObserveContract::initialize_activate  call=RemoteObjectInterface::initialize_activate  call=RemoteTagInterface::initialize_activate  call=ObjectCoreMutable::recalculate_active  call=ObjectCoreMutable::recalculate_live  unlock=mutable
	pub(crate) fn link_observe_contract(self: &Arc<Self>, contract: Arc<dyn ObserveContract>) {
		let mut mutable = self.mutable.write().unwrap();

		if contract.object() {
			mutable.object_observe_contract_count += 1;
		}
		if mutable.live {
			contract.clone().initialize_activate(self);
		}

		let contract_exchange = contract.exchange_core().clone();
		mutable.observe_contracts.entry(contract_exchange).or_default().insert(contract);

		mutable.recalculate_active(self);
		mutable.recalculate_live(self);

		for sender in mutable.remote_interfaces.values() {
			sender.send(self.descriptor.clone());
		}
	}

	/// Links an expose contract to this object.
	///
	/// The contract is stored under its exchange and offered as the object's exposer through
	/// `potentially_reassign_to`. `active` and `live` are then recalculated and the object's
	/// descriptor is sent to every remote interface.
	//LockCheck:  lock=mutable  call=ObjectCoreMutable::potentially_reassign_to  call=ObjectCoreMutable::recalculate_active  call=ObjectCoreMutable::recalculate_live  unlock=mutable
	pub(crate) fn link_expose_contract(self: &Arc<Self>, contract: Arc<dyn ExposeContract>) {
		let mut mutable = self.mutable.write().unwrap();

		if contract.object() {
			mutable.object_expose_contract_count += 1;
		}

		let contract_exchange = contract.exchange_core().clone();
		mutable.expose_contracts.entry(contract_exchange).or_default().insert(contract.clone());

		mutable.potentially_reassign_to(self, contract); //TODO: assign without active may happen here
		mutable.recalculate_active(self);
		mutable.recalculate_live(self);

		for sender in mutable.remote_interfaces.values() {
			sender.send(self.descriptor.clone());
		}
	}

	/// Unlinks an expose contract from this object.
	///
	/// The contract is removed from its exchange's group (the group is dropped when it becomes
	/// empty) and, if it was the assigned exposer, unassigned. `active` and `live` are then
	/// recalculated and the object's descriptor is sent to every remote interface.
	///
	/// Panics if no expose contract of the contract's exchange is linked.
	//LockCheck:  lock=mutable  call=ObjectCoreMutable::expose_contract_unassign  call=ObjectCoreMutable::recalculate_active  call=ObjectCoreMutable::recalculate_live  unlock=mutable
	pub(crate) fn unlink_expose_contract(self: &Arc<Self>, contract: &Arc<dyn ExposeContract>) {
		let mut mutable = self.mutable.write().unwrap();

		if contract.object() {
			mutable.object_expose_contract_count -= 1;
		}

		let contract_exchange = contract.exchange_core();
		let remaining_contracts = mutable.expose_contracts.get_mut(contract_exchange).unwrap();
		remaining_contracts.remove(contract);
		if remaining_contracts.is_empty() {
			mutable.expose_contracts.remove(contract_exchange);
		}

		let assigned_id = mutable.assigned_expose_contract.as_ref().map(|assigned| assigned.id());
		if assigned_id == Some(contract.id()) {
			mutable.expose_contract_unassign(self.clone(), contract.clone());
		}

		mutable.recalculate_active(self);
		mutable.recalculate_live(self);

		for sender in mutable.remote_interfaces.values() {
			sender.send(self.descriptor.clone());
		}
	}

	/// Unlinks an observe contract from this object.
	///
	/// The contract is removed from its exchange's group (the group is dropped when it becomes
	/// empty). `active` and `live` are then recalculated and the object's descriptor is sent to
	/// every remote interface.
	///
	/// Panics if no observe contract of the contract's exchange is linked.
	//LockCheck:  lock=mutable  call=ObjectCoreMutable::recalculate_active  call=ObjectCoreMutable::recalculate_live  unlock=mutable
	pub(crate) fn unlink_observe_contract(self: &Arc<Self>, contract: &Arc<dyn ObserveContract>) {
		let mut mutable = self.mutable.write().unwrap();

		if contract.object() {
			mutable.object_observe_contract_count -= 1;
		}

		let contract_exchange = contract.exchange_core();
		let remaining_contracts = mutable.observe_contracts.get_mut(contract_exchange).unwrap();
		remaining_contracts.remove(contract);
		if remaining_contracts.is_empty() {
			mutable.observe_contracts.remove(contract_exchange);
		}

		mutable.recalculate_active(self);
		mutable.recalculate_live(self);

		for sender in mutable.remote_interfaces.values() {
			sender.send(self.descriptor.clone());
		}
	}

	/// Returns a clone of the currently stored object state, or `None` if none was set yet.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn state_get(&self) -> Option<ObjectState<DataBytes>> {
		self.mutable.read().unwrap().state.clone()
	}

	/// Registers an object state stream under the given id, replacing any stream already
	/// stored under that id.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn add_object_state_stream(&self, object_state_stream_id: usize, object_state_stream: Arc<Mutex<ObjectStateStreamSharedMutable>>) {
		self.mutable.write().unwrap().object_state_streams.insert(object_state_stream_id, object_state_stream);
	}

	/// Removes the object state stream stored under the given id; unknown ids are ignored.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn remove_object_state_stream(&self, object_state_stream_id: usize) {
		self.mutable.write().unwrap().object_state_streams.remove(&object_state_stream_id);
	}

	//TODO: add/remove_object_state_sink_params_stream

	/// Marks one object state sink as no longer synchronized.
	///
	/// If a state is stored and its synchronization value changes as a result, the state is
	/// updated, the object's descriptor is sent to every remote interface and, after the
	/// `mutable` lock has been released, every object state stream gets `object_changed()`.
	//LockCheck:  lock=mutable  call=ObjectCoreMutable::desynchronize  unlock=mutable  lock=ObjectStateStream::mutable  call=ObjectStateStream::object_changed  unlock=ObjectStateStream::mutable
	pub(crate) fn desynchronize(&self, object_state_sink_id: usize) {
		let mut mutable = self.mutable.write().unwrap();
		//TODO: assign in links instead
		let new_synchronized = mutable.desynchronize(object_state_sink_id);

		let synchronization_changed = mutable.state.as_ref().map_or(false, |state| state.synchronized != new_synchronized);
		if !synchronization_changed {
			return;
		}

		//TODO: uh, destructure or something, stop the clones
		mutable.state = mutable.state.take().map(|previous_state| previous_state.with_synchronized(new_synchronized));

		let streams_to_notify: Vec<Arc<Mutex<ObjectStateStreamSharedMutable>>> = mutable.object_state_streams.values().cloned().collect();
		for sender in mutable.remote_interfaces.values() {
			sender.send(self.descriptor.clone());
		}

		// Streams are notified only after the object lock is gone.
		drop(mutable);
		for object_state_stream in streams_to_notify {
			object_state_stream.lock().unwrap().object_changed();
		}
	}

	/// Stores a state reported by one object state sink and propagates the change.
	///
	/// `live` is always recalculated. When the stored state changed, the object's descriptor is
	/// sent to every remote interface and, after the `mutable` lock has been released, every
	/// object state stream gets `object_changed()`.
	//LockCheck:  lock=mutable  call=ObjectCoreMutable::state_set  call=ObjectCoreMutable::recalculate_live  unlock=mutable  lock=ObjectStateStream::mutable  call=ObjectStateStream::object_changed  unlock=ObjectStateStream::mutable
	pub(super) fn state_set(self: &Arc<Self>, object_state: ObjectState<DataBytes>, object_state_sink_id: usize) {
		let mut mutable = self.mutable.write().unwrap();
		let (state_changed, state_initialized) = mutable.state_set(object_state, object_state_sink_id);
		trace!(
			"New object state: {:?}  state_changed:{:?}  state_initialized: {:?}  synchronized: {:?}",
			self.descriptor,
			state_changed,
			state_initialized,
			mutable.state.as_ref().map(|state| &state.synchronized)
		);
		mutable.recalculate_live(self);

		if !state_changed {
			return;
		}

		let streams_to_notify: Vec<Arc<Mutex<ObjectStateStreamSharedMutable>>> = mutable.object_state_streams.values().cloned().collect();
		for sender in mutable.remote_interfaces.values() {
			sender.send(self.descriptor.clone());
		}

		// Streams are notified only after the object lock is gone.
		drop(mutable);
		for object_state_stream in streams_to_notify {
			object_state_stream.lock().unwrap().object_changed();
		}
	}

	/// Tells whether some exchange other than `exchange` has an observe contract linked here.
	/// With `check_tags` any contract counts; without it only contracts whose `object()` is true.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(super) fn has_observer_other_than(self: &Arc<Self>, exchange: &Arc<ExchangeCore>, check_tags: bool) -> bool {
		let mutable = self.mutable.read().unwrap();
		mutable
			.observe_contracts
			.iter()
			.any(|(contract_exchange, contracts)| contract_exchange != exchange && (check_tags || contracts.iter().any(|contract| contract.object())))
	}

	/// Tells whether some exchange other than `exchange` has an expose contract linked here.
	/// With `check_tags` any contract counts; without it only contracts whose `object()` is true.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(super) fn has_exposer_other_than(self: &Arc<Self>, exchange: &Arc<ExchangeCore>, check_tags: bool) -> bool {
		let mutable = self.mutable.read().unwrap();
		mutable
			.expose_contracts
			.iter()
			.any(|(contract_exchange, contracts)| contract_exchange != exchange && (check_tags || contracts.iter().any(|contract| contract.object())))
	}

	//LockCheck:  lock=mutable  unlock=mutable
	pub(super) fn is_assigned_to_exchange_other_than(self: &Arc<Self>, exchange: &Arc<ExchangeCore>) -> bool {
		let object_links = self.mutable.read().unwrap();
		if let Some(ref expose_contract) = object_links.assigned_expose_contract {
			expose_contract.exchange_core() != exchange
		} else {
			false
		}
	}

	/// Tells whether at least one expose contract of `exchange` is linked to this object.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(super) fn is_exposed_by_exchange(self: &Arc<Self>, exchange: &Arc<ExchangeCore>) -> bool {
		self.mutable.read().unwrap().expose_contracts.contains_key(exchange)
	}

	/// Tells whether at least one observe contract of `exchange` is linked to this object.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(super) fn is_observed_by_exchange(self: &Arc<Self>, exchange: &Arc<ExchangeCore>) -> bool {
		self.mutable.read().unwrap().observe_contracts.contains_key(exchange)
	}

	/// Returns the lowest `composition_cost()` among the linked expose contracts that do not
	/// belong to `exchange`, or `None` when there is no such contract.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(super) fn minimum_cost_excluding_exchange(self: &Arc<Self>, exchange: &Arc<ExchangeCore>) -> Option<u32> {
		let mutable = self.mutable.read().unwrap();

		mutable
			.expose_contracts
			.values()
			.flatten()
			.filter(|expose_contract| expose_contract.exchange_core() != exchange)
			.map(|expose_contract| expose_contract.composition_cost())
			.min()
	}
}

/// Dropping an object core reports the release of its descriptor to the local exchange.
//LockCheck:  scope=ObjectCore
impl Drop for ObjectCore {
	//LockCheck:  call=LocalExchange::object_release
	fn drop(&mut self) {
		let released_descriptor = &self.descriptor;
		self.local_exchange.object_release(released_descriptor);
	}
}


/// Debug output for an object core.
///
/// The plain form (`{:?}`) prints only the descriptor. The alternate form (`{:#?}`) takes the
/// `mutable` read lock and prints a multi-line summary: activity flags, the exchanges of the
/// linked contracts, tags, the exchange of the assigned contract, cache sizes and the stored
/// data.
///
/// Formatter errors are returned to the caller instead of causing a panic. The lock
/// acquisitions still panic on a poisoned lock.
impl std::fmt::Debug for ObjectCore {
	fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		if !fmt.alternate() {
			return write!(fmt, "ObjectCore<{:?}>", self.descriptor);
		}

		let mutable = self.mutable.read().unwrap();
		writeln!(
			fmt,
			"│ Object: {:?}\n│ Active: {:?}  Live: {:?}\n│ Observe contracts: {:?}\n│ Expose contracts: {:?}\n│ Tags: {:?}\n│ Assigned: {:?}\n│ Diffs stored: {}  diff states stored: {}",
			self.descriptor,
			mutable.active,
			mutable.live,
			mutable.observe_contracts.keys(),
			mutable.expose_contracts.keys(),
			self.tags,
			mutable.assigned_expose_contract.as_ref().map(|contract| contract.exchange_core()),
			self.diffs.lock().unwrap().len(),
			self.diff_states.lock().unwrap().len(),
		)?;

		match &mutable.state {
			Some(data) => writeln!(fmt, "│ Data: {:?} {:?} {:?}B {:?}", data.version, data.format, data.data.len(), data.synchronized),
			None => writeln!(fmt, "│ Data: None"),
		}
	}
}