hakuban 0.8.5

Data-object sharing library
Documentation
#[cfg(not(target_family = "wasm"))]
use std::time::Instant;
use std::{
	collections::{hash_map::Entry, BTreeMap, HashMap},
	fmt,
	sync::{atomic::AtomicU64, Arc, Mutex, RwLock, Weak},
};

use serde::{Deserialize, Serialize};
#[cfg(target_family = "wasm")]
use web_time::Instant;

#[cfg(feature = "downstream")]
use crate::connection::downstream::DownstreamConnectionId;
use crate::{
	connection::upstream::{UpstreamConnectionId, UpstreamObjectInterface},
	descriptor::ObjectDescriptor,
	object::{
		core::{Object, ObjectId},
		monitor::ObjectSnapshot,
		ObjectExposeContractId, ObjectObserveContractId, StateSinkId, StateStreamId,
	},
	tag::{
		core::{Tag, TagId},
		monitor::TagSnapshot,
		TagExposeContractId, TagObserveContractId,
	},
	TagDescriptor,
};

/// Main access point to the Hakuban network
///
/// Cloning an `Exchange` produces another handle to the same shared state:
/// the only field is an `Arc`, so every clone refers to the same objects,
/// tags and upstream connection.
#[derive(Clone)]
pub struct Exchange {
	// Shared state; contract builders created by this exchange receive a clone of this `Arc`.
	pub(crate) shared: Arc<ExchangeShared>,
}

/// State shared by all clones of an [`Exchange`] (held behind its `Arc`).
pub(crate) struct ExchangeShared {
	/// Known objects by descriptor. Entries are weak references: `object_acquire` inserts them and
	/// `object_release` removes an entry once no strong reference to its object is left.
	objects: RwLock<HashMap<ObjectDescriptor, Weak<Object>>>,
	/// Known tags by descriptor; maintained by `tag_acquire` / `tag_release` in the same way as `objects`.
	tags: RwLock<HashMap<TagDescriptor, Weak<Tag>>>,
	/// Upstream interface currently in use, if any. `set_upstream_connection` only replaces it with an
	/// interface carrying a higher id.
	upstream_connection: RwLock<Option<Arc<UpstreamObjectInterface>>>,
	/// Rebalance queue keyed by `(due date, object id)`; `rebalance` processes it starting from the
	/// earliest due date.
	#[cfg(feature = "downstream")]
	rebalance_due_date: Mutex<BTreeMap<(Instant, ObjectId), Weak<Object>>>,
	// Id counters. Each one starts at 0 (see `Exchange::new`) and is advanced by the matching
	// `next_*_id` method.
	object_id_sequence: AtomicU64,
	tag_id_sequence: AtomicU64,
	upstream_id_sequence: AtomicU64,
	#[cfg(feature = "downstream")]
	downstream_id_sequence: AtomicU64,
	object_observe_contract_id_sequence: AtomicU64,
	object_expose_contract_id_sequence: AtomicU64,
	tag_observe_contract_id_sequence: AtomicU64,
	tag_expose_contract_id_sequence: AtomicU64,
	state_stream_id_sequence: AtomicU64,
	state_sink_id_sequence: AtomicU64,
}

impl Exchange {
	pub fn new() -> Exchange {
		Exchange {
			shared: Arc::new(ExchangeShared {
				objects: RwLock::new(HashMap::new()),
				tags: RwLock::new(HashMap::new()),
				upstream_connection: RwLock::new(None),
				#[cfg(feature = "downstream")]
				rebalance_due_date: Mutex::new(BTreeMap::new()),
				object_id_sequence: AtomicU64::new(0),
				tag_id_sequence: AtomicU64::new(0),
				upstream_id_sequence: AtomicU64::new(0),
				#[cfg(feature = "downstream")]
				downstream_id_sequence: AtomicU64::new(0),
				object_observe_contract_id_sequence: AtomicU64::new(0),
				object_expose_contract_id_sequence: AtomicU64::new(0),
				tag_observe_contract_id_sequence: AtomicU64::new(0),
				tag_expose_contract_id_sequence: AtomicU64::new(0),
				state_stream_id_sequence: AtomicU64::new(0),
				state_sink_id_sequence: AtomicU64::new(0),
			}),
		}
	}

	pub fn object_observe_contract(&self, descriptor: impl Into<ObjectDescriptor>) -> crate::object::contract_builder::ObjectObserveContractBuilder {
		crate::object::contract_builder::ObjectObserveContractBuilder::new(self.shared.clone(), descriptor.into())
	}

	pub fn object_expose_contract(&self, descriptor: impl Into<ObjectDescriptor>) -> crate::object::contract_builder::ObjectExposeContractBuilder {
		crate::object::contract_builder::ObjectExposeContractBuilder::new(self.shared.clone(), descriptor.into())
	}

