hakuban 0.7.2

Data-object sharing library
Documentation
use std::{collections::{HashMap, HashSet}, hash::{Hash, Hasher}, sync::{Arc, RwLock, Weak}};

use crate::{descriptor::ObjectDescriptor, exchange::{core::ExchangeCore, LocalExchangeShared}, expose_contract::ExposeContract, object::core::ObjectCore, observe_contract::ObserveContract, TagDescriptor};

// Locking order:
// Tag.links -> Object.link_*_contract
// Tag.links

pub(crate) struct TagCore {
	local_exchange: Arc<LocalExchangeShared>,
	pub(crate) descriptor: TagDescriptor,
	links: RwLock<TagLinks>,
}

//TODO: simplify contract containers to simple hashsets
struct TagLinks {
	objects: HashMap<ObjectDescriptor, Weak<ObjectCore>>, // used solely to notify objects of inserted/removed tag contracts
	observe_contracts: HashMap<Arc<ExchangeCore>, HashSet<Arc<dyn ObserveContract>>>,
	expose_contracts: HashMap<Arc<ExchangeCore>, HashSet<Arc<dyn ExposeContract>>>,
}

//LockCheck: scope=TagCore
impl TagCore {
	//LockCheck:
	pub(crate) fn new(local_exchange: Arc<LocalExchangeShared>, descriptor: TagDescriptor) -> TagCore {
		TagCore {
			local_exchange,
			descriptor,
			links: RwLock::new(TagLinks { objects: HashMap::new(), observe_contracts: HashMap::new(), expose_contracts: HashMap::new() }),
		}
	}

	//LockCheck:  call=LocalExchange::tag_changed
	pub(super) fn notify_local_exchange(&self) {
		self.local_exchange.tag_changed(self.descriptor.clone());
	}

	//LockCheck:  lock=mutable  call=ObjectCore::link_observe_contract  unlock=mutable  call=LocalExchange::object_changed  call=notify_local_exchange
	pub(super) fn link_tag_observe_contract(self: &Arc<Self>, contract: Arc<dyn ObserveContract>) {
		let mut links = self.links.write().unwrap();
		links.observe_contracts.entry(contract.exchange_core().clone()).or_insert_with(HashSet::new).insert(contract.clone());
		let objects_to_notify: Vec<Arc<ObjectCore>> = links
			.objects
			.values()
			.filter_map(Weak::upgrade)
			.map(|object| {
				object.link_observe_contract(contract.clone());
				object
			})
			.collect();
		drop(links);
		for object in objects_to_notify {
			self.local_exchange.object_changed(object.descriptor.clone());
		}
		self.notify_local_exchange();
	}

	//LockCheck:  lock=mutable  call=ObjectCore::link_expose_contract  unlock=mutable  call=LocalExchange::object_changed  call=notify_local_exchange
	pub(super) fn link_tag_expose_contract(&self, contract: Arc<dyn ExposeContract>) {
		let mut links = self.links.write().unwrap();
		links.expose_contracts.entry(contract.exchange_core().clone()).or_insert_with(HashSet::new).insert(contract.clone());
		let objects_to_notify: Vec<Arc<ObjectCore>> = links
			.objects
			.values()
			.filter_map(Weak::upgrade)
			.map(|object| {
				object.link_expose_contract(contract.clone());
				object
			})
			.collect();
		drop(links);
		for object in objects_to_notify {
			self.local_exchange.object_changed(object.descriptor.clone());
		}
		self.notify_local_exchange();
	}

	//LockCheck:  lock=mutable  call=ObjectCore::unlink_observe_contract  unlock=mutable  call=LocalExchange::object_changed  call=ObjectCore::drop  call=notify_local_exchange
	pub(super) fn unlink_tag_observe_contract(self: &Arc<Self>, contract: &Arc<dyn ObserveContract>) {
		let mut links = self.links.write().unwrap();
		let observe_contracts = links.observe_contracts.get_mut(contract.exchange_core()).unwrap();
		observe_contracts.remove(contract);
		if observe_contracts.is_empty() {
			links.observe_contracts.remove(contract.exchange_core());
		};
		let objects_to_notify: Vec<Arc<ObjectCore>> = links
			.objects
			.values()
			.filter_map(Weak::upgrade)
			.map(|object| {
				object.unlink_observe_contract(contract);
				object
			})
			.collect();
		drop(links);
		for object in objects_to_notify {
			self.local_exchange.object_changed(object.descriptor.clone());
		}
		self.notify_local_exchange();
	}

