#![allow(clippy::mutex_atomic)]
use std::{collections::{hash_map::Entry, HashSet}, hash::{Hash, Hasher}, sync::{atomic::{AtomicU32, Ordering}, Arc, Mutex}};
use log::trace;
use super::{core::ObjectCore, state_sink::{ObjectStateSink, ObjectStateSinkSharedMutable}, ObjectStateStream, ObjectStateStreamSharedMutable, ObserveContractMutable};
use crate::{channel::MpscSender, contract::Contract, diff, exchange::{core::ExchangeCore, local::LocalExchangeShared}, expose_contract::ExposeContract, object::state_sink::ExposeContractMutable, observe_contract::ObserveContract, DataBytes, DataVersion, ObjectDescriptor};
/// Owning handle for one object as seen through a remote exchange.
///
/// Dropping the handle calls `RemoteObjectInterface::destroy` on the wrapped interface.
pub(crate) struct RemoteObject {
// Interface that is handed to the object core as expose and observe contract.
object_interface: Arc<RemoteObjectInterface>,
}
/// Contract object linking one remote exchange to one object core.
///
/// Implements `Contract`, `ObserveContract` and `ExposeContract`.
pub(super) struct RemoteObjectInterface {
// Core of the remote exchange this contract belongs to; returned by `Contract::exchange_core`.
remote_exchange_core: Arc<ExchangeCore>,
// Core of the object this contract refers to.
object_core: Arc<ObjectCore>,
// Cost last passed to `RemoteObject::expose`; reported by `ExposeContract::composition_cost`
// and added to / subtracted from the exchange load in `assign` / `unassign`.
composition_cost: AtomicU32,
// Flags and outstanding sink/stream state; also shared with the `ObjectStateSink` and
// `ObjectStateStream` instances created from this contract.
mutable: Arc<Mutex<RemoteObjectExposeContractSharedMutable>>,
// Value returned by `ObjectCore::link_remote_interface` when a change channel was supplied;
// `None` otherwise. Used by `destroy` to unlink. This is not the value of `Contract::id`.
id: Option<usize>,
}
/// Mutable state of a `RemoteObjectInterface`, guarded by a mutex.
pub(crate) struct RemoteObjectExposeContractSharedMutable {
// Last value applied through `RemoteObject::expose`.
expose: bool,
// Last value applied through `RemoteObject::observe`.
observe: bool,
// Shared state of the sink handed out by `accept_object_state_sink`; `None` when no sink is outstanding.
object_state_sink: Option<Arc<Mutex<ObjectStateSinkSharedMutable>>>,
// Set by `ExposeContract::assign`, cleared by `ExposeContract::unassign`.
assigned: bool,
// Shared state of the stream handed out by `get_object_state_stream`; `None` when no stream is outstanding.
object_state_stream: Option<Arc<Mutex<ObjectStateStreamSharedMutable>>>,
// Set by `ObserveContract::initialize_activate`, cleared by `deinitialize_deactivate`.
live: bool,
}
impl RemoteObject {
/// Builds a handle for `descriptor` on behalf of the remote exchange.
///
/// With `instantiate` set, the object core comes from `object_acquire` and a handle is
/// always returned. Otherwise `object_maybe_acquire` is consulted and `None` is returned
/// when it yields no core. `changes` is forwarded to `RemoteObjectInterface::new`.
pub(crate) fn new(
    instantiate: bool, local_exchange: Arc<LocalExchangeShared>, remote_exchange_core: Arc<ExchangeCore>, descriptor: &ObjectDescriptor,
    changes: Option<MpscSender<HashSet<ObjectDescriptor>>>,
) -> Option<RemoteObject> {
    let acquired_core = if instantiate {
        Some(local_exchange.object_acquire(descriptor))
    } else {
        local_exchange.object_maybe_acquire(descriptor)
    };
    // No core means no handle.
    let object_core = acquired_core?;
    Some(RemoteObject { object_interface: RemoteObjectInterface::new(object_core, remote_exchange_core, changes) })
}
/// Enables or disables this exchange's expose contract for the object.
///
/// `cost` becomes the contract's composition cost on every call, even when `value`
/// matches the current state. If the contract is currently assigned, the exchange load
/// is corrected by the difference between the new and the previous cost, so that the
/// amount `unassign` later subtracts matches what the load accounts for.
///
/// When `value` differs from the current state, the contract is linked to or unlinked
/// from the object core and the local exchange is told that the object changed.
/// Disabling also calls `contract_terminated` on an outstanding state sink.
///
/// # Panics
/// Panics if one of the involved mutexes is poisoned.
pub(crate) fn expose(&self, value: bool, cost: u32) {
    let interface = &self.object_interface;
    let mut shared_mutable = interface.mutable.lock().unwrap();
    // `assign` and `unassign` add/subtract the *current* cost while holding this same
    // lock. Replacing the cost under the lock and fixing up the load for an already
    // assigned contract keeps both sides of that accounting equal.
    let previous_cost = interface.composition_cost.swap(cost, Ordering::Relaxed);
    if shared_mutable.assigned && previous_cost != cost {
        let load = &interface.remote_exchange_core.load;
        // Add before subtracting so the counter never dips below its true value.
        load.fetch_add(cost, Ordering::Relaxed);
        load.fetch_sub(previous_cost, Ordering::Relaxed);
    }
    if shared_mutable.expose == value {
        return;
    }
    shared_mutable.expose = value;
    let dyn_self: Arc<dyn ExposeContract> = interface.clone();
    if value {
        // The state lock is released before calling into the object core.
        drop(shared_mutable);
        interface.object_core.link_expose_contract(dyn_self);
    } else {
        if let Some(object_state_sink) = shared_mutable.object_state_sink.take() {
            object_state_sink.lock().unwrap().contract_terminated();
        }
        drop(shared_mutable);
        interface.object_core.unlink_expose_contract(&dyn_self);
    }
    interface.object_core.local_exchange.object_changed(interface.object_core.descriptor.clone());
}
/// Enables or disables this exchange's observe contract for the object.
///
/// Does nothing when `value` equals the current state. Otherwise the contract is linked
/// to or unlinked from the object core and the local exchange is told that the object
/// changed. Disabling also calls `contract_terminated` on an outstanding state stream.
///
/// # Panics
/// Panics if one of the involved mutexes is poisoned.
pub(crate) fn observe(&self, value: bool) {
    let interface = &self.object_interface;
    let core = &interface.object_core;
    let mut state = interface.mutable.lock().unwrap();
    if state.observe == value {
        return;
    }
    state.observe = value;
    let contract: Arc<dyn ObserveContract> = interface.clone();
    if value {
        // The state lock is released before calling into the object core.
        drop(state);
        core.link_observe_contract(contract);
    } else {
        if let Some(stream_state) = state.object_state_stream.take() {
            stream_state.lock().unwrap().contract_terminated();
        }
        drop(state);
        core.unlink_observe_contract(&contract);
    }
    core.local_exchange.object_changed(core.descriptor.clone());
}
/// Returns the result of `ObjectCore::is_observed_by_exchange` for this handle's remote exchange.
pub(crate) fn is_observed_by_this_exchange(&self) -> bool {
    let interface = &self.object_interface;
    interface.object_core.is_observed_by_exchange(&interface.remote_exchange_core)
}
/// Returns the result of `ObjectCore::is_exposed_by_exchange` for this handle's remote exchange.
pub(crate) fn is_exposed_by_this_exchange(&self) -> bool {
    let interface = &self.object_interface;
    interface.object_core.is_exposed_by_exchange(&interface.remote_exchange_core)
}
/// Returns the result of `ObjectCore::is_assigned_to_exchange_other_than` for this handle's remote exchange.
pub(crate) fn is_assigned_to_other_exchange(&self) -> bool {
    let interface = &self.object_interface;
    interface.object_core.is_assigned_to_exchange_other_than(&interface.remote_exchange_core)
}
/// Returns the result of `ObjectCore::has_observer_other_than` for this handle's remote
/// exchange; `check_tags` is forwarded unchanged.
pub(crate) fn is_observed_by_other_exchange(&self, check_tags: bool) -> bool {
    let interface = &self.object_interface;
    interface.object_core.has_observer_other_than(&interface.remote_exchange_core, check_tags)
}
/// Returns the result of `ObjectCore::has_exposer_other_than` for this handle's remote
/// exchange; `check_tags` is forwarded unchanged.
pub(crate) fn is_exposed_by_other_exchange(&self, check_tags: bool) -> bool {
    let interface = &self.object_interface;
    interface.object_core.has_exposer_other_than(&interface.remote_exchange_core, check_tags)
}
/// Hands out a state sink for the object if this contract is currently assigned.
///
/// Returns `None` when the contract is not assigned. On success the sink's shared state
/// is recorded so that later contract changes can reach it.
///
/// # Panics
/// Panics with "Double accept" if a sink is already outstanding, or if the state mutex
/// is poisoned.
pub(crate) fn accept_object_state_sink(&self) -> Option<ObjectStateSink> {
    let interface = &self.object_interface;
    let mut state = interface.mutable.lock().unwrap();
    assert!(state.object_state_sink.is_none(), "Double accept");
    if !state.assigned {
        return None;
    }
    let sink = ObjectStateSink::new(interface.object_core.clone(), ExposeContractMutable::RemoteObject(interface.mutable.clone()));
    state.object_state_sink = Some(sink.drop_guard.shared.clone());
    Some(sink)
}
/// Hands out a state stream for the object if the observe contract is live.
///
/// Returns `None` when the contract is not live. On success the stream's shared state is
/// recorded, the state lock is released, and only then is `link_to_object` called on the
/// stream.
///
/// # Panics
/// Panics with "Double object_state_stream checkout" if a stream is already outstanding,
/// or if the state mutex is poisoned.
pub(crate) fn get_object_state_stream(&self) -> Option<ObjectStateStream> {
    let interface = &self.object_interface;
    let stream = {
        let mut state = interface.mutable.lock().unwrap();
        assert!(state.object_state_stream.is_none(), "Double object_state_stream checkout");
        if !state.live {
            return None;
        }
        let created = ObjectStateStream::new(interface.object_core.clone(), ObserveContractMutable::RemoteObject(interface.mutable.clone()));
        state.object_state_stream = Some(created.drop_guard.shared.clone());
        created
        // The state lock is released here, before the stream is linked.
    };
    stream.link_to_object();
    Some(stream)
}
pub(crate) fn cache_diff(&self, from_version: Option<Vec<i64>>, to_version: Vec<i64>, diff: Arc<Vec<u8>>) {
let mut diffs = self.object_interface.object_core.diffs.lock().unwrap();
let diff_key = (from_version, to_version);
if let std::collections::hash_map::Entry::Vacant(entry) = diffs.entry(diff_key) {
trace!("Memorizing diff submitted by remote.");
entry.insert(Arc::new(Mutex::new(Some(diff))));
while diffs.len() > 1 {
let key = diffs.keys().min().unwrap().clone();
diffs.remove(&key);
}
};
}
/// Returns the result of `ObjectCore::minimum_cost_excluding_exchange` for this handle's remote exchange.
pub(crate) fn minimum_cost_in_other_exchanges(&self) -> Option<u32> {
    let interface = &self.object_interface;
    interface.object_core.minimum_cost_excluding_exchange(&interface.remote_exchange_core)
}
/// Returns the diff leading from `old_version`/`old_data` to `new_version`/`new_data`.
///
/// The object's `diffs` map holds one slot per `(old_version, new_version)` pair. If the slot
/// for this pair already contains a diff (for example one stored by `cache_diff`), that diff is
/// returned; otherwise it is computed here, stored in the slot and returned. As written, the
/// function always returns `Some`.
///
/// # Panics
/// Panics if `old_version` is `Some` while `old_data` is `None`, if `diff::State::diff` or
/// `diff::patch` fail (their results are unwrapped), if applying the freshly computed diff to
/// `old_data` does not reproduce `new_data`, or if one of the mutexes is poisoned.
pub(crate) fn get_diff(
&self, old_version: Option<&DataVersion>, old_data: Option<&DataBytes>, new_version: &[i64], new_data: &DataBytes,
) -> Option<DataBytes> {
// Fetch the slot for this version pair, creating an empty one if needed, then trim the map
// to a single entry by removing the smallest keys. The cloned Arc keeps the slot usable
// even if its own map entry was the one removed.
let diff_mutex = {
let mut diffs = self.object_interface.object_core.diffs.lock().unwrap();
let ret = match diffs.entry((old_version.cloned(), new_version.into())) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => entry.insert(Arc::new(Mutex::new(None))).clone(),
};
while diffs.len() > 1 {
let key = diffs.keys().min().unwrap().clone();
diffs.remove(&key);
}
ret
};
// The slot stays locked while the diff is computed; the closure below only runs when the
// slot is still empty.
let mut diff_mutex_lock = diff_mutex.lock().unwrap();
Some(
diff_mutex_lock
.get_or_insert_with(|| {
let (diff, new_state) = if let Some(old_version) = old_version {
// Fetch the cached diff state for `old_version`, creating an empty slot if needed,
// and keep at most two cached states by removing the smallest version keys.
let state_mutex = {
let mut diff_states = self.object_interface.object_core.diff_states.lock().unwrap();
let ret = match diff_states.entry(old_version.clone()) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => entry.insert(Arc::new(Mutex::new(None))).clone(),
};
while diff_states.len() > 2 {
let key = diff_states.keys().min().unwrap().clone();
diff_states.remove(&key);
}
ret
};
let mut state_mutex_lock = state_mutex.lock().unwrap();
// No cached state yet: derive one by diffing `old_data` against an empty input.
// NOTE(review): the meaning of the trailing `8, 6` arguments is defined by
// `diff::State::diff` and is not visible in this file.
let state = state_mutex_lock.get_or_insert_with(|| diff::State::new().diff(&[], old_data.unwrap(), 8, 6).unwrap().1);
let ret = state.diff(old_data.unwrap(), new_data, 8, 6).unwrap();
// Sanity check: applying the diff to `old_data` must give back `new_data` exactly.
let patched = diff::patch(old_data.unwrap(), &ret.0).unwrap();
if patched.len() != new_data.len() {
panic!()
};
patched.iter().zip(new_data.iter()).for_each(|(a, b)| {
if a != b {
panic!()
}
});
ret
} else {
// No previous version: diff `new_data` against an empty input.
let state = diff::State::new();
state.diff(&[], new_data, 8, 6).unwrap()
};
// Remember the state produced by this diff under `new_version` unless one is already
// cached, then trim the state cache to two entries again.
let mut diff_states = self.object_interface.object_core.diff_states.lock().unwrap();
if !diff_states.contains_key(new_version) {
diff_states.insert(new_version.into(), Arc::new(Mutex::new(Some(new_state))));
}
while diff_states.len() > 2 {
let key = diff_states.keys().min().unwrap().clone();
diff_states.remove(&key);
}
Arc::new(diff)
})
.clone(),
)
}
}
impl Drop for RemoteObject {
fn drop(&mut self) {
self.object_interface.destroy();
}
}
impl RemoteObjectInterface {
    /// Creates the contract object for `object_core` on behalf of `exchange`.
    ///
    /// When `changes` is provided it is registered through
    /// `ObjectCore::link_remote_interface` and the returned value is kept for `destroy`.
    /// All flags start out `false`, no sink or stream is outstanding, and the
    /// composition cost starts at zero.
    pub(super) fn new(
        object_core: Arc<ObjectCore>, exchange: Arc<ExchangeCore>, changes: Option<MpscSender<HashSet<ObjectDescriptor>>>,
    ) -> Arc<RemoteObjectInterface> {
        let initial_state = RemoteObjectExposeContractSharedMutable {
            expose: false,
            observe: false,
            object_state_sink: None,
            assigned: false,
            object_state_stream: None,
            live: false,
        };
        let id = changes.map(|change_sender| object_core.link_remote_interface(change_sender));
        let interface = RemoteObjectInterface {
            remote_exchange_core: exchange,
            object_core,
            composition_cost: AtomicU32::new(0),
            mutable: Arc::new(Mutex::new(initial_state)),
            id,
        };
        Arc::new(interface)
    }
}
impl Hash for RemoteObjectInterface {
    /// Hashes the value of `Contract::id`, matching the `PartialEq` implementation.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let identity = self.id();
        Hash::hash(&identity, state);
    }
}
impl PartialEq for RemoteObjectInterface {
    /// Two interfaces are equal when `Contract::id` returns the same value for both.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.id(), other.id());
        lhs == rhs
    }
}
impl Eq for RemoteObjectInterface {}
impl RemoteObjectInterface {
    /// Detaches this contract from its object core.
    ///
    /// Unlinks the change channel if one was registered, then unlinks the observe and
    /// expose contracts that are still enabled, reporting an object change to the local
    /// exchange after each unlink.
    ///
    /// # Panics
    /// Panics if the state mutex is poisoned, or if any other strong reference to this
    /// interface is still alive once the contracts have been unlinked.
    pub(super) fn destroy(self: &Arc<Self>) {
        let core = &self.object_core;
        if let Some(link_id) = self.id {
            core.unlink_remote_interface(link_id);
        }
        // Snapshot the flags so the state lock is not held while calling into the core.
        let (observing, exposing) = {
            let state = self.mutable.lock().unwrap();
            (state.observe, state.expose)
        };
        if observing {
            let contract: Arc<dyn ObserveContract> = self.clone();
            core.unlink_observe_contract(&contract);
            core.local_exchange.object_changed(core.descriptor.clone());
        }
        if exposing {
            let contract: Arc<dyn ExposeContract> = self.clone();
            core.unlink_expose_contract(&contract);
            core.local_exchange.object_changed(core.descriptor.clone());
        }
        assert_eq!(Arc::strong_count(self), 1);
    }
}
impl std::fmt::Debug for RemoteObjectInterface {
    /// Writes `RemoteObjectInterface:` followed by the object's descriptor.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Format straight into the formatter instead of building a temporary `String`.
        write!(formatter, "RemoteObjectInterface:{}", self.object_core.descriptor)
    }
}
impl Contract for RemoteObjectInterface {
    /// Uses the address of `self` as the identifier.
    fn id(&self) -> usize {
        let address: *const Self = self;
        address as usize
    }
    /// Returns the core of the remote exchange this contract belongs to.
    fn exchange_core(&self) -> &Arc<ExchangeCore> {
        let Self { remote_exchange_core, .. } = self;
        remote_exchange_core
    }
}
impl ObserveContract for RemoteObjectInterface {
    /// Marks the contract as live. Panics if the state mutex is poisoned.
    fn initialize_activate(&self, _object_core: &Arc<ObjectCore>) {
        self.mutable.lock().unwrap().live = true;
    }
    /// Marks the contract as no longer live. Panics if the state mutex is poisoned.
    fn deinitialize_deactivate(&self, _object_core: &Arc<ObjectCore>) {
        self.mutable.lock().unwrap().live = false;
    }
    /// Always `true` for this contract type.
    fn object(&self) -> bool {
        true
    }
}
impl ExposeContract for RemoteObjectInterface {
    /// Returns the currently stored composition cost.
    fn composition_cost(&self) -> u32 {
        self.composition_cost.load(Ordering::Relaxed)
    }
    /// Marks the contract as assigned, forwards `assign` to an outstanding sink and adds
    /// the composition cost to the exchange load. Always returns `true`.
    ///
    /// # Panics
    /// Panics with "Double assign" if already assigned, or if a mutex is poisoned.
    fn assign(&self, _object_core: &Arc<ObjectCore>) -> bool {
        // The state lock is held until the load has been updated.
        let mut state = self.mutable.lock().unwrap();
        assert!(!state.assigned, "Double assign");
        state.assigned = true;
        if let Some(sink_state) = state.object_state_sink.as_ref() {
            sink_state.lock().unwrap().assign();
        }
        let cost = self.composition_cost();
        self.exchange_core().load.fetch_add(cost, Ordering::Relaxed);
        true
    }
    /// Marks the contract as unassigned, forwards `unassign` to an outstanding sink and
    /// subtracts the composition cost from the exchange load. Always returns `true`.
    ///
    /// # Panics
    /// Panics with "Double unassign (#1)" if not assigned, or if a mutex is poisoned.
    fn unassign(&self, _object_core: &Arc<ObjectCore>) -> bool {
        // The state lock is held until the load has been updated.
        let mut state = self.mutable.lock().unwrap();
        assert!(state.assigned, "Double unassign (#1)");
        state.assigned = false;
        if let Some(sink_state) = state.object_state_sink.as_ref() {
            sink_state.lock().unwrap().unassign();
        }
        let cost = self.composition_cost();
        self.exchange_core().load.fetch_sub(cost, Ordering::Relaxed);
        true
    }
    /// Always `true` for this contract type.
    fn object(&self) -> bool {
        true
    }
}
impl RemoteObjectExposeContractSharedMutable {
    /// Forgets the outstanding state sink.
    ///
    /// # Panics
    /// Panics with "Double return" if no sink is outstanding.
    pub(crate) fn object_state_sink_dropped(&mut self) {
        let released = self.object_state_sink.take();
        assert!(released.is_some(), "Double return");
    }
    /// Forgets the outstanding state stream.
    ///
    /// # Panics
    /// Panics with "Double return" if no stream is outstanding.
    pub(crate) fn object_state_stream_dropped(&mut self) {
        let released = self.object_state_stream.take();
        assert!(released.is_some(), "Double return");
    }
}