	pub fn tag_observe_contract(&self, descriptor: impl Into<TagDescriptor>) -> crate::tag::contract_builder::TagObserveContractBuilder {
		crate::tag::contract_builder::TagObserveContractBuilder::new(self.shared.clone(), descriptor.into())
	}

	pub fn tag_expose_contract(&self, descriptor: impl Into<TagDescriptor>) -> crate::tag::contract_builder::TagExposeContractBuilder {
		crate::tag::contract_builder::TagExposeContractBuilder::new(self.shared.clone(), descriptor.into())
	}

	#[cfg(feature = "downstream")]
	pub fn rebalance(&self, average_rebalance_interval: std::time::Duration, rebalance_threshold: f64) -> Instant {
		self.shared.rebalance(average_rebalance_interval, rebalance_threshold)
	}

	pub fn snapshot(&self) -> ExchangeSnapshot {
		self.shared.snapshot()
	}
}

impl Default for Exchange {
	fn default() -> Self {
		Self::new()
	}
}

impl ExchangeShared {
	pub(crate) fn object_acquire(self: &Arc<Self>, descriptor: &ObjectDescriptor) -> Arc<Object> {
		let mut objects = self.objects.write().unwrap();
		if let Some(arc) = objects.get(descriptor).and_then(|weak| weak.upgrade()) {
			return arc;
		}
		let tags: Vec<Arc<Tag>> = descriptor.tags.iter().map(|tag_descriptor| self.tag_acquire(tag_descriptor)).collect();
		let upstream_connection = self.upstream_connection.read().unwrap();
		let now = Instant::now();
		let object_core = Object::new(self.clone(), self.next_object_id(), descriptor.clone(), tags, upstream_connection.clone());
		#[cfg(feature = "downstream")]
		self.insert_object_to_rebalance_queue(now, object_core.id(), Arc::downgrade(&object_core));
		objects.insert(descriptor.clone(), Arc::downgrade(&object_core));
		object_core
	}

	/// Removes the registry entry for `descriptor` if the object it refers to has no strong
	/// references left.
	///
	/// This is only called from the object destructor.
	///
	/// The entry is looked up and removed by reference, so the descriptor is not cloned.
	pub(crate) fn object_release(self: &Arc<Self>, descriptor: &ObjectDescriptor) {
		let mut objects = self.objects.write().unwrap();
		// `object_acquire` may already have stored a new, live object under the same descriptor;
		// in that case the entry must stay.
		let is_dead = objects.get(descriptor).map_or(false, |weak| weak.strong_count() == 0);
		if is_dead {
			objects.remove(descriptor);
		}
	}

	pub(crate) fn tag_acquire(self: &Arc<Self>, descriptor: &TagDescriptor) -> Arc<Tag> {
		let mut tags = self.tags.write().unwrap();
		if let Some(arc) = tags.get(descriptor).and_then(|weak| weak.upgrade()) {
			return arc;
		}
		let upstream_connection = self.upstream_connection.read().unwrap();
		let tag_core = Arc::new(Tag::new(self.clone(), self.next_tag_id(), descriptor.clone(), upstream_connection.clone()));
		tags.insert(descriptor.clone(), Arc::downgrade(&tag_core));
		tag_core
	}

	/// Removes the registry entry for `descriptor` if the tag it refers to has no strong
	/// references left.
	///
	/// This is only called from the tag destructor.
	///
	/// The entry is looked up and removed by reference, so the descriptor is not cloned.
	pub(crate) fn tag_release(self: &Arc<Self>, descriptor: &TagDescriptor) {
		let mut tags = self.tags.write().unwrap();
		// `tag_acquire` may already have stored a new, live tag under the same descriptor;
		// in that case the entry must stay.
		let is_dead = tags.get(descriptor).map_or(false, |weak| weak.strong_count() == 0);
		if is_dead {
			tags.remove(descriptor);
		}
	}

	//TODO: consider returning Result instead
	pub(crate) fn set_upstream_connection(&self, upstream_object_interface: Arc<UpstreamObjectInterface>) -> bool {
		{
			let mut upstream_connection = self.upstream_connection.write().unwrap();
			if upstream_connection.is_none() || upstream_connection.as_ref().unwrap().id.0 < upstream_object_interface.id.0 {
				*upstream_connection = Some(upstream_object_interface.clone());
			} else {
				return false;
			}
		};

		let objects: Box<[Arc<Object>]> = { self.objects.read().unwrap().values().filter_map(|weak_object| weak_object.upgrade()).collect() };
		let tags: Box<[Arc<Tag>]> = { self.tags.read().unwrap().values().filter_map(|weak_tag| weak_tag.upgrade()).collect() };

		objects.iter().for_each(|object| object.set_upstream_connection(upstream_object_interface.clone()));
		tags.iter().for_each(|tag| tag.set_upstream_connection(upstream_object_interface.clone()));

		true
	}

