use std::{cmp::max, collections::{HashMap, HashSet}, hash::{Hash, Hasher}, sync::{atomic::AtomicUsize, Arc, Mutex, RwLock}};
use instant::Instant;
use log::trace;
use super::{DataBytes, DataSynchronized, DataVersion, ObjectState, ObjectStateStreamSharedMutable};
use crate::{channel::MpscSender, descriptor::ObjectDescriptor, diff, exchange::{core::ExchangeCore, LocalExchangeShared}, expose_contract::ExposeContract, observe_contract::ObserveContract, tag::core::TagCore};
/// Pair of an optional `DataVersion` and a `DataVersion`, used as the key of `ObjectCore::diffs`.
/// Judging by the name, the first element is the version a diff starts from and the second the
/// version it leads to — NOTE(review): confirm against the code that fills the cache.
type VersionFromTo = (Option<DataVersion>, DataVersion);
/// Shared core of a single object: its identity, the tags it was created with, the
/// lock-protected link/state bookkeeping and a few caches and id counters.
///
/// Hashing and equality of `ObjectCore` are based on `descriptor` only.
pub(crate) struct ObjectCore {
/// Exchange that is told (`object_release`) when this core is dropped.
pub local_exchange: Arc<LocalExchangeShared>,
/// Identity of the object; the only input to `Hash` and `PartialEq`.
pub descriptor: ObjectDescriptor,
/// Tag cores passed to `new`; each one receives `object_insert` for this object at construction.
tags: Vec<Arc<TagCore>>,
/// Contracts, cached state, state streams and remote interfaces, guarded by a read/write lock.
mutable: RwLock<ObjectCoreMutable>,
/// Counter initialised to 1; not read in this file — presumably hands out state sink ids (confirm at the use site).
pub(super) object_state_sinks_sequence: AtomicUsize,
/// Counter initialised to 1; not read in this file — presumably hands out state stream ids (confirm at the use site).
pub(super) object_state_streams_sequence: AtomicUsize,
/// Cache keyed by a version pair; filled elsewhere, only its entry count is read here (in `Debug`).
pub(super) diffs: Mutex<HashMap<VersionFromTo, Arc<Mutex<Option<DataBytes>>>>>,
/// Cache keyed by a single version; filled elsewhere, only its entry count is read here (in `Debug`).
pub(super) diff_states: Mutex<HashMap<DataVersion, Arc<Mutex<Option<diff::State>>>>>,
/// Source of the ids returned by `link_remote_interface`; starts at 1.
pub(super) remote_interfaces_sequence: AtomicUsize,
}
/// Lock-protected mutable part of `ObjectCore`: linked contracts, the cached state and the
/// parties that get notified about changes.
pub(crate) struct ObjectCoreMutable {
/// Linked observe contracts, grouped by the exchange each contract reports via `exchange_core()`.
observe_contracts: HashMap<Arc<ExchangeCore>, HashSet<Arc<dyn ObserveContract>>>,
/// Number of linked observe contracts whose `object()` returned true.
object_observe_contract_count: usize,
/// Linked expose contracts, grouped by the exchange each contract reports via `exchange_core()`.
expose_contracts: HashMap<Arc<ExchangeCore>, HashSet<Arc<dyn ExposeContract>>>,
/// Number of linked expose contracts whose `object()` returned true.
object_expose_contract_count: usize,
/// State streams registered through `add_object_state_stream`, keyed by stream id; each gets `object_changed()` when the state changes.
object_state_streams: HashMap<usize, Arc<Mutex<ObjectStateStreamSharedMutable>>>,
/// Ids of state sinks whose most recent report was `DataSynchronized::Now`.
synchronized_object_state_sinks: HashSet<usize>,
/// Result of the last `recalculate_active` evaluation.
active: bool,
/// True while the object is active and holds a state; observe contracts are (de)activated on transitions.
live: bool,
/// Expose contract whose `assign` was accepted for this object, if any.
assigned_expose_contract: Option<Arc<dyn ExposeContract>>,
/// Most recently stored object state, `None` until the first `state_set`.
state: Option<ObjectState<DataBytes>>,
/// Senders that are sent this object's descriptor whenever links or state change, keyed by the id from `link_remote_interface`.
remote_interfaces: HashMap<usize, MpscSender<HashSet<ObjectDescriptor>>>,
}
impl ObjectCoreMutable {
/// Recomputes `active` from the linked contracts and keeps the expose-contract
/// assignment in step with the result.
///
/// The object is wanted when at least one observe contract with `object()` true is
/// linked, or when an expose contract with `object()` true is linked and any observe
/// contract exists. A wanted object is still inactive when its only observing
/// exchange is non-local and also holds an expose contract for it.
///
/// When the object is active and nothing is assigned, an assignment is attempted;
/// when it is inactive and something is assigned, that contract is unassigned.
///
/// Returns `true` if `active` changed.
fn recalculate_active(&mut self, object_core: &Arc<ObjectCore>) -> bool {
    let was_active = self.active;
    let wanted = self.object_observe_contract_count > 0 || (self.object_expose_contract_count > 0 && !self.observe_contracts.is_empty());
    self.active = wanted
        && match (self.observe_contracts.len(), self.observe_contracts.keys().next()) {
            // Exactly one observing exchange: it only counts if it is local or does not expose the object itself.
            (1, Some(sole_observer)) => sole_observer.local || !self.expose_contracts.contains_key(sole_observer),
            _ => true,
        };
    if self.active {
        if self.assigned_expose_contract.is_none() {
            self.expose_contract_assign(object_core);
        }
    } else if let Some(assigned) = self.assigned_expose_contract.clone() {
        self.expose_contract_unassign(Arc::clone(object_core), assigned);
    }
    was_active != self.active
}
/// Recomputes `live` (active and holding a state) and informs every linked observe
/// contract when the flag flips: `initialize_activate` on becoming live,
/// `deinitialize_deactivate` on ceasing to be live. Nothing is called when the
/// flag is unchanged.
fn recalculate_live(&mut self, object_core: &Arc<ObjectCore>) {
    let was_live = self.live;
    let now_live = self.active && self.state.is_some();
    self.live = now_live;
    if was_live == now_live {
        return;
    }
    for contract in self.observe_contracts.values().flatten() {
        if now_live {
            contract.initialize_activate(object_core);
        } else {
            contract.deinitialize_deactivate(object_core);
        }
    }
}
/// Picks the best linked expose contract and assigns the object to it.
///
/// Candidates are the contracts whose `expected_proportional_load_with()` yields a
/// value; they are ranked by `(!exchange.upstream, load)`, lowest first, and must
/// rank strictly below `(true, u32::MAX)`. On ties the first one encountered wins.
/// The search repeats until a candidate accepts `assign` or no candidate is left.
///
/// NOTE(review): a candidate that refuses `assign` is not excluded here, so
/// termination relies on the refusal changing what the contracts report — confirm
/// against the `ExposeContract` implementations.
fn expose_contract_assign(&mut self, object_core: &Arc<ObjectCore>) {
    const SCORE_CEILING: (bool, u32) = (true, u32::MAX);
    loop {
        let winner = self
            .expose_contracts
            .values()
            .flatten()
            .filter_map(|candidate| {
                let load = candidate.expected_proportional_load_with()?;
                Some(((!candidate.exchange_core().upstream, load), candidate))
            })
            .filter(|(score, _)| *score < SCORE_CEILING)
            .min_by_key(|(score, _)| *score)
            .map(|(_, candidate)| candidate.clone() as Arc<dyn ExposeContract>);
        match winner {
            Some(selected) => {
                if selected.assign(object_core) {
                    self.assigned_expose_contract = Some(selected);
                    break;
                }
            }
            None => break,
        }
    }
}
/// Offers the object to `candidate_contract`.
///
/// With no contract assigned, the candidate simply gets a chance to `assign`.
/// Otherwise the candidate is tried only when the current contract's
/// `(!exchange.upstream, expected_proportional_load_without())` ranks strictly
/// higher than the candidate's; if the candidate accepts, the current contract is
/// unassigned and replaced.
fn potentially_reassign_to(&mut self, object_core: &Arc<ObjectCore>, candidate_contract: Arc<dyn ExposeContract>) {
    let candidate_is_preferable = match &self.assigned_expose_contract {
        Some(current) => {
            let current_score = (!current.exchange_core().upstream, current.expected_proportional_load_without());
            let candidate_score = (!candidate_contract.exchange_core().upstream, candidate_contract.expected_proportional_load_without());
            current_score > candidate_score
        }
        None => true,
    };
    if !candidate_is_preferable || !candidate_contract.assign(object_core) {
        return;
    }
    // The candidate accepted; release whatever was assigned before recording it.
    if let Some(previous) = self.assigned_expose_contract.clone() {
        self.expose_contract_unassign(Arc::clone(object_core), previous);
    }
    self.assigned_expose_contract = Some(candidate_contract);
}
/// Tells `contract_to_unassign` that it no longer serves the object and clears the
/// recorded assignment. The recorded assignment is cleared unconditionally, whether
/// or not it was the contract passed in.
fn expose_contract_unassign(&mut self, object_core: Arc<ObjectCore>, contract_to_unassign: Arc<dyn ExposeContract>) {
    contract_to_unassign.unassign(&object_core);
    let _ = self.assigned_expose_contract.take();
}
/// Folds one sink's synchronization report into the per-object view.
///
/// * `Now` — the sink is recorded as synchronized and `Now` is returned.
/// * `LastAt(t)` — the sink is removed from the synchronized set. If other sinks
///   remain in the set the result is still `Now`; otherwise it is `LastAt` of the
///   later of `t` and the timestamp already stored in the state (if the stored
///   state carries a `LastAt`).
///
/// Only the set of synchronized sinks is modified; the stored state is not touched.
fn synchronize(&mut self, object_state_sink_synchronized: DataSynchronized, object_state_sink_id: usize) -> DataSynchronized {
    let reported_at = match object_state_sink_synchronized {
        DataSynchronized::Now => {
            self.synchronized_object_state_sinks.insert(object_state_sink_id);
            return DataSynchronized::Now;
        }
        DataSynchronized::LastAt(timestamp) => timestamp,
    };
    self.synchronized_object_state_sinks.remove(&object_state_sink_id);
    if !self.synchronized_object_state_sinks.is_empty() {
        return DataSynchronized::Now;
    }
    match self.state.as_ref().map(|state| &state.synchronized) {
        Some(DataSynchronized::LastAt(stored_at)) => DataSynchronized::LastAt(max(reported_at, *stored_at)),
        _ => DataSynchronized::LastAt(reported_at),
    }
}
/// Removes a sink from the synchronized set and returns the resulting per-object
/// synchronization: `Now` while other sinks remain in the set, otherwise `LastAt`
/// of the later of the current instant and the `LastAt` timestamp stored in the
/// state (if any). The stored state itself is not modified.
fn desynchronize(&mut self, object_state_sink_id: usize) -> DataSynchronized {
    self.synchronized_object_state_sinks.remove(&object_state_sink_id);
    if !self.synchronized_object_state_sinks.is_empty() {
        return DataSynchronized::Now;
    }
    match self.state.as_ref().map(|state| &state.synchronized) {
        Some(DataSynchronized::LastAt(stored_at)) => DataSynchronized::LastAt(max(Instant::now(), *stored_at)),
        _ => DataSynchronized::LastAt(Instant::now()),
    }
}
/// Stores a state reported by the sink `object_state_sink_id`.
///
/// The sink's own synchronization report is first folded into the per-object
/// view by `synchronize`; the stored state always carries that aggregated value.
/// Previously the aggregate was only applied when the version was unchanged, so a
/// newer version coming from an unsynchronized sink could mark the object as
/// `LastAt` while another sink was still recorded as synchronized.
///
/// Outcomes:
/// * no state stored yet — the state is stored, returns `(true, true)`;
/// * the reported version is greater than the stored one — the state is
///   replaced, returns `(true, false)`;
/// * the version is not greater but the aggregated synchronization differs from the
///   stored one — only the synchronization is updated, returns `(true, false)`;
/// * otherwise nothing changes and `(false, false)` is returned.
///
/// Returns `(state_changed, state_initialized)`.
pub(crate) fn state_set(&mut self, object_state: ObjectState<DataBytes>, object_state_sink_id: usize) -> (bool, bool) {
    // Must run before the stored state is taken out: `synchronize` reads it.
    let new_synchronized = self.synchronize(object_state.synchronized.clone(), object_state_sink_id);
    match self.state.take() {
        None => {
            self.state = Some(object_state.with_synchronized(new_synchronized));
            (true, true)
        }
        Some(current) if object_state.version > current.version => {
            self.state = Some(object_state.with_synchronized(new_synchronized));
            (true, false)
        }
        Some(current) if current.synchronized != new_synchronized => {
            self.state = Some(current.with_synchronized(new_synchronized));
            (true, false)
        }
        Some(current) => {
            // Nothing new: put the untouched state back.
            self.state = Some(current);
            (false, false)
        }
    }
}
}
impl Hash for ObjectCore {
    /// Feeds only the descriptor into the hasher, matching the descriptor-based `PartialEq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let Self { descriptor, .. } = self;
        descriptor.hash(state);
    }
}
impl PartialEq for ObjectCore {
    /// Two cores are equal when their descriptors are equal; no other field is compared.
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (&self.descriptor, &other.descriptor);
        mine == theirs
    }
}
// Marker impl on top of the descriptor-based `PartialEq` above.
impl Eq for ObjectCore {}
impl ObjectCore {
/// Builds a core with no contracts, no state, inactive and not live, with all id
/// counters starting at 1, and registers it with every tag in `tags` through
/// `object_insert` before returning it.
pub(crate) fn new(local_exchange: Arc<LocalExchangeShared>, descriptor: ObjectDescriptor, tags: Vec<Arc<TagCore>>) -> Arc<ObjectCore> {
    let initial_mutable = ObjectCoreMutable {
        observe_contracts: HashMap::new(),
        object_observe_contract_count: 0,
        expose_contracts: HashMap::new(),
        object_expose_contract_count: 0,
        object_state_streams: HashMap::new(),
        synchronized_object_state_sinks: HashSet::new(),
        active: false,
        live: false,
        assigned_expose_contract: None,
        state: None,
        remote_interfaces: HashMap::new(),
    };
    let object_core = Arc::new(ObjectCore {
        local_exchange,
        descriptor,
        tags,
        mutable: RwLock::new(initial_mutable),
        object_state_sinks_sequence: AtomicUsize::new(1),
        object_state_streams_sequence: AtomicUsize::new(1),
        diffs: Mutex::new(HashMap::new()),
        diff_states: Mutex::new(HashMap::new()),
        remote_interfaces_sequence: AtomicUsize::new(1),
    });
    for tag in &object_core.tags {
        tag.object_insert(&object_core);
    }
    object_core
}
/// Registers `notification_sender` to be sent this object's descriptor on link and
/// state changes. Returns the id under which it was stored, to be passed to
/// `unlink_remote_interface` later.
pub(crate) fn link_remote_interface(self: &Arc<Self>, notification_sender: MpscSender<HashSet<ObjectDescriptor>>) -> usize {
    let interface_id = self.remote_interfaces_sequence.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let mut guard = self.mutable.write().unwrap();
    guard.remote_interfaces.insert(interface_id, notification_sender);
    interface_id
}
/// Forgets the remote interface stored under `id`; unknown ids are ignored.
pub(crate) fn unlink_remote_interface(self: &Arc<Self>, id: usize) {
    let mut guard = self.mutable.write().unwrap();
    guard.remote_interfaces.remove(&id);
}
/// Links an observe contract to this object.
///
/// Counts it if `object()` is true, activates it right away when the object is
/// already live, files it under its exchange, then re-evaluates `active`/`live`
/// and sends the descriptor to every remote interface.
pub(crate) fn link_observe_contract(self: &Arc<Self>, contract: Arc<dyn ObserveContract>) {
    let mut guard = self.mutable.write().unwrap();
    if contract.object() {
        guard.object_observe_contract_count += 1;
    }
    // A live object will not go through a not-live -> live transition for this
    // contract, so it has to be activated explicitly.
    if guard.live {
        Arc::clone(&contract).initialize_activate(self);
    }
    let exchange = contract.exchange_core().clone();
    guard.observe_contracts.entry(exchange).or_default().insert(contract);
    guard.recalculate_active(self);
    guard.recalculate_live(self);
    for sender in guard.remote_interfaces.values() {
        sender.send(self.descriptor.clone());
    }
}
/// Links an expose contract to this object.
///
/// Counts it if `object()` is true, files it under its exchange, offers it the
/// assignment via `potentially_reassign_to`, then re-evaluates `active`/`live`
/// and sends the descriptor to every remote interface.
pub(crate) fn link_expose_contract(self: &Arc<Self>, contract: Arc<dyn ExposeContract>) {
    let mut guard = self.mutable.write().unwrap();
    if contract.object() {
        guard.object_expose_contract_count += 1;
    }
    let exchange = contract.exchange_core().clone();
    guard.expose_contracts.entry(exchange).or_default().insert(Arc::clone(&contract));
    guard.potentially_reassign_to(self, contract);
    guard.recalculate_active(self);
    guard.recalculate_live(self);
    for sender in guard.remote_interfaces.values() {
        sender.send(self.descriptor.clone());
    }
}
/// Unlinks an expose contract from this object.
///
/// Uncounts it if `object()` is true, removes it from its exchange's set (dropping
/// the set when it becomes empty), unassigns it if it was the assigned contract
/// (compared by `id()`), then re-evaluates `active`/`live` and sends the
/// descriptor to every remote interface.
///
/// # Panics
/// Panics if no expose contract of the contract's exchange is linked.
pub(crate) fn unlink_expose_contract(self: &Arc<Self>, contract: &Arc<dyn ExposeContract>) {
    let mut guard = self.mutable.write().unwrap();
    if contract.object() {
        guard.object_expose_contract_count -= 1;
    }
    let exchange = contract.exchange_core();
    let exchange_left_empty = {
        let contracts_of_exchange = guard.expose_contracts.get_mut(exchange).unwrap();
        contracts_of_exchange.remove(contract);
        contracts_of_exchange.is_empty()
    };
    if exchange_left_empty {
        guard.expose_contracts.remove(exchange);
    }
    let was_assigned = guard.assigned_expose_contract.as_ref().map(|assigned| assigned.id()) == Some(contract.id());
    if was_assigned {
        guard.expose_contract_unassign(Arc::clone(self), Arc::clone(contract));
    }
    guard.recalculate_active(self);
    guard.recalculate_live(self);
    for sender in guard.remote_interfaces.values() {
        sender.send(self.descriptor.clone());
    }
}
/// Unlinks an observe contract from this object.
///
/// Uncounts it if `object()` is true, removes it from its exchange's set (dropping
/// the set when it becomes empty), then re-evaluates `active`/`live` and sends
/// the descriptor to every remote interface.
///
/// # Panics
/// Panics if no observe contract of the contract's exchange is linked.
pub(crate) fn unlink_observe_contract(self: &Arc<Self>, contract: &Arc<dyn ObserveContract>) {
    let mut guard = self.mutable.write().unwrap();
    if contract.object() {
        guard.object_observe_contract_count -= 1;
    }
    let exchange = contract.exchange_core();
    let exchange_left_empty = {
        let contracts_of_exchange = guard.observe_contracts.get_mut(exchange).unwrap();
        contracts_of_exchange.remove(contract);
        contracts_of_exchange.is_empty()
    };
    if exchange_left_empty {
        guard.observe_contracts.remove(exchange);
    }
    guard.recalculate_active(self);
    guard.recalculate_live(self);
    for sender in guard.remote_interfaces.values() {
        sender.send(self.descriptor.clone());
    }
}
/// Returns a clone of the currently stored state, or `None` if no state has been set yet.
pub(crate) fn state_get(&self) -> Option<ObjectState<DataBytes>> {
    let guard = self.mutable.read().unwrap();
    let snapshot = guard.state.clone();
    snapshot
}
/// Registers a state stream under `object_state_stream_id`; it will receive
/// `object_changed()` on later state changes. An existing entry with the same id is replaced.
pub(crate) fn add_object_state_stream(&self, object_state_stream_id: usize, object_state_stream: Arc<Mutex<ObjectStateStreamSharedMutable>>) {
    let mut guard = self.mutable.write().unwrap();
    guard.object_state_streams.insert(object_state_stream_id, object_state_stream);
}
/// Unregisters the state stream stored under `object_state_stream_id`; unknown ids are ignored.
pub(crate) fn remove_object_state_stream(&self, object_state_stream_id: usize) {
    let mut guard = self.mutable.write().unwrap();
    guard.object_state_streams.remove(&object_state_stream_id);
}
/// Records that the sink `object_state_sink_id` is no longer synchronized.
///
/// If a state is stored and the resulting per-object synchronization differs from
/// the stored one, the state is updated, the descriptor is sent to every remote
/// interface and — after the lock has been released — every registered state
/// stream gets `object_changed()`. Otherwise nothing is notified.
pub(crate) fn desynchronize(&self, object_state_sink_id: usize) {
    let mut guard = self.mutable.write().unwrap();
    let new_synchronized = guard.desynchronize(object_state_sink_id);
    let synchronization_changed = guard.state.as_ref().map_or(false, |state| state.synchronized != new_synchronized);
    if !synchronization_changed {
        return;
    }
    guard.state = guard.state.take().map(|state| state.with_synchronized(new_synchronized));
    let streams_to_notify: Vec<Arc<Mutex<ObjectStateStreamSharedMutable>>> = guard.object_state_streams.values().cloned().collect();
    for sender in guard.remote_interfaces.values() {
        sender.send(self.descriptor.clone());
    }
    // Streams are notified without holding the object lock.
    drop(guard);
    for stream in streams_to_notify {
        stream.lock().unwrap().object_changed();
    }
}
/// Stores a state reported by the sink `object_state_sink_id` and propagates the change.
///
/// The decision whether anything changed is delegated to
/// `ObjectCoreMutable::state_set`. `live` is re-evaluated in every case. When the
/// state changed, the descriptor is sent to every remote interface and — after the
/// lock has been released — every registered state stream gets `object_changed()`.
pub(super) fn state_set(self: &Arc<Self>, object_state: ObjectState<DataBytes>, object_state_sink_id: usize) {
    let mut guard = self.mutable.write().unwrap();
    let (state_changed, state_initialized) = guard.state_set(object_state, object_state_sink_id);
    trace!(
        "New object state: {:?} state_changed:{:?} state_initialized: {:?} synchronized: {:?}",
        self.descriptor,
        state_changed,
        state_initialized,
        guard.state.as_ref().map(|stored| &stored.synchronized)
    );
    guard.recalculate_live(self);
    if !state_changed {
        return;
    }
    let streams_to_notify: Vec<Arc<Mutex<ObjectStateStreamSharedMutable>>> = guard.object_state_streams.values().cloned().collect();
    for sender in guard.remote_interfaces.values() {
        sender.send(self.descriptor.clone());
    }
    // Streams are notified without holding the object lock.
    drop(guard);
    for stream in streams_to_notify {
        stream.lock().unwrap().object_changed();
    }
}
/// Reports whether some exchange other than `exchange` observes this object.
///
/// With `check_tags` set, any observe contract of another exchange counts; without
/// it, only contracts whose `object()` is true count.
pub(super) fn has_observer_other_than(self: &Arc<Self>, exchange: &Arc<ExchangeCore>, check_tags: bool) -> bool {
    let guard = self.mutable.read().unwrap();
    let found = guard
        .observe_contracts
        .iter()
        .any(|(contract_exchange, contracts)| contract_exchange != exchange && (check_tags || contracts.iter().any(|contract| contract.object())));
    found
}
/// Reports whether some exchange other than `exchange` exposes this object.
///
/// With `check_tags` set, any expose contract of another exchange counts; without
/// it, only contracts whose `object()` is true count.
pub(super) fn has_exposer_other_than(self: &Arc<Self>, exchange: &Arc<ExchangeCore>, check_tags: bool) -> bool {
    let guard = self.mutable.read().unwrap();
    let found = guard
        .expose_contracts
        .iter()
        .any(|(contract_exchange, contracts)| contract_exchange != exchange && (check_tags || contracts.iter().any(|contract| contract.object())));
    found
}
pub(super) fn is_assigned_to_exchange_other_than(self: &Arc<Self>, exchange: &Arc<ExchangeCore>) -> bool {
let object_links = self.mutable.read().unwrap();
if let Some(ref expose_contract) = object_links.assigned_expose_contract {
expose_contract.exchange_core() != exchange
} else {
false
}
}
/// Reports whether `exchange` has at least one expose contract linked to this object.
pub(super) fn is_exposed_by_exchange(self: &Arc<Self>, exchange: &Arc<ExchangeCore>) -> bool {
    let guard = self.mutable.read().unwrap();
    let exposed = guard.expose_contracts.contains_key(exchange);
    exposed
}
/// Reports whether `exchange` has at least one observe contract linked to this object.
pub(super) fn is_observed_by_exchange(self: &Arc<Self>, exchange: &Arc<ExchangeCore>) -> bool {
    let guard = self.mutable.read().unwrap();
    let observed = guard.observe_contracts.contains_key(exchange);
    observed
}
/// Returns the smallest `composition_cost()` among the expose contracts that belong
/// to exchanges other than `exchange`, or `None` when there is no such contract.
pub(super) fn minimum_cost_excluding_exchange(self: &Arc<Self>, exchange: &Arc<ExchangeCore>) -> Option<u32> {
    let guard = self.mutable.read().unwrap();
    let lowest_cost = guard
        .expose_contracts
        .values()
        .flatten()
        .filter(|expose_contract| expose_contract.exchange_core() != exchange)
        .map(|expose_contract| expose_contract.composition_cost())
        .min();
    lowest_cost
}
}
impl Drop for ObjectCore {
    /// Tells the local exchange that the object identified by this descriptor is being released.
    fn drop(&mut self) {
        let descriptor = &self.descriptor;
        self.local_exchange.object_release(descriptor);
    }
}
impl std::fmt::Debug for ObjectCore {
    /// Formats the core for diagnostics.
    ///
    /// The plain form (`{:?}`) prints only the descriptor. The alternate form
    /// (`{:#?}`) prints a multi-line summary: descriptor, active/live flags, the
    /// exchanges holding observe and expose contracts, tags, the exchange of the
    /// assigned expose contract, cache sizes and a one-line description of the
    /// stored state.
    ///
    /// Formatter errors are propagated to the caller rather than turned into a
    /// panic, as the `fmt` traits expect.
    ///
    /// # Panics
    /// The alternate form takes the read lock on the mutable part and locks both
    /// diff caches; it panics if any of them is poisoned.
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !fmt.alternate() {
            return write!(fmt, "ObjectCore<{:?}>", self.descriptor);
        }
        let mutable = self.mutable.read().unwrap();
        write!(
            fmt,
            "│ Object: {:?}\n│ Active: {:?} Live: {:?}\n│ Observe contracts: {:?}\n│ Expose contracts: {:?}\n│ Tags: {:?}\n│ Assigned: {:?}\n│ Diffs stored: {} diff states stored: {}\n",
            self.descriptor,
            mutable.active,
            mutable.live,
            mutable.observe_contracts.keys(),
            mutable.expose_contracts.keys(),
            self.tags,
            mutable.assigned_expose_contract.as_ref().map(|contract| contract.exchange_core()),
            self.diffs.lock().unwrap().len(),
            self.diff_states.lock().unwrap().len(),
        )?;
        match &mutable.state {
            Some(data) => write!(fmt, "│ Data: {:?} {:?} {:?}B {:?}\n", data.version, data.format, data.data.len(), data.synchronized),
            None => fmt.write_str("│ Data: None\n"),
        }
    }
}