#[cfg(not(target_family = "wasm"))]
use std::time::Instant;
use std::{
cmp::{max, min},
collections::{HashMap, HashSet},
hash::{Hash, Hasher},
sync::{Arc, RwLock},
};
use log::trace;
#[cfg(target_family = "wasm")]
use web_time::Instant;
use super::{
diff_cache::DiffCache,
monitor::{ObjectDownstreamLinkSnapshot, ObjectSnapshot, ObjectStateSinkOwnerSnapshot, ObjectStateSnapshot, ObjectUpstreamLinkSnapshot},
DataBytes, DataSynchronized, ObjectExposeContractId, ObjectObserveContractId, ObjectState, ObjectStateSink, ObjectStateSinkInlet, ObjectStateSinkOwner,
ObjectStateStreamInlet, StateSinkId, StateStreamId,
};
#[cfg(feature = "downstream")]
use crate::connection::{
downstream::{DownsteamObjectInterface, DownstreamConnectionId},
termination::DownstreamBehaviourError,
};
use crate::{
connection::upstream::{UpstreamConnectionId, UpstreamObjectInterface},
descriptor::ObjectDescriptor,
exchange::ExchangeShared,
expose_contract::ExposeContractInlet,
object::ObjectStateStreamOwner,
observe_contract::ObserveContractInlet,
tag::{core::Tag, TagExposeContractId, TagObserveContractId},
utils::BoolUtils,
ObjectStateStream,
};
#[derive(PartialEq, Eq, Hash, Copy, Clone, Ord, PartialOrd)]
pub(crate) struct ObjectId(pub u64);
pub(crate) struct Object {
id: ObjectId,
pub descriptor: ObjectDescriptor,
exchange: Arc<ExchangeShared>,
tags: Box<[Arc<Tag>]>,
inner: RwLock<ObjectInner>,
pub(crate) diff_cache: DiffCache,
}
pub(crate) struct ObjectInner {
local_links: LocalLinks,
upstream_link: Option<UpstreamLink>,
#[cfg(feature = "downstream")]
downstream_links: HashMap<DownstreamConnectionId, DownstreamLink>,
cumulative_object_exposer_capacity: u64,
assigned_exposer: Option<ObjectStateSinkOwner>,
assigned_exposer_contributed_load: Option<f64>,
should_exist: bool,
data_needed: bool,
state: Option<ObjectState<DataBytes>>,
synchronized_object_state_sinks: HashSet<StateSinkId>, }
struct UpstreamLink {
interface: Arc<UpstreamObjectInterface>,
streams_sending_to_upstream: HashMap<StateStreamId, ObjectStateStreamInlet>,
sinks_receiving_from_upstream: HashMap<StateSinkId, ObjectStateSinkInlet>,
}
#[cfg(feature = "downstream")]
struct DownstreamLink {
object_interface: Arc<DownsteamObjectInterface>,
object_observer: bool,
object_exposer: bool,
object_exposer_capacity: u32,
tag_observer_count: usize,
tag_exposer_count: usize,
tag_exposer_capacity: u64,
stream: Option<ObjectStateStreamInlet>,
sink: Option<ObjectStateSinkInlet>,
}
#[cfg(feature = "downstream")]
impl DownstreamLink {
fn new(object_interface: Arc<DownsteamObjectInterface>) -> DownstreamLink {
DownstreamLink {
object_interface,
object_observer: false,
object_exposer: false,
object_exposer_capacity: 0,
stream: None,
sink: None,
tag_observer_count: 0,
tag_exposer_count: 0,
tag_exposer_capacity: 0,
}
}
fn update_stream(&mut self, id: DownstreamConnectionId, object_core: &Arc<Object>, assigned_exposer: Option<ObjectStateSinkOwner>) {
let should_have_a_stream = (self.object_observer || self.tag_observer_count > 0)
&& (assigned_exposer != Some(ObjectStateSinkOwner::DownstreamConnection(id)))
&& assigned_exposer.is_some();
if should_have_a_stream && self.stream.is_none() {
let (new_stream, new_stream_shared_mutable) =
ObjectStateStream::new(object_core.clone(), object_core.exchange.next_state_stream_id(), ObjectStateStreamOwner::DownstreamConnection(id));
if self.object_interface.stream_created(new_stream) {
self.stream = Some(new_stream_shared_mutable);
}
};
if !should_have_a_stream && self.stream.is_some() {
self.close_stream();
};
}
fn update_sink(&mut self, id: DownstreamConnectionId, object_core: &Arc<Object>, assigned_exposer: Option<ObjectStateSinkOwner>) {
let should_have_a_sink =
(self.object_exposer || self.tag_exposer_count > 0) && (assigned_exposer == Some(ObjectStateSinkOwner::DownstreamConnection(id)));
if should_have_a_sink && self.sink.is_none() {
let (new_sink, new_sink_shared_mutable) =
ObjectStateSink::new(object_core.clone(), object_core.exchange.next_state_sink_id(), ObjectStateSinkOwner::DownstreamConnection(id));
if self.object_interface.sink_created(new_sink) {
self.sink = Some(new_sink_shared_mutable);
};
};
if !should_have_a_sink && self.sink.is_some() {
self.close_sink();
}
}
fn close_sink(&mut self) {
if let Some(ref sink) = self.sink {
sink.close();
}
}
fn close_stream(&mut self) {
if let Some(ref stream) = self.stream {
stream.close();
}
}
}
struct ObserveContractLink {
contract: ObserveContractInlet,
stream: Option<ObjectStateStreamInlet>,
}
impl ObserveContractLink {
fn update_stream(&mut self, object_core: &Arc<Object>, owner: ObjectStateStreamOwner, should_exist: bool) {
let should_have_a_stream = should_exist;
if should_have_a_stream && self.stream.is_none() {
let (new_stream, new_stream_control) = ObjectStateStream::new(object_core.clone(), object_core.exchange.next_state_stream_id(), owner);
self.stream = Some(new_stream_control);
self.contract.stream_created(new_stream);
};
if !should_have_a_stream && self.stream.is_some() {
self.close_stream();
};
}
fn close_stream(&self) {
if let Some(ref stream) = self.stream {
stream.close();
}
}
}
struct ExposeContractLink {
contract: ExposeContractInlet,
sink: Option<ObjectStateSinkInlet>,
capacity: u32,
}
impl ExposeContractLink {
fn update_sink(&mut self, object_core: &Arc<Object>, owner: ObjectStateSinkOwner, assigned_exposer: Option<ObjectStateSinkOwner>) {
let should_have_a_sink = Some(owner) == assigned_exposer;
if should_have_a_sink && self.sink.is_none() {
let (new_sink, new_sink_control) = ObjectStateSink::new(object_core.clone(), object_core.exchange.next_state_sink_id(), owner);
self.sink = Some(new_sink_control);
self.contract.sink_created(new_sink);
};
if !should_have_a_sink && self.sink.is_some() {
self.close_sink();
};
}
fn close_sink(&self) {
if let Some(ref sink) = self.sink {
sink.close();
}
}
}
struct LocalLinks {
object_observe_contracts: HashMap<ObjectObserveContractId, ObserveContractLink>,
tag_observe_contracts: HashMap<TagObserveContractId, ObserveContractLink>,
object_expose_contracts: HashMap<ObjectExposeContractId, ExposeContractLink>,
tag_expose_contracts: HashMap<TagExposeContractId, ExposeContractLink>,
}
impl ObjectInner {
fn update(
&mut self,
object_core: &Arc<Object>,
update_should_exist: bool,
mut update_data_needed: bool,
mut update_assigned_exposer: bool,
mut update_downstream_sinks_and_streams: bool,
) {
if update_should_exist {
let previous_should_exist = self.should_exist;
self.should_exist = self
.upstream_link
.as_ref()
.map(|link| !link.sinks_receiving_from_upstream.is_empty() || !link.streams_sending_to_upstream.is_empty())
.unwrap_or(false)
|| !self.local_links.object_observe_contracts.is_empty()
|| !self.local_links.object_expose_contracts.is_empty();
#[cfg(feature = "downstream")]
{
self.should_exist = self.should_exist || self.downstream_links.values().any(|link| link.object_observer || link.object_exposer);
};
if self.should_exist != previous_should_exist {
if self.should_exist {
self.local_links.object_observe_contracts.iter_mut().for_each(|(contract_id, link)| {
link.update_stream(object_core, ObjectStateStreamOwner::ObjectObserveContract(*contract_id), self.should_exist);
});
self.local_links.tag_observe_contracts.iter_mut().for_each(|(contract_id, link)| {
link.update_stream(object_core, ObjectStateStreamOwner::TagObserveContract(*contract_id), self.should_exist);
});
} else {
self.local_links.tag_observe_contracts.iter_mut().for_each(|(_contract_id, interface)| {
interface.close_stream();
});
}
update_data_needed = true;
};
}
if update_data_needed {
let previous_data_needed = self.data_needed;
self.data_needed = self.should_exist
&& (self
.upstream_link
.as_ref()
.map(|link| !link.streams_sending_to_upstream.is_empty() || !link.sinks_receiving_from_upstream.is_empty())
.unwrap_or(false)
|| !self.local_links.object_observe_contracts.is_empty()
|| !self.local_links.tag_observe_contracts.is_empty());
#[cfg(feature = "downstream")]
{
self.data_needed = self.data_needed
|| (self.should_exist
&& (self.downstream_links.values().any(|downstream_link| {
(downstream_link.object_observer || downstream_link.tag_observer_count > 0)
&& !(downstream_link.object_exposer || downstream_link.tag_exposer_count > 0)
}) || self
.downstream_links
.values()
.filter(|downstream_link| downstream_link.object_observer || downstream_link.tag_observer_count > 0)
.nth(1)
.is_some()));
}
if !self.data_needed && self.state.is_some() && self.state.as_ref().unwrap().synchronized != DataSynchronized::Now {
self.state = None;
}
if self.data_needed != previous_data_needed {
update_assigned_exposer = true;
}
}
if update_assigned_exposer {
let previous_assigned_exposer = self.assigned_exposer;
if self.data_needed {
let picked_exposer = self.pick_exposer(None);
if picked_exposer != self.assigned_exposer {
self.unassign_exposer();
if let Some(new_exposer) = picked_exposer {
self.assign_exposer(object_core.clone(), new_exposer);
};
}
} else if self.assigned_exposer.is_some() {
self.unassign_exposer();
}
if self.assigned_exposer != previous_assigned_exposer {
update_downstream_sinks_and_streams = true;
}
}
#[cfg(feature = "downstream")]
if update_downstream_sinks_and_streams {
let assigned_exposer = self.assigned_exposer;
self.downstream_links.iter_mut().for_each(|(connection_id, link)| {
link.update_sink(*connection_id, object_core, assigned_exposer);
link.update_stream(*connection_id, object_core, assigned_exposer);
});
}
self.notify_upstream(object_core);
trace!(
"Object#{}: should_exist:{} data_needed:{} upstream_link:{} assigned_exposer:{:?}",
object_core.id.0,
self.should_exist,
self.data_needed,
self.upstream_link.is_some(),
&self.assigned_exposer
);
}
fn notify_upstream(&mut self, object_core: &Arc<Object>) {
if let Some(ref link) = self.upstream_link {
#[cfg(feature = "downstream")]
link.interface.object_contracts_changed(
object_core.clone(),
!self.local_links.object_observe_contracts.is_empty() || self.downstream_links.values().any(|downstream_link| downstream_link.object_observer),
!self.local_links.object_expose_contracts.is_empty() || self.downstream_links.values().any(|downstream_link| downstream_link.object_exposer),
min(self.cumulative_object_exposer_capacity, u32::MAX as u64) as u32,
);
#[cfg(not(feature = "downstream"))]
link.interface.object_contracts_changed(
object_core.clone(),
!self.local_links.object_observe_contracts.is_empty(),
!self.local_links.object_expose_contracts.is_empty(),
min(self.cumulative_object_exposer_capacity, u32::MAX as u64) as u32,
);
};
}
fn pick_exposer(&self, pick_new_downstream_if_load_lower_by: Option<f64>) -> Option<ObjectStateSinkOwner> {
if let Some(ref upstream_link) = self.upstream_link {
if !upstream_link.sinks_receiving_from_upstream.is_empty() {
return Some(ObjectStateSinkOwner::UpstreamConnection(upstream_link.interface.id));
}
};
if let Some(ObjectStateSinkOwner::ObjectExposeContract(object_expose_contract_id)) = self.assigned_exposer {
if self.local_links.object_expose_contracts.contains_key(&object_expose_contract_id) {
return self.assigned_exposer;
}
};
if let Some(ObjectStateSinkOwner::TagExposeContract(tag_expose_contract_id)) = self.assigned_exposer {
if self.local_links.tag_expose_contracts.contains_key(&tag_expose_contract_id) {
return self.assigned_exposer;
}
};
if let Some((lowest_score_object_expose_contract_id, _link)) =
self.local_links.object_expose_contracts.iter().max_by_key(|(_contract_id, link)| link.capacity)
{
return Some(ObjectStateSinkOwner::ObjectExposeContract(*lowest_score_object_expose_contract_id));
};
if let Some((lowest_score_tag_expose_contract_id, _link)) =
self.local_links.tag_expose_contracts.iter().max_by_key(|(_contract_id, link)| link.capacity)
{
return Some(ObjectStateSinkOwner::TagExposeContract(*lowest_score_tag_expose_contract_id));
};
#[cfg(feature = "downstream")]
if let Some(ObjectStateSinkOwner::DownstreamConnection(downstream_connection_id)) = self.assigned_exposer {
if pick_new_downstream_if_load_lower_by.is_none() {
if let Some(downstream_link) = self.downstream_links.get(&downstream_connection_id) {
if downstream_link.object_exposer || downstream_link.tag_exposer_count > 0 {
return self.assigned_exposer;
}
}
}
}
#[cfg(feature = "downstream")]
if let Some((load, lowest_score_downstream_connection_id)) = self
.downstream_links
.iter()
.filter_map(|(connection_id, link)| -> Option<(f64, DownstreamConnectionId)> {
let capacity = (link.object_exposer_capacity as u64 + link.tag_exposer_capacity) as f64;
if capacity > 0.0 && (link.object_exposer || link.tag_exposer_count > 0) {
link.object_interface.expected_proportional_load(1.0 / capacity, true).map(|load| (load, *connection_id))
} else {
None
}
})
.min_by(|(load_a, _id_a), (load_b, _id_b)| load_a.partial_cmp(load_b).unwrap())
{
if let Some(ObjectStateSinkOwner::DownstreamConnection(currently_assigned_downstream_id)) = self.assigned_exposer {
if let Some(current_downstream_link) = self.downstream_links.get(¤tly_assigned_downstream_id) {
if current_downstream_link.object_exposer || current_downstream_link.tag_exposer_count > 0 {
if (currently_assigned_downstream_id == lowest_score_downstream_connection_id)
|| (load
< current_downstream_link.object_interface.expected_proportional_load(0.0, false).unwrap()
- pick_new_downstream_if_load_lower_by.unwrap())
{
return Some(ObjectStateSinkOwner::DownstreamConnection(lowest_score_downstream_connection_id));
} else {
return Some(ObjectStateSinkOwner::DownstreamConnection(currently_assigned_downstream_id));
}
}
}
}
return Some(ObjectStateSinkOwner::DownstreamConnection(lowest_score_downstream_connection_id));
};
None
}
fn assign_exposer(&mut self, object_core: Arc<Object>, new_exposer: ObjectStateSinkOwner) {
assert!(self.assigned_exposer.is_none());
self.assigned_exposer = Some(new_exposer);
match new_exposer {
ObjectStateSinkOwner::ObjectExposeContract(object_expose_contract_id) => {
if let Some(contract) = self.local_links.object_expose_contracts.get_mut(&object_expose_contract_id) {
contract.update_sink(&object_core, new_exposer, Some(new_exposer));
}
}
ObjectStateSinkOwner::TagExposeContract(tag_expose_contract_id) => {
if let Some(contract) = self.local_links.tag_expose_contracts.get_mut(&tag_expose_contract_id) {
contract.update_sink(&object_core, new_exposer, Some(new_exposer));
}
}
ObjectStateSinkOwner::UpstreamConnection(_upstream_connection_id) => {
}
#[cfg(feature = "downstream")]
ObjectStateSinkOwner::DownstreamConnection(downstream_connection_id) => {
if let Some(link) = self.downstream_links.get_mut(&downstream_connection_id) {
link.update_sink(downstream_connection_id, &object_core, Some(new_exposer));
let load = 1.0 / (link.object_exposer_capacity as u64 + link.tag_exposer_capacity) as f64;
self.assigned_exposer_contributed_load = Some(load);
link.object_interface.load(load);
};
}
};
}
fn unassign_exposer(&mut self) {
match self.assigned_exposer.take() {
Some(ObjectStateSinkOwner::ObjectExposeContract(object_expose_contract_id)) => {
if let Some(contract) = self.local_links.object_expose_contracts.get_mut(&object_expose_contract_id) {
contract.close_sink();
}
}
Some(ObjectStateSinkOwner::TagExposeContract(tag_expose_contract_id)) => {
if let Some(contract) = self.local_links.tag_expose_contracts.get_mut(&tag_expose_contract_id) {
contract.close_sink();
}
}
Some(ObjectStateSinkOwner::UpstreamConnection(_upstream_connection_id)) => {}
#[cfg(feature = "downstream")]
Some(ObjectStateSinkOwner::DownstreamConnection(downstream_connection_id)) => {
if let Some(link) = self.downstream_links.get_mut(&downstream_connection_id) {
link.close_sink();
link.object_interface.load(-self.assigned_exposer_contributed_load.unwrap());
};
}
None => return,
};
self.assigned_exposer = None;
self.assigned_exposer_contributed_load = None;
}
fn contribute_capacity(&mut self, capacity: u32) {
self.cumulative_object_exposer_capacity = self.cumulative_object_exposer_capacity.checked_add(capacity as u64).unwrap();
}
fn withdraw_capacity(&mut self, capacity: u32) {
self.cumulative_object_exposer_capacity = self.cumulative_object_exposer_capacity.checked_sub(capacity as u64).unwrap();
}
fn synchronize(&mut self, object_state_sink_synchronized: DataSynchronized, object_state_sink_id: StateSinkId) -> DataSynchronized {
match object_state_sink_synchronized {
DataSynchronized::LastAt(new_timestamp) => {
self.synchronized_object_state_sinks.remove(&object_state_sink_id);
if self.synchronized_object_state_sinks.is_empty() {
if let Some(DataSynchronized::LastAt(old_timestamp)) = self.state.as_ref().map(|state| &state.synchronized) {
DataSynchronized::LastAt(max(new_timestamp, *old_timestamp))
} else {
DataSynchronized::LastAt(new_timestamp)
}
} else {
DataSynchronized::Now
}
}
DataSynchronized::Now => {
self.synchronized_object_state_sinks.insert(object_state_sink_id);
DataSynchronized::Now
}
}
}
fn desynchronize(&mut self, object_state_sink_id: StateSinkId) -> DataSynchronized {
self.synchronized_object_state_sinks.remove(&object_state_sink_id);
if self.synchronized_object_state_sinks.is_empty() {
if let Some(DataSynchronized::LastAt(old_timestamp)) = self.state.as_ref().map(|state| &state.synchronized) {
DataSynchronized::LastAt(max(Instant::now(), *old_timestamp))
} else {
DataSynchronized::LastAt(Instant::now())
}
} else {
DataSynchronized::Now
}
}
pub(crate) fn state_set(&mut self, new_object_state: ObjectState<DataBytes>, object_state_sink_id: StateSinkId) -> bool {
let old_synchronized = self.state.as_ref().map(|state| state.synchronized.clone());
let new_synchronized = self.synchronize(new_object_state.synchronized.clone(), object_state_sink_id);
if !self.data_needed && new_synchronized != DataSynchronized::Now {
self.state = None;
true
} else if self.state.is_none() || new_object_state.version > self.state.as_mut().unwrap().version {
self.state = Some(new_object_state);
true
} else if old_synchronized.as_ref() != Some(&new_synchronized) {
let old_state = self.state.take().unwrap();
self.state = Some(old_state.with_synchronized(new_synchronized));
true
} else {
false
}
}
fn notify_state_observers(&mut self) {
self.local_links.object_observe_contracts.values().filter_map(|interface| interface.stream.as_ref()).for_each(|stream_control| {
stream_control.wake();
});
self.local_links.tag_observe_contracts.values().filter_map(|interface| interface.stream.as_ref()).for_each(|stream_control| {
stream_control.wake();
});
self.upstream_link.iter().for_each(|upstream_link| {
upstream_link.streams_sending_to_upstream.values().for_each(|stream_control| {
stream_control.wake();
})
});
#[cfg(feature = "downstream")]
self.downstream_links.iter().filter_map(|(_downstream_id, link)| link.stream.as_ref()).for_each(|stream_control| {
stream_control.wake();
});
}
}
impl Object {
pub(crate) fn new(
exchange: Arc<ExchangeShared>,
id: ObjectId,
descriptor: ObjectDescriptor,
tags: Vec<Arc<Tag>>,
upstream_interface: Option<Arc<UpstreamObjectInterface>>,
) -> Arc<Object> {
let object_core = Arc::new(Object {
id,
exchange,
descriptor,
inner: RwLock::new(ObjectInner {
data_needed: false,
synchronized_object_state_sinks: HashSet::new(),
state: None,
local_links: LocalLinks {
object_observe_contracts: HashMap::new(),
tag_observe_contracts: HashMap::new(),
object_expose_contracts: HashMap::new(),
tag_expose_contracts: HashMap::new(),
},
upstream_link: upstream_interface.map(|interface| UpstreamLink {
interface,
streams_sending_to_upstream: HashMap::new(),
sinks_receiving_from_upstream: HashMap::new(),
}),
#[cfg(feature = "downstream")]
downstream_links: HashMap::new(),
assigned_exposer: None,
assigned_exposer_contributed_load: None,
should_exist: false,
cumulative_object_exposer_capacity: 0,
}),
tags: tags.clone().into_boxed_slice(),
diff_cache: DiffCache::default(),
});
for tag in tags.iter() {
tag.object_insert(&object_core);
}
object_core
}
pub fn id(&self) -> ObjectId {
self.id
}
#[cfg(feature = "downstream")]
pub(crate) fn rebalance(self: &Arc<Self>, rebalance_threshold: f64) {
let mut links = self.inner.write().unwrap();
if links.data_needed {
if let Some(better_exposer) = links.pick_exposer(Some(rebalance_threshold)) {
if Some(better_exposer) != links.assigned_exposer {
links.unassign_exposer();
links.assign_exposer(self.clone(), better_exposer);
links.update(self, false, false, false, true);
}
};
}
}
pub(crate) fn link_local_object_observe_contract(self: &Arc<Self>, contract_id: ObjectObserveContractId, contract: ObserveContractInlet) {
let mut links = self.inner.write().unwrap();
let interface = ObserveContractLink { contract, stream: None };
links.local_links.object_observe_contracts.insert(contract_id, interface).is_none().assert_true();
links.update(self, true, true, false, false);
let should_exist = links.should_exist;
links.local_links.object_observe_contracts.get_mut(&contract_id).unwrap().update_stream(
self,
ObjectStateStreamOwner::ObjectObserveContract(contract_id),
should_exist,
);
}
pub(crate) fn unlink_local_object_observe_contract(self: &Arc<Self>, contract_id: ObjectObserveContractId) {
let mut links = self.inner.write().unwrap();
let interface = links.local_links.object_observe_contracts.remove(&contract_id).unwrap();
links.update(self, true, true, false, false);
interface.close_stream();
}
pub(crate) fn link_local_object_expose_contract(self: &Arc<Self>, contract_id: ObjectExposeContractId, contract: ExposeContractInlet, capacity: u32) {
let mut links = self.inner.write().unwrap();
let interface = ExposeContractLink { contract, sink: None, capacity };
links.contribute_capacity(capacity);
links.local_links.object_expose_contracts.insert(contract_id, interface).is_none().assert_true();
links.update(self, true, false, true, false);
let assigned_exposer = links.assigned_exposer;
links.local_links.object_expose_contracts.get_mut(&contract_id).unwrap().update_sink(
self,
ObjectStateSinkOwner::ObjectExposeContract(contract_id),
assigned_exposer,
);
}
pub(crate) fn unlink_local_object_expose_contract(self: &Arc<Self>, contract_id: ObjectExposeContractId) {
let mut links = self.inner.write().unwrap();
let interface = links.local_links.object_expose_contracts.remove(&contract_id).unwrap();
links.withdraw_capacity(interface.capacity);
links.update(self, true, false, true, false);
interface.close_sink();
}
pub(crate) fn link_local_tag_observe_contract(self: &Arc<Self>, contract_id: TagObserveContractId, contract: ObserveContractInlet) {
let mut links = self.inner.write().unwrap();
let interface = ObserveContractLink { contract, stream: None };
links.local_links.tag_observe_contracts.insert(contract_id, interface).is_none().assert_true();
links.update(self, false, true, false, false);
let should_exist = links.should_exist;
links.local_links.tag_observe_contracts.get_mut(&contract_id).unwrap().update_stream(
self,
ObjectStateStreamOwner::TagObserveContract(contract_id),
should_exist,
);
}
pub(crate) fn unlink_local_tag_observe_contract(self: &Arc<Self>, contract_id: TagObserveContractId) {
let mut links = self.inner.write().unwrap();
let interface = links.local_links.tag_observe_contracts.remove(&contract_id).unwrap();
links.update(self, false, true, false, false);
interface.close_stream();
}
pub(crate) fn link_local_tag_expose_contract(self: &Arc<Self>, contract_id: TagExposeContractId, contract: ExposeContractInlet, capacity: u32) {
let mut links = self.inner.write().unwrap();
let interface = ExposeContractLink { contract, sink: None, capacity };
links.local_links.tag_expose_contracts.insert(contract_id, interface).is_none().assert_true();
links.update(self, false, false, true, false);
let assigned_exposer = links.assigned_exposer;
links.local_links.tag_expose_contracts.get_mut(&contract_id).unwrap().update_sink(
self,
ObjectStateSinkOwner::TagExposeContract(contract_id),
assigned_exposer,
);
}
pub(crate) fn unlink_local_tag_expose_contract(self: &Arc<Self>, contract_id: TagExposeContractId) {
let mut links = self.inner.write().unwrap();
let interface = links.local_links.tag_expose_contracts.remove(&contract_id).unwrap();
links.update(self, false, false, true, false);
interface.close_sink();
}
#[cfg(feature = "downstream")]
pub(crate) fn link_downstream_object_observe_contract(
self: &Arc<Self>,
downstream_connection_id: DownstreamConnectionId,
downstream_object_interface: Arc<DownsteamObjectInterface>,
) -> Result<(), DownstreamBehaviourError> {
let mut links = self.inner.write().unwrap();
if downstream_object_interface.is_terminated() {
return Ok(());
};
let link = links.downstream_links.entry(downstream_connection_id).or_insert_with(|| DownstreamLink::new(downstream_object_interface.clone()));
if !link.object_observer && !link.object_exposer {
let successfully_registered = link.object_interface.linked_object_register(self);
if !successfully_registered {
links.downstream_links.remove(&downstream_connection_id);
return Ok(());
}
}
link.object_observer.true_to_err(DownstreamBehaviourError::DoubleObjectObserve)?;
link.object_observer = true;
links.update(self, true, true, false, false);
let assigned_exposer = links.assigned_exposer;
links.downstream_links.get_mut(&downstream_connection_id).unwrap().update_stream(downstream_connection_id, self, assigned_exposer);
Ok(())
}
#[cfg(feature = "downstream")]
pub(crate) fn link_downstream_object_expose_contract(
self: &Arc<Self>,
downstream_connection_id: DownstreamConnectionId,
downstream_object_interface: Arc<DownsteamObjectInterface>,
capacity: u32,
) -> Result<(), DownstreamBehaviourError> {
let mut links = self.inner.write().unwrap();
if downstream_object_interface.is_terminated() {
return Ok(());
};
let link = links.downstream_links.entry(downstream_connection_id).or_insert_with(|| DownstreamLink::new(downstream_object_interface.clone()));
if !link.object_observer && !link.object_exposer {
let successfully_registered = link.object_interface.linked_object_register(self);
if !successfully_registered {
links.downstream_links.remove(&downstream_connection_id);
return Ok(());
}
}
link.object_exposer.true_to_err(DownstreamBehaviourError::DoubleObjectExpose)?;
link.object_exposer = true;
link.object_exposer_capacity = capacity;
links.contribute_capacity(capacity);
links.update(self, true, true, true, false);
let assigned_exposer = links.assigned_exposer;
links.downstream_links.get_mut(&downstream_connection_id).unwrap().update_sink(downstream_connection_id, self, assigned_exposer);
Ok(())
}
#[cfg(feature = "downstream")]
pub(crate) fn unlink_downstream_object_observe_contract(
self: &Arc<Self>,
downstream_connection_id: DownstreamConnectionId,
) -> Result<(), DownstreamBehaviourError> {
let mut links = self.inner.write().unwrap();
let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) else {
return Ok(());
};
if link.object_observer && !link.object_exposer {
link.object_interface.linked_object_unregister(self);
};
link.object_observer.false_to_err(DownstreamBehaviourError::ObjectUnobserveWithoutMatchingObserve)?;
link.object_observer = false;
if !link.object_observer
&& !link.object_exposer
&& link.tag_observer_count == 0
&& link.tag_exposer_count == 0
&& link.stream.is_none()
&& link.sink.is_none()
{
links.downstream_links.remove(&downstream_connection_id);
}
links.update(self, true, true, false, false);
let assigned_exposer = links.assigned_exposer;
if let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) {
link.update_stream(downstream_connection_id, self, assigned_exposer);
}
Ok(())
}
#[cfg(feature = "downstream")]
pub(crate) fn unlink_downstream_object_expose_contract(
self: &Arc<Self>,
downstream_connection_id: DownstreamConnectionId,
) -> Result<(), DownstreamBehaviourError> {
let mut links = self.inner.write().unwrap();
let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) else {
return Ok(());
};
if !link.object_observer && link.object_exposer {
link.object_interface.linked_object_unregister(self);
};
link.object_exposer.false_to_err(DownstreamBehaviourError::ObjectUnexposeWithoutMatchingExpose)?;
link.object_exposer = false;
let withdrawn_capacity = link.object_exposer_capacity;
link.object_exposer_capacity = 0;
if !link.object_observer
&& !link.object_exposer
&& link.tag_observer_count == 0
&& link.tag_exposer_count == 0
&& link.stream.is_none()
&& link.sink.is_none()
{
links.downstream_links.remove(&downstream_connection_id);
}
links.withdraw_capacity(withdrawn_capacity);
links.update(self, true, true, true, false);
let assigned_exposer = links.assigned_exposer;
if let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) {
link.update_sink(downstream_connection_id, self, assigned_exposer);
}
Ok(())
}
#[cfg(feature = "downstream")]
pub(crate) fn set_downstream_object_expose_capacity(
self: &Arc<Self>,
downstream_connection_id: DownstreamConnectionId,
capacity: u32,
) -> Result<(), DownstreamBehaviourError> {
let mut links = self.inner.write().unwrap();
let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) else {
return Ok(());
};
link.object_exposer.false_to_err(DownstreamBehaviourError::ChangeExposeCapacityWhileNotExposing)?;
let withdrawn_capacity = link.object_exposer_capacity;
link.object_exposer_capacity = capacity;
links.withdraw_capacity(withdrawn_capacity);
links.contribute_capacity(capacity);
links.notify_upstream(self);
Ok(())
}
#[cfg(feature = "downstream")]
pub(crate) fn link_downstream_tag_observe_contract(
self: &Arc<Self>,
downstream_connection_id: DownstreamConnectionId,
downstream_object_interface: Arc<DownsteamObjectInterface>,
) {
let mut links = self.inner.write().unwrap();
if downstream_object_interface.is_terminated() {
return;
};
let link = links.downstream_links.entry(downstream_connection_id).or_insert_with(|| DownstreamLink::new(downstream_object_interface.clone()));
link.tag_observer_count += 1;
links.update(self, false, true, false, false);
let assigned_exposer = links.assigned_exposer;
links.downstream_links.get_mut(&downstream_connection_id).unwrap().update_stream(downstream_connection_id, self, assigned_exposer);
}
#[cfg(feature = "downstream")]
pub(crate) fn link_downstream_tag_expose_contract(
self: &Arc<Self>,
downstream_connection_id: DownstreamConnectionId,
downstream_object_interface: Arc<DownsteamObjectInterface>,
capacity: u32,
) {
let mut links = self.inner.write().unwrap();
if downstream_object_interface.is_terminated() {
return;
};
let link = links.downstream_links.entry(downstream_connection_id).or_insert_with(|| DownstreamLink::new(downstream_object_interface.clone()));
link.tag_exposer_count += 1;
link.tag_exposer_capacity = link.tag_exposer_capacity.checked_add(capacity as u64).unwrap();
links.update(self, true, true, true, false);
let assigned_exposer = links.assigned_exposer;
links.downstream_links.get_mut(&downstream_connection_id).unwrap().update_sink(downstream_connection_id, self, assigned_exposer);
}
#[cfg(feature = "downstream")]
pub(crate) fn unlink_downstream_tag_observe_contract(self: &Arc<Self>, downstream_connection_id: DownstreamConnectionId) {
let mut links = self.inner.write().unwrap();
let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) else {
return;
};
link.tag_observer_count = link.tag_observer_count.checked_sub(1).unwrap();
if !link.object_observer
&& !link.object_exposer
&& link.tag_observer_count == 0
&& link.tag_exposer_count == 0
&& link.stream.is_none()
&& link.sink.is_none()
{
links.downstream_links.remove(&downstream_connection_id);
}
links.update(self, false, true, false, false);
let assigned_exposer = links.assigned_exposer;
if let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) {
link.update_stream(downstream_connection_id, self, assigned_exposer);
};
}
#[cfg(feature = "downstream")]
pub(crate) fn unlink_downstream_tag_expose_contract(self: &Arc<Self>, downstream_connection_id: DownstreamConnectionId, capacity: u32) {
let mut links = self.inner.write().unwrap();
let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) else {
return;
};
link.tag_exposer_count = link.tag_exposer_count.checked_sub(1).unwrap();
link.tag_exposer_capacity = link.tag_exposer_capacity.checked_sub(capacity as u64).unwrap();
if !link.object_observer
&& !link.object_exposer
&& link.tag_observer_count == 0
&& link.tag_exposer_count == 0
&& link.stream.is_none()
&& link.sink.is_none()
{
links.downstream_links.remove(&downstream_connection_id);
}
links.update(self, true, true, true, false);
let assigned_exposer = links.assigned_exposer;
if let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) {
link.update_sink(downstream_connection_id, self, assigned_exposer);
};
}
#[cfg(feature = "downstream")]
pub(crate) fn change_downstream_tag_expose_capacity(
self: &Arc<Self>,
downstream_connection_id: DownstreamConnectionId,
old_capacity: u32,
new_capacity: u32,
) -> Result<(), DownstreamBehaviourError> {
let mut links = self.inner.write().unwrap();
let Some(link) = links.downstream_links.get_mut(&downstream_connection_id) else {
return Ok(());
};
(link.tag_exposer_count > 0).assert_true();
link.tag_exposer_capacity = link.tag_exposer_capacity.checked_sub(old_capacity as u64).unwrap();
link.tag_exposer_capacity = link.tag_exposer_capacity.checked_add(new_capacity as u64).unwrap();
Ok(())
}
#[cfg(feature = "downstream")]
pub(crate) fn downstream_connection_dropped(self: &Arc<Self>, downstream_connection_id: DownstreamConnectionId) {
let mut links = self.inner.write().unwrap();
if let Some(mut interface) = links.downstream_links.remove(&downstream_connection_id) {
interface.close_stream();
interface.close_sink();
links.withdraw_capacity(interface.object_exposer_capacity);
links.update(self, true, true, true, false);
}
}
pub(crate) fn create_upstream_object_state_stream(self: &Arc<Self>, connection_id: UpstreamConnectionId) -> Option<ObjectStateStream> {
let mut links = self.inner.write().unwrap();
if links.upstream_link.is_none() || links.upstream_link.as_ref().unwrap().interface.id != connection_id {
return None;
}
let (new_stream, new_stream_control) =
ObjectStateStream::new(self.clone(), self.exchange.next_state_stream_id(), ObjectStateStreamOwner::UpstreamConnection(connection_id));
links.upstream_link.as_mut().unwrap().streams_sending_to_upstream.insert(new_stream_control.id(), new_stream_control);
links.update(self, true, true, false, false);
Some(new_stream)
}
pub(crate) fn create_upstream_object_state_sink(self: &Arc<Self>, connection_id: UpstreamConnectionId) -> Option<ObjectStateSink> {
let mut links = self.inner.write().unwrap();
if links.upstream_link.is_none() || links.upstream_link.as_ref().unwrap().interface.id != connection_id {
return None;
}
let (new_sink, new_sink_control) =
ObjectStateSink::new(self.clone(), self.exchange.next_state_sink_id(), ObjectStateSinkOwner::UpstreamConnection(connection_id));
links.upstream_link.as_mut().unwrap().sinks_receiving_from_upstream.insert(new_sink_control.id(), new_sink_control);
links.update(self, true, true, true, false);
Some(new_sink)
}
pub(crate) fn set_upstream_connection(self: &Arc<Self>, new_upstream_connection: Arc<UpstreamObjectInterface>) {
let mut links = self.inner.write().unwrap();
if links.upstream_link.is_some() && links.upstream_link.as_ref().unwrap().interface.id.0 < new_upstream_connection.id.0 {
let mut upstream_link = links.upstream_link.take().unwrap();
upstream_link.streams_sending_to_upstream.drain().for_each(|(_id, stream)| stream.close());
upstream_link.sinks_receiving_from_upstream.drain().for_each(|(_id, sink)| sink.close());
links.update(self, true, true, true, false);
}
if links.upstream_link.is_none() {
#[cfg(feature = "downstream")]
new_upstream_connection.object_contracts_changed(
self.clone(),
!links.local_links.object_observe_contracts.is_empty()
|| links.downstream_links.values().any(|downstream_link| downstream_link.object_observer),
!links.local_links.object_expose_contracts.is_empty() || links.downstream_links.values().any(|downstream_link| downstream_link.object_exposer),
min(links.cumulative_object_exposer_capacity, u32::MAX as u64) as u32,
);
#[cfg(not(feature = "downstream"))]
new_upstream_connection.object_contracts_changed(
self.clone(),
!links.local_links.object_observe_contracts.is_empty(),
!links.local_links.object_expose_contracts.is_empty(),
min(links.cumulative_object_exposer_capacity, u32::MAX as u64) as u32,
);
links.upstream_link = Some(UpstreamLink {
interface: new_upstream_connection,
streams_sending_to_upstream: HashMap::new(),
sinks_receiving_from_upstream: HashMap::new(),
});
}
}
pub(crate) fn remove_upstream_connection(self: &Arc<Self>, remove_upstream_connection_id: UpstreamConnectionId) {
let mut links = self.inner.write().unwrap();
if links.upstream_link.is_some() && links.upstream_link.as_ref().unwrap().interface.id.0 <= remove_upstream_connection_id.0 {
let mut upstream_link = links.upstream_link.take().unwrap();
upstream_link.streams_sending_to_upstream.drain().for_each(|(_id, stream)| stream.close());
upstream_link.sinks_receiving_from_upstream.drain().for_each(|(_id, sink)| sink.close());
links.update(self, true, true, true, false);
}
}
pub(crate) fn state_stream_dropped(self: &Arc<Self>, state_stream_id: StateStreamId, owner: &ObjectStateStreamOwner) {
match owner {
ObjectStateStreamOwner::ObjectObserveContract(object_observe_contract_id) => {
let mut links = self.inner.write().unwrap();
let should_exist = links.should_exist;
if let Some(link) = links.local_links.object_observe_contracts.get_mut(object_observe_contract_id) {
drop(link.stream.take().unwrap());
link.update_stream(self, owner.clone(), should_exist);
}
}
ObjectStateStreamOwner::TagObserveContract(tag_observe_contract_id) => {
let mut links = self.inner.write().unwrap();
let should_exist = links.should_exist;
if let Some(link) = links.local_links.tag_observe_contracts.get_mut(tag_observe_contract_id) {
drop(link.stream.take().unwrap());
link.update_stream(self, owner.clone(), should_exist);
}
}
ObjectStateStreamOwner::UpstreamConnection(upstream_connection_id) => {
let mut links = self.inner.write().unwrap();
if links.upstream_link.is_none() || links.upstream_link.as_ref().unwrap().interface.id != *upstream_connection_id {
return;
}
links.upstream_link.as_mut().unwrap().streams_sending_to_upstream.remove(&state_stream_id).is_some().assert_true();
links.update(self, true, true, false, false);
}
#[cfg(feature = "downstream")]
ObjectStateStreamOwner::DownstreamConnection(downstream_connection_id) => {
let mut links = self.inner.write().unwrap();
let assigned_exposer = links.assigned_exposer;
if let Some(link) = links.downstream_links.get_mut(downstream_connection_id) {
link.stream.take().unwrap();
link.update_stream(*downstream_connection_id, self, assigned_exposer);
if !link.object_observer
&& !link.object_exposer
&& link.tag_observer_count == 0
&& link.tag_exposer_count == 0
&& link.stream.is_none()
&& link.sink.is_none()
{
links.downstream_links.remove(downstream_connection_id);
}
};
}
}
}
pub(crate) fn state_sink_dropped(self: &Arc<Self>, state_sink_id: StateSinkId, owner: &ObjectStateSinkOwner) {
match owner {
ObjectStateSinkOwner::ObjectExposeContract(object_expose_contract_id) => {
let mut links = self.inner.write().unwrap();
let assigned_exposer = links.assigned_exposer;
if let Some(link) = links.local_links.object_expose_contracts.get_mut(object_expose_contract_id) {
link.sink.take().unwrap();
link.update_sink(self, *owner, assigned_exposer);
}
}
ObjectStateSinkOwner::TagExposeContract(tag_expose_contract_id) => {
trace!("ssdrop: {:?}", self.descriptor);
let mut links = self.inner.write().unwrap();
let assigned_exposer = links.assigned_exposer;
if let Some(link) = links.local_links.tag_expose_contracts.get_mut(tag_expose_contract_id) {
link.sink.take().unwrap();
link.update_sink(self, *owner, assigned_exposer);
}
}
ObjectStateSinkOwner::UpstreamConnection(upstream_connection_id) => {
let mut links = self.inner.write().unwrap();
if links.upstream_link.is_none() || links.upstream_link.as_ref().unwrap().interface.id != *upstream_connection_id {
return;
}
links.upstream_link.as_mut().unwrap().sinks_receiving_from_upstream.remove(&state_sink_id).is_some().assert_true();
links.update(self, true, true, true, false);
}
#[cfg(feature = "downstream")]
ObjectStateSinkOwner::DownstreamConnection(downstream_connection_id) => {
let mut links = self.inner.write().unwrap();
let assigned_exposer = links.assigned_exposer;
if let Some(link) = links.downstream_links.get_mut(downstream_connection_id) {
link.sink.take().unwrap();
link.update_sink(*downstream_connection_id, self, assigned_exposer);
if !link.object_observer
&& !link.object_exposer
&& link.tag_observer_count == 0
&& link.tag_exposer_count == 0
&& link.stream.is_none()
&& link.sink.is_none()
{
links.downstream_links.remove(downstream_connection_id);
}
};
}
}
}
pub(crate) fn state_get(&self) -> Option<ObjectState<DataBytes>> {
let locked_state = self.inner.read().unwrap();
locked_state.state.clone()
}
pub(crate) fn desynchronize(&self, object_state_sink_id: StateSinkId) {
let mut links = self.inner.write().unwrap();
let new_synchronized = links.desynchronize(object_state_sink_id);
if !links.data_needed && new_synchronized != DataSynchronized::Now {
links.state = None;
links.notify_state_observers();
} else if links.state.is_some() && links.state.as_ref().map(|state| &state.synchronized) != Some(&new_synchronized) {
let previous_state = links.state.take().unwrap();
links.state = Some(previous_state.with_synchronized(new_synchronized));
links.notify_state_observers();
}
}
pub(crate) fn synchronize(&self, synchronized: DataSynchronized, object_state_sink_id: StateSinkId) {
let mut links = self.inner.write().unwrap();
let new_synchronized = links.synchronize(synchronized, object_state_sink_id);
if !links.data_needed && new_synchronized != DataSynchronized::Now {
links.state = None;
links.notify_state_observers();
} else if links.state.is_some() && links.state.as_ref().map(|state| &state.synchronized) != Some(&new_synchronized) {
let previous_state = links.state.take().unwrap();
links.state = Some(previous_state.with_synchronized(new_synchronized));
links.notify_state_observers();
}
}
pub(super) fn state_set(self: &Arc<Self>, object_state: ObjectState<DataBytes>, object_state_sink_id: StateSinkId) {
let mut links = self.inner.write().unwrap();
let state_changed = links.state_set(object_state, object_state_sink_id);
trace!(
"Object#{}: new state: {:?} state_changed:{:?} synchronized: {:?}",
self.id.0,
self.descriptor,
state_changed,
links.state.as_ref().map(|x| &x.synchronized)
);
if state_changed {
links.notify_state_observers();
}
}
pub(crate) fn snapshot(&self) -> ObjectSnapshot {
let links = self.inner.read().unwrap();
#[cfg(feature = "downstream")]
let mut downstream_links: Vec<ObjectDownstreamLinkSnapshot> = links
.downstream_links
.iter()
.map(|(id, link)| ObjectDownstreamLinkSnapshot {
downstream_connection_id: id.0,
object_observer: link.object_observer,
object_exposer: link.object_exposer,
object_exposer_capacity: link.object_exposer_capacity,
tag_observer_count: link.tag_observer_count,
tag_exposer_count: link.tag_exposer_count,
stream: link.stream.is_some(),
sink: link.sink.is_some(),
})
.collect();
#[cfg(not(feature = "downstream"))]
let mut downstream_links: Vec<ObjectDownstreamLinkSnapshot> = vec![];
downstream_links.sort_by_key(|link| link.downstream_connection_id);
ObjectSnapshot {
id: self.id.0,
descriptor: self.descriptor.clone(),
should_exist: links.should_exist,
data_needed: links.data_needed,
assigned_exposer: links.assigned_exposer.map(|exposer| match exposer {
ObjectStateSinkOwner::ObjectExposeContract(object_expose_contract_id) => {
ObjectStateSinkOwnerSnapshot::ObjectExposeContract(object_expose_contract_id.0)
}
ObjectStateSinkOwner::TagExposeContract(tag_expose_contract_id) => ObjectStateSinkOwnerSnapshot::TagExposeContract(tag_expose_contract_id.0),
ObjectStateSinkOwner::UpstreamConnection(upstream_connection_id) => ObjectStateSinkOwnerSnapshot::UpstreamConnection(upstream_connection_id.0),
#[cfg(feature = "downstream")]
ObjectStateSinkOwner::DownstreamConnection(downstream_connection_id) => ObjectStateSinkOwnerSnapshot::DownstreamConnection(downstream_connection_id.0),
}),
object_observe_contract_count: links.local_links.object_observe_contracts.len(),
object_expose_contract_count: links.local_links.object_expose_contracts.len(),
tag_observe_contract_count: links.local_links.tag_observe_contracts.len(),
tag_expose_contract_count: links.local_links.tag_expose_contracts.len(),
upstream_link: links.upstream_link.as_ref().map(|link| ObjectUpstreamLinkSnapshot {
sinks_receiving_from_upstream_count: link.sinks_receiving_from_upstream.len(),
streams_sending_to_upstream_count: link.streams_sending_to_upstream.len(),
}),
downstream_links,
cumulative_object_exposer_capacity: links.cumulative_object_exposer_capacity,
data_state: links.state.as_ref().map(|state| ObjectStateSnapshot {
version: state.version.clone(),
format: state.format.clone(),
size: state.data.len(),
synchronized_ms_ago: state.synchronized.micros_ago(),
}),
}
}
}
impl Drop for Object {
fn drop(&mut self) {
for tag in self.tags.iter() {
tag.object_remove(self.id);
}
self.exchange.object_release(&self.descriptor);
}
}
impl Hash for Object {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
impl PartialEq for Object {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for Object {}