	pub(crate) fn remove_upstream_connection(&self, upstream_connection_id: UpstreamConnectionId) {
		{
			let mut upstream_connection = self.upstream_connection.write().unwrap();
			if upstream_connection.is_some() && upstream_connection.as_ref().unwrap().id.0 <= upstream_connection_id.0 {
				*upstream_connection = None;
			} else {
				return;
			}
		};

		let objects: Box<[Arc<Object>]> = { self.objects.read().unwrap().values().filter_map(|weak_object| weak_object.upgrade()).collect() };
		let tags: Box<[Arc<Tag>]> = { self.tags.read().unwrap().values().filter_map(|weak_tag| weak_tag.upgrade()).collect() };

		objects.iter().for_each(|object| object.remove_upstream_connection(upstream_connection_id));
		tags.iter().for_each(|tag| tag.remove_upstream_connection(upstream_connection_id));
	}

	// IDs

	/// Returns the current value of the object id counter as an `ObjectId` and advances the counter by one.
	fn next_object_id(&self) -> ObjectId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.object_id_sequence.fetch_add(1, Relaxed);
		ObjectId(allocated)
	}

	/// Returns the current value of the tag id counter as a `TagId` and advances the counter by one.
	fn next_tag_id(&self) -> TagId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.tag_id_sequence.fetch_add(1, Relaxed);
		TagId(allocated)
	}

	/// Returns the current value of the upstream id counter as an `UpstreamConnectionId` and advances the counter by one.
	pub(crate) fn next_upstream_id(&self) -> UpstreamConnectionId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.upstream_id_sequence.fetch_add(1, Relaxed);
		UpstreamConnectionId(allocated)
	}

	/// Returns the current value of the downstream id counter as a `DownstreamConnectionId` and advances the counter by one.
	#[cfg(feature = "downstream")]
	pub(crate) fn next_downstream_id(&self) -> DownstreamConnectionId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.downstream_id_sequence.fetch_add(1, Relaxed);
		DownstreamConnectionId(allocated)
	}

	/// Returns the current value of the object-observe-contract id counter as an `ObjectObserveContractId` and advances the counter by one.
	pub(crate) fn next_object_observe_contract_id(&self) -> ObjectObserveContractId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.object_observe_contract_id_sequence.fetch_add(1, Relaxed);
		ObjectObserveContractId(allocated)
	}

	/// Returns the current value of the object-expose-contract id counter as an `ObjectExposeContractId` and advances the counter by one.
	pub(crate) fn next_object_expose_contract_id(&self) -> ObjectExposeContractId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.object_expose_contract_id_sequence.fetch_add(1, Relaxed);
		ObjectExposeContractId(allocated)
	}

	/// Returns the current value of the tag-observe-contract id counter as a `TagObserveContractId` and advances the counter by one.
	pub(crate) fn next_tag_observe_contract_id(&self) -> TagObserveContractId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.tag_observe_contract_id_sequence.fetch_add(1, Relaxed);
		TagObserveContractId(allocated)
	}

	/// Returns the current value of the tag-expose-contract id counter as a `TagExposeContractId` and advances the counter by one.
	pub(crate) fn next_tag_expose_contract_id(&self) -> TagExposeContractId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.tag_expose_contract_id_sequence.fetch_add(1, Relaxed);
		TagExposeContractId(allocated)
	}

	/// Returns the current value of the state-stream id counter as a `StateStreamId` and advances the counter by one.
	pub(crate) fn next_state_stream_id(&self) -> StateStreamId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.state_stream_id_sequence.fetch_add(1, Relaxed);
		StateStreamId(allocated)
	}

	/// Returns the current value of the state-sink id counter as a `StateSinkId` and advances the counter by one.
	pub(crate) fn next_state_sink_id(&self) -> StateSinkId {
		use std::sync::atomic::Ordering::Relaxed;
		let allocated = self.state_sink_id_sequence.fetch_add(1, Relaxed);
		StateSinkId(allocated)
	}

	//TODO: add async fn till_upstream_not_assigned() and allow racing multiple connection attempts

	/// Rebalances every queued object whose due date is at most 100 ms in the future, re-queues
	/// it, and returns the earliest due date left in the queue.
	///
	/// * `average_rebalance_interval` - each processed object is re-queued at
	///   `due_date + average_rebalance_interval / 2 + jitter`, where `jitter` is a random fraction
	///   of `average_rebalance_interval` computed in whole milliseconds.
	/// * `rebalance_threshold` - passed unchanged to each object's `rebalance`.
	///
	/// Queue entries whose object can no longer be upgraded are discarded. If the queue ends up
	/// empty, the returned instant is one second from now.
	///
	/// The `rebalance_due_date` mutex is held for the whole call, including the calls to each
	/// object's `rebalance`.
	///
	/// NOTE(review): if `average_rebalance_interval` is zero, the re-queued due date equals the old
	/// one, so a live object is popped again immediately and the loop does not terminate — confirm
	/// callers always pass a non-zero interval.
	#[cfg(feature = "downstream")]
	pub fn rebalance(&self, average_rebalance_interval: std::time::Duration, rebalance_threshold: f64) -> Instant {
		use std::time::Duration;
		// Declared BEFORE `locked_due_dates` on purpose: locals are dropped in reverse declaration
		// order, so the queue lock is released before the upgraded Arcs collected here are dropped.
		let mut objects_to_drop = Vec::new();
		// Dropping an upgraded Arc may drop the object itself, which in turn makes it call
		// object_release, potentially deadlocking with an ongoing object_acquire (reverse lock order).
		let mut locked_due_dates = self.rebalance_due_date.lock().unwrap();
		// Keep going while the earliest entry is due (with 100 ms of look-ahead); `Instant::now()`
		// is re-read on every iteration.
		while locked_due_dates
			.first_key_value()
			.map(|((due_date, _object_id), _object)| *due_date <= Instant::now() + Duration::from_millis(100))
			.unwrap_or(false)
		{
			let ((due_date, _object_id), object_weak) = locked_due_dates.pop_first().unwrap();
			if let Some(object) = Weak::upgrade(&object_weak) {
				object.rebalance(rebalance_threshold);
				// The next due date is computed from the previous due date, not from the current time.
				let next_due_date = due_date
					+ average_rebalance_interval / 2
					+ Duration::from_millis((rand::random::<f32>() * average_rebalance_interval.as_millis() as f32) as u64);
				locked_due_dates.insert((next_due_date, object.id()), object_weak);
				// Keep the strong reference alive until after the lock is released (see above).
				objects_to_drop.push(object);
			}
		}
		locked_due_dates.first_key_value().map(|((due_date, _), _)| *due_date).unwrap_or_else(|| Instant::now() + Duration::from_millis(1000))
	}

	/// Adds `object` to the rebalance queue under the key `(instant, id)`.
	#[cfg(feature = "downstream")]
	fn insert_object_to_rebalance_queue(&self, instant: Instant, id: ObjectId, object: Weak<Object>) {
		let queue_key = (instant, id);
		self.rebalance_due_date.lock().unwrap().insert(queue_key, object);
	}

	pub fn snapshot(&self) -> ExchangeSnapshot {
		let objects = self.objects.read().unwrap();
		let tags = self.tags.read().unwrap();
		let upstream_connection = self.upstream_connection.read().unwrap();
		ExchangeSnapshot {
			objects: objects.values().filter_map(Weak::upgrade).map(|object| object.snapshot()).collect(),
			tags: tags.values().filter_map(Weak::upgrade).map(|tag| tag.snapshot()).collect(),
			upstream_connection: upstream_connection.is_some(),
		}
	}
}