	//LockCheck:  lock=mutable  call=ObjectCore::unlink_expose_contract  unlock=mutable  call=LocalExchange::object_changed  call=ObjectCore::drop  call=notify_local_exchange
	pub(super) fn unlink_tag_expose_contract(self: &Arc<Self>, contract: &Arc<dyn ExposeContract>) {
		let mut links = self.links.write().unwrap();
		let expose_contracts = links.expose_contracts.get_mut(contract.exchange_core()).unwrap();
		expose_contracts.remove(contract);
		if expose_contracts.is_empty() {
			links.expose_contracts.remove(contract.exchange_core());
		};
		//TEST-MISSING: We were keeping descriptors only here, leading to occasional ObjectCore::drop call while holding lock on links. No test detected that deadlock.
		let objects_to_notify: Vec<Arc<ObjectCore>> = links
			.objects
			.values()
			.filter_map(Weak::upgrade)
			.map(|object| {
				object.unlink_expose_contract(contract);
				object
			})
			.collect();
		drop(links);
		for object in objects_to_notify {
			self.local_exchange.object_changed(object.descriptor.clone());
		}
		self.notify_local_exchange();
	}

	//LockCheck:  lock=mutable  call=ObjectCore::link_observe_contract  call=ObjectCore::link_expose_contract  unlock=mutable
	pub(crate) fn object_insert(&self, object_core: &Arc<ObjectCore>) {
		let mut links = self.links.write().unwrap();
		links.objects.insert(object_core.descriptor.clone(), Arc::downgrade(object_core));
		for observe_contracts in links.observe_contracts.values() {
			for observe_contract in observe_contracts {
				object_core.link_observe_contract(observe_contract.clone());
			}
		}
		for expose_contracts in links.expose_contracts.values() {
			for expose_contract in expose_contracts {
				object_core.link_expose_contract(expose_contract.clone());
			}
		}
	}

	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn object_remove(&self, descriptor: &ObjectDescriptor) {
		let mut links = self.links.write().unwrap();
		links.objects.remove(descriptor);
		//TODO: are contracts aware of object removal?
	}

	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn has_observer_other_than(self: &Arc<Self>, exchange_core: &Arc<ExchangeCore>) -> bool {
		let tag_links = self.links.read().unwrap();
		tag_links.observe_contracts.len() > 1 || (!tag_links.observe_contracts.is_empty() && !tag_links.observe_contracts.contains_key(exchange_core))
	}

	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn has_exposer_other_than(self: &Arc<Self>, exchange_core: &Arc<ExchangeCore>) -> bool {
		let tag_links = self.links.read().unwrap();
		tag_links.expose_contracts.len() > 1 || (!tag_links.expose_contracts.is_empty() && !tag_links.expose_contracts.contains_key(exchange_core))
	}

	fn expose_contracts(&self) -> Vec<Arc<dyn ExposeContract>> {
		self.links.read().unwrap().expose_contracts.values().flatten().cloned().collect()
	}

	//TODO: cache this
	pub(crate) fn minimum_cost_excluding_exchange(self: &Arc<Self>, exchange_core: &Arc<ExchangeCore>) -> Option<u32> {
		let mut lowest_cost = None;

		for expose_contract in self.expose_contracts() {
			if expose_contract.exchange_core() != exchange_core {
				let cost = expose_contract.composition_cost();
				if lowest_cost.is_none() || cost < lowest_cost.unwrap() {
					lowest_cost = Some(cost);
				}
			}
		}

		lowest_cost
	}
}

impl std::fmt::Debug for TagCore {
	fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		if fmt.alternate() {
			let links = self.links.read().unwrap();
			fmt.write_fmt(format_args!(
				"│ Tag: {:?}\n│ Observe contracts: {:?}\n│ Expose contracts: {:?}\n",
				self.descriptor,
				links.observe_contracts.keys(),
				links.expose_contracts.keys()
			))?;
		} else {
			fmt.write_fmt(format_args!("Tag:{}", self.descriptor))?;
		}
		Ok(())
	}
}


impl Hash for TagCore {
	fn hash<H: Hasher>(&self, state: &mut H) {
		self.descriptor.hash(state);
	}
}

impl PartialEq for TagCore {
	fn eq(&self, other: &Self) -> bool {
		self.descriptor == other.descriptor
	}
}

impl Eq for TagCore {}


//TEST-NEEDED: removing this doesn't make any test fail
//LockCheck: scope=TagCore
impl Drop for TagCore {
	//LockCheck:  call=LocalExchange::tag_release
	fn drop(&mut self) {
		self.local_exchange.tag_release(&self.descriptor);
	}
}