#[cfg(not(target_family = "wasm"))]
use std::time::Instant;
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
fmt,
sync::{atomic::AtomicU64, Arc, Mutex, RwLock, Weak},
};
use serde::{Deserialize, Serialize};
#[cfg(target_family = "wasm")]
use web_time::Instant;
#[cfg(feature = "downstream")]
use crate::connection::downstream::DownstreamConnectionId;
use crate::{
connection::upstream::{UpstreamConnectionId, UpstreamObjectInterface},
descriptor::ObjectDescriptor,
object::{
core::{Object, ObjectId},
monitor::ObjectSnapshot,
ObjectExposeContractId, ObjectObserveContractId, StateSinkId, StateStreamId,
},
tag::{
core::{Tag, TagId},
monitor::TagSnapshot,
TagExposeContractId, TagObserveContractId,
},
TagDescriptor,
};
#[derive(Clone)]
pub struct Exchange {
pub(crate) shared: Arc<ExchangeShared>,
}
pub(crate) struct ExchangeShared {
objects: RwLock<HashMap<ObjectDescriptor, Weak<Object>>>,
tags: RwLock<HashMap<TagDescriptor, Weak<Tag>>>,
upstream_connection: RwLock<Option<Arc<UpstreamObjectInterface>>>,
#[cfg(feature = "downstream")]
rebalance_due_date: Mutex<BTreeMap<(Instant, ObjectId), Weak<Object>>>,
object_id_sequence: AtomicU64,
tag_id_sequence: AtomicU64,
upstream_id_sequence: AtomicU64,
#[cfg(feature = "downstream")]
downstream_id_sequence: AtomicU64,
object_observe_contract_id_sequence: AtomicU64,
object_expose_contract_id_sequence: AtomicU64,
tag_observe_contract_id_sequence: AtomicU64,
tag_expose_contract_id_sequence: AtomicU64,
state_stream_id_sequence: AtomicU64,
state_sink_id_sequence: AtomicU64,
}
impl Exchange {
pub fn new() -> Exchange {
Exchange {
shared: Arc::new(ExchangeShared {
objects: RwLock::new(HashMap::new()),
tags: RwLock::new(HashMap::new()),
upstream_connection: RwLock::new(None),
#[cfg(feature = "downstream")]
rebalance_due_date: Mutex::new(BTreeMap::new()),
object_id_sequence: AtomicU64::new(0),
tag_id_sequence: AtomicU64::new(0),
upstream_id_sequence: AtomicU64::new(0),
#[cfg(feature = "downstream")]
downstream_id_sequence: AtomicU64::new(0),
object_observe_contract_id_sequence: AtomicU64::new(0),
object_expose_contract_id_sequence: AtomicU64::new(0),
tag_observe_contract_id_sequence: AtomicU64::new(0),
tag_expose_contract_id_sequence: AtomicU64::new(0),
state_stream_id_sequence: AtomicU64::new(0),
state_sink_id_sequence: AtomicU64::new(0),
}),
}
}
pub fn object_observe_contract(&self, descriptor: impl Into<ObjectDescriptor>) -> crate::object::contract_builder::ObjectObserveContractBuilder {
crate::object::contract_builder::ObjectObserveContractBuilder::new(self.shared.clone(), descriptor.into())
}
pub fn object_expose_contract(&self, descriptor: impl Into<ObjectDescriptor>) -> crate::object::contract_builder::ObjectExposeContractBuilder {
crate::object::contract_builder::ObjectExposeContractBuilder::new(self.shared.clone(), descriptor.into())
}
pub fn tag_observe_contract(&self, descriptor: impl Into<TagDescriptor>) -> crate::tag::contract_builder::TagObserveContractBuilder {
crate::tag::contract_builder::TagObserveContractBuilder::new(self.shared.clone(), descriptor.into())
}
pub fn tag_expose_contract(&self, descriptor: impl Into<TagDescriptor>) -> crate::tag::contract_builder::TagExposeContractBuilder {
crate::tag::contract_builder::TagExposeContractBuilder::new(self.shared.clone(), descriptor.into())
}
#[cfg(feature = "downstream")]
pub fn rebalance(&self, average_rebalance_interval: std::time::Duration, rebalance_threshold: f64) -> Instant {
self.shared.rebalance(average_rebalance_interval, rebalance_threshold)
}
pub fn snapshot(&self) -> ExchangeSnapshot {
self.shared.snapshot()
}
}
impl Default for Exchange {
fn default() -> Self {
Self::new()
}
}
impl ExchangeShared {
pub(crate) fn object_acquire(self: &Arc<Self>, descriptor: &ObjectDescriptor) -> Arc<Object> {
let mut objects = self.objects.write().unwrap();
if let Some(arc) = objects.get(descriptor).and_then(|weak| weak.upgrade()) {
return arc;
}
let tags: Vec<Arc<Tag>> = descriptor.tags.iter().map(|tag_descriptor| self.tag_acquire(tag_descriptor)).collect();
let upstream_connection = self.upstream_connection.read().unwrap();
let now = Instant::now();
let object_core = Object::new(self.clone(), self.next_object_id(), descriptor.clone(), tags, upstream_connection.clone());
#[cfg(feature = "downstream")]
self.insert_object_to_rebalance_queue(now, object_core.id(), Arc::downgrade(&object_core));
objects.insert(descriptor.clone(), Arc::downgrade(&object_core));
object_core
}
pub(crate) fn object_release(self: &Arc<Self>, descriptor: &ObjectDescriptor) {
if let Entry::Occupied(entry) = self.objects.write().unwrap().entry(descriptor.clone()) {
if Weak::strong_count(entry.get()) == 0 {
entry.remove();
}
}
}
pub(crate) fn tag_acquire(self: &Arc<Self>, descriptor: &TagDescriptor) -> Arc<Tag> {
let mut tags = self.tags.write().unwrap();
if let Some(arc) = tags.get(descriptor).and_then(|weak| weak.upgrade()) {
return arc;
}
let upstream_connection = self.upstream_connection.read().unwrap();
let tag_core = Arc::new(Tag::new(self.clone(), self.next_tag_id(), descriptor.clone(), upstream_connection.clone()));
tags.insert(descriptor.clone(), Arc::downgrade(&tag_core));
tag_core
}
pub(crate) fn tag_release(self: &Arc<Self>, descriptor: &TagDescriptor) {
if let Entry::Occupied(entry) = self.tags.write().unwrap().entry(descriptor.clone()) {
if Weak::strong_count(entry.get()) == 0 {
entry.remove();
}
}
}
pub(crate) fn set_upstream_connection(&self, upstream_object_interface: Arc<UpstreamObjectInterface>) -> bool {
{
let mut upstream_connection = self.upstream_connection.write().unwrap();
if upstream_connection.is_none() || upstream_connection.as_ref().unwrap().id.0 < upstream_object_interface.id.0 {
*upstream_connection = Some(upstream_object_interface.clone());
} else {
return false;
}
};
let objects: Box<[Arc<Object>]> = { self.objects.read().unwrap().values().filter_map(|weak_object| weak_object.upgrade()).collect() };
let tags: Box<[Arc<Tag>]> = { self.tags.read().unwrap().values().filter_map(|weak_tag| weak_tag.upgrade()).collect() };
objects.iter().for_each(|object| object.set_upstream_connection(upstream_object_interface.clone()));
tags.iter().for_each(|tag| tag.set_upstream_connection(upstream_object_interface.clone()));
true
}
pub(crate) fn remove_upstream_connection(&self, upstream_connection_id: UpstreamConnectionId) {
{
let mut upstream_connection = self.upstream_connection.write().unwrap();
if upstream_connection.is_some() && upstream_connection.as_ref().unwrap().id.0 <= upstream_connection_id.0 {
*upstream_connection = None;
} else {
return;
}
};
let objects: Box<[Arc<Object>]> = { self.objects.read().unwrap().values().filter_map(|weak_object| weak_object.upgrade()).collect() };
let tags: Box<[Arc<Tag>]> = { self.tags.read().unwrap().values().filter_map(|weak_tag| weak_tag.upgrade()).collect() };
objects.iter().for_each(|object| object.remove_upstream_connection(upstream_connection_id));
tags.iter().for_each(|tag| tag.remove_upstream_connection(upstream_connection_id));
}
fn next_object_id(&self) -> ObjectId {
ObjectId(self.object_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
fn next_tag_id(&self) -> TagId {
TagId(self.tag_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
pub(crate) fn next_upstream_id(&self) -> UpstreamConnectionId {
UpstreamConnectionId(self.upstream_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
#[cfg(feature = "downstream")]
pub(crate) fn next_downstream_id(&self) -> DownstreamConnectionId {
DownstreamConnectionId(self.downstream_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
pub(crate) fn next_object_observe_contract_id(&self) -> ObjectObserveContractId {
ObjectObserveContractId(self.object_observe_contract_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
pub(crate) fn next_object_expose_contract_id(&self) -> ObjectExposeContractId {
ObjectExposeContractId(self.object_expose_contract_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
pub(crate) fn next_tag_observe_contract_id(&self) -> TagObserveContractId {
TagObserveContractId(self.tag_observe_contract_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
pub(crate) fn next_tag_expose_contract_id(&self) -> TagExposeContractId {
TagExposeContractId(self.tag_expose_contract_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
pub(crate) fn next_state_stream_id(&self) -> StateStreamId {
StateStreamId(self.state_stream_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
pub(crate) fn next_state_sink_id(&self) -> StateSinkId {
StateSinkId(self.state_sink_id_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
#[cfg(feature = "downstream")]
pub fn rebalance(&self, average_rebalance_interval: std::time::Duration, rebalance_threshold: f64) -> Instant {
use std::time::Duration;
let mut objects_to_drop = Vec::new();
let mut locked_due_dates = self.rebalance_due_date.lock().unwrap();
while locked_due_dates
.first_key_value()
.map(|((due_date, _object_id), _object)| *due_date <= Instant::now() + Duration::from_millis(100))
.unwrap_or(false)
{
let ((due_date, _object_id), object_weak) = locked_due_dates.pop_first().unwrap();
if let Some(object) = Weak::upgrade(&object_weak) {
object.rebalance(rebalance_threshold);
let next_due_date = due_date
+ average_rebalance_interval / 2
+ Duration::from_millis((rand::random::<f32>() * average_rebalance_interval.as_millis() as f32) as u64);
locked_due_dates.insert((next_due_date, object.id()), object_weak);
objects_to_drop.push(object);
}
}
locked_due_dates.first_key_value().map(|((due_date, _), _)| *due_date).unwrap_or_else(|| Instant::now() + Duration::from_millis(1000))
}
#[cfg(feature = "downstream")]
fn insert_object_to_rebalance_queue(&self, instant: Instant, id: ObjectId, object: Weak<Object>) {
let mut locked_due_dates = self.rebalance_due_date.lock().unwrap();
locked_due_dates.insert((instant, id), object);
}
pub fn snapshot(&self) -> ExchangeSnapshot {
let objects = self.objects.read().unwrap();
let tags = self.tags.read().unwrap();
let upstream_connection = self.upstream_connection.read().unwrap();
ExchangeSnapshot {
objects: objects.values().filter_map(Weak::upgrade).map(|object| object.snapshot()).collect(),
tags: tags.values().filter_map(Weak::upgrade).map(|tag| tag.snapshot()).collect(),
upstream_connection: upstream_connection.is_some(),
}
}
}
#[derive(Serialize, Deserialize)]
pub struct ExchangeSnapshot {
pub objects: Vec<ObjectSnapshot>,
pub tags: Vec<TagSnapshot>,
pub upstream_connection: bool,
}
impl fmt::Debug for ExchangeSnapshot {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.write_str("╒══════════════════════════════════════════════════════════════════════════════════════════════════════════════ Exchange\n")?;
fmt.write_fmt(format_args!("│ Upstream connection: {}\n", if self.upstream_connection { "yes" } else { "no" }))?;
fmt.write_str("├─────────────────────────────────────────────────────────────────────────────────────── Objects\n")?;
for (i, object) in self.objects.iter().enumerate() {
object.fmt(fmt)?;
if i < self.objects.len() - 1 {
fmt.write_str("├─────\n").unwrap();
}
}
fmt.write_str("├─────────────────────────────────────────────────────────────────────────────────────── Tags\n")?;
for (i, tag) in self.tags.iter().enumerate() {
tag.fmt(fmt)?;
if i < self.tags.len() - 1 {
fmt.write_str("├─────\n").unwrap();
}
}
fmt.write_str("└╼")?;
Ok(())
}
}