/// Serializable view of an exchange, as produced by `Exchange::snapshot`.
#[derive(Serialize, Deserialize)]
pub struct ExchangeSnapshot {
	/// Snapshots of the objects that could still be upgraded when the snapshot was taken.
	pub objects: Vec<ObjectSnapshot>,
	/// Snapshots of the tags that could still be upgraded when the snapshot was taken.
	pub tags: Vec<TagSnapshot>,
	/// `true` if an upstream connection was set when the snapshot was taken.
	pub upstream_connection: bool,
}

impl fmt::Debug for ExchangeSnapshot {
	/// Renders the snapshot as a box-drawn report: a header, the upstream connection state, then
	/// the objects and the tags, with a short separator line between consecutive entries.
	///
	/// Every write error is propagated to the caller; this never panics on a failing writer.
	fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
		fmt.write_str("╒══════════════════════════════════════════════════════════════════════════════════════════════════════════════ Exchange\n")?;
		write!(fmt, "│ Upstream connection:    {}\n", if self.upstream_connection { "yes" } else { "no" })?;
		fmt.write_str("├─────────────────────────────────────────────────────────────────────────────────────── Objects\n")?;
		for (i, object) in self.objects.iter().enumerate() {
			// Separator goes between entries: before every entry except the first.
			if i > 0 {
				fmt.write_str("├─────\n")?;
			}
			object.fmt(fmt)?;
		}
		fmt.write_str("├─────────────────────────────────────────────────────────────────────────────────────── Tags\n")?;
		for (i, tag) in self.tags.iter().enumerate() {
			if i > 0 {
				fmt.write_str("├─────\n")?;
			}
			tag.fmt(fmt)?;
		}
		fmt.write_str("└╼")
	}
}