use std::{collections::{HashMap, VecDeque}, sync::{atomic::Ordering, Arc, Mutex}, task::Waker};
use super::core::TagCore;
use crate::{contract::Contract, exchange::core::ExchangeCore, expose_contract::ExposeContract, object::{core::ObjectCore, state_sink::{ExposeContractMutable, ObjectStateSink, ObjectStateSinkSharedMutable}}, LocalExchange, TagDescriptor};
/// Public handle to a tag expose contract.
///
/// The handle is `Clone`; all clones share one `Arc`-held drop guard, so the
/// guard's `Drop` impl runs only once the last clone is gone.
#[derive(Clone)]
pub struct TagExposeContract {
/// Shared guard that gives access to the contract state and notifies it on drop.
drop_guard: Arc<TagExposeContractDropGuard>,
}
/// Guard whose `Drop` impl calls `contract_dropped` on the shared contract state.
struct TagExposeContractDropGuard {
/// Contract state; clones of this `Arc` are also handed to the tag core and to
/// every object state sink created by the contract.
shared: Arc<TagExposeContractShared>,
}
/// State of a tag expose contract that lives behind the public handle.
pub(crate) struct TagExposeContractShared {
/// Exchange the contract was created with; `exchange_core()` is resolved through it.
exchange: LocalExchange,
/// Tag core the contract is linked to on creation and unlinked from on termination.
tag_core: Arc<TagCore>,
/// Amount added to / subtracted from the exchange core's `load` counter on every
/// assign / unassign.
composition_cost: u32,
/// Mutable part of the state, guarded by a mutex.
mutable: Mutex<TagExposeContractSharedMutable>,
}
/// Mutex-protected bookkeeping of a tag expose contract.
struct TagExposeContractSharedMutable {
/// Set by `terminate`; once set, checking out a sink reports the end of the stream.
terminated: bool,
/// Objects that are assigned and currently have no sink, queued in the order in
/// which they became ready for check-out.
object_state_sinks_ready_for_check_out: VecDeque<Arc<ObjectCore>>,
/// Per object: whether it is currently assigned, and the shared state of its
/// checked-out sink (`None` while no sink is checked out).
object_state_sinks: HashMap<Arc<ObjectCore>, (bool, Option<Arc<Mutex<ObjectStateSinkSharedMutable>>>)>,
/// Wakers registered by pending `poll_next` calls; drained and woken by `wake`.
waker: Vec<Waker>,
}
impl TagExposeContract {
    /// Creates the contract for `tag_core` on `exchange`.
    ///
    /// The freshly built shared state is linked to the tag core before the
    /// public handle (wrapping a new drop guard) is returned.
    pub(super) fn new(exchange: LocalExchange, tag_core: Arc<TagCore>, composition_cost: u32) -> TagExposeContract {
        let initial_state = TagExposeContractSharedMutable {
            terminated: false,
            waker: Vec::new(),
            object_state_sinks: HashMap::new(),
            object_state_sinks_ready_for_check_out: VecDeque::new(),
        };
        let shared = Arc::new(TagExposeContractShared {
            exchange,
            tag_core,
            composition_cost,
            mutable: Mutex::new(initial_state),
        });
        shared.tag_core.link_tag_expose_contract(shared.clone());
        let drop_guard = Arc::new(TagExposeContractDropGuard { shared });
        TagExposeContract { drop_guard }
    }

    /// Returns the descriptor of the tag this contract exposes.
    pub fn descriptor(&self) -> &TagDescriptor {
        let shared = &self.drop_guard.shared;
        &shared.tag_core.descriptor
    }

    /// Checks out every object state sink that is ready right now.
    ///
    /// Never blocks waiting for new sinks: the returned vector is empty when
    /// nothing is queued or when the contract has been terminated.
    pub fn ready(&mut self) -> Vec<ObjectStateSink> {
        let shared = &self.drop_guard.shared;
        let mut state = shared.mutable.lock().unwrap();
        // `Some(None)` (terminated) and `None` (queue empty) both end the iteration.
        std::iter::from_fn(|| state.check_out_object_state_sink(shared).flatten()).collect()
    }

    /// Terminates the contract; see `TagExposeContractShared::terminate`.
    pub fn terminate(&mut self) {
        let shared = &self.drop_guard.shared;
        shared.terminate();
    }
}
impl Drop for TagExposeContractDropGuard {
    /// Notifies the shared contract state that the guard is being dropped.
    fn drop(&mut self) {
        let shared = &self.shared;
        shared.contract_dropped();
    }
}
impl TagExposeContractShared {
    /// Called from the drop guard: terminates the contract.
    ///
    /// # Panics
    /// Panics if, after termination, anything other than the caller still holds
    /// a strong reference to the shared state.
    fn contract_dropped(self: &Arc<Self>) {
        self.terminate();
        assert_eq!(Arc::strong_count(self), 1);
    }

    /// Records that the sink checked out for `object_core` has been dropped.
    ///
    /// An object that is still assigned is queued for check-out again; an
    /// object that is no longer assigned is forgotten. Registered wakers are
    /// woken in both cases.
    ///
    /// # Panics
    /// Panics if `object_core` is not tracked by this contract.
    pub(crate) fn object_state_sink_dropped(self: &Arc<Self>, object_core: &Arc<ObjectCore>) {
        let mut state = self.mutable.lock().unwrap();
        let still_assigned = {
            let entry = state.object_state_sinks.get_mut(object_core).unwrap();
            entry.1 = None;
            entry.0
        };
        if still_assigned {
            state.object_state_sinks_ready_for_check_out.push_back(object_core.clone());
        } else {
            state.object_state_sinks.remove(object_core);
        }
        state.wake();
    }

    /// Terminates the contract once; later calls do nothing.
    ///
    /// The mutable state is terminated under the lock; the lock is released
    /// before the contract is unlinked from the tag core.
    fn terminate(self: &Arc<Self>) {
        let mut state = self.mutable.lock().unwrap();
        if state.terminated {
            return;
        }
        state.terminate();
        drop(state);
        let as_expose_contract: Arc<dyn ExposeContract> = self.clone();
        self.tag_core.unlink_tag_expose_contract(&as_expose_contract);
        // Release the temporary strong reference before returning.
        drop(as_expose_contract);
    }
}
impl Contract for TagExposeContractShared {
    /// Identifies the contract by the address of its shared state.
    fn id(&self) -> usize {
        let address: *const Self = self;
        address as usize
    }

    /// Returns the exchange core reached through the contract's exchange.
    ///
    /// # Panics
    /// Panics if `as_ref()` on the exchange's `exchange_core` yields `None`.
    fn exchange_core(&self) -> &Arc<ExchangeCore> {
        let exchange_shared = &self.exchange.shared;
        exchange_shared.exchange_core.as_ref().unwrap()
    }
}
impl TagExposeContractSharedMutable {
    /// Wakes and forgets every registered waker.
    fn wake(&mut self) {
        self.waker.drain(..).for_each(Waker::wake);
    }

    /// Tries to check out the next ready object state sink.
    ///
    /// Returns
    /// * `Some(None)` when the contract has been terminated,
    /// * `Some(Some(sink))` when an object was waiting for check-out,
    /// * `None` when nothing is ready yet.
    ///
    /// # Panics
    /// Panics if the queued object is not tracked in `object_state_sinks`.
    fn check_out_object_state_sink(&mut self, shared: &Arc<TagExposeContractShared>) -> Option<Option<ObjectStateSink>> {
        if self.terminated {
            return Some(None);
        }
        let object_core = self.object_state_sinks_ready_for_check_out.pop_front()?;
        let sink = ObjectStateSink::new(object_core.clone(), ExposeContractMutable::Tag(shared.clone()));
        // Remember the sink's shared state so assign/unassign/terminate can reach it.
        let entry = self.object_state_sinks.get_mut(&object_core).unwrap();
        entry.1 = Some(sink.drop_guard.shared.clone());
        Some(Some(sink))
    }

    /// Marks the state as terminated, wakes all wakers and notifies every
    /// checked-out sink. Does nothing when already terminated.
    fn terminate(&mut self) {
        if self.terminated {
            return;
        }
        self.terminated = true;
        self.wake();
        let checked_out = self
            .object_state_sinks
            .values()
            .filter_map(|(_assigned, sink)| sink.as_ref());
        for sink in checked_out {
            sink.lock().unwrap().contract_terminated();
        }
    }
}
impl ExposeContract for TagExposeContractShared {
    /// Cost added to the exchange core's load for every object assigned to this contract.
    fn composition_cost(&self) -> u32 {
        self.composition_cost
    }

    /// Marks `object_core` as assigned to this contract.
    ///
    /// If a sink for the object is still checked out, that sink is told about
    /// the assignment; otherwise the object is queued for check-out. The
    /// exchange core's load grows by `composition_cost()` and registered
    /// wakers are woken. Always returns `true`.
    ///
    /// # Panics
    /// Panics with "Double assign" if the object is already assigned.
    fn assign(&self, object_core: &Arc<ObjectCore>) -> bool {
        let mut mutable = self.mutable.lock().unwrap();
        if let Some((assigned, object_state_sink)) = mutable.object_state_sinks.get_mut(object_core) {
            if *assigned {
                panic!("Double assign");
            }
            *assigned = true;
            if let Some(object_state_sink) = object_state_sink {
                object_state_sink.lock().unwrap().assign();
            } else {
                mutable.object_state_sinks_ready_for_check_out.push_back(object_core.clone());
            }
        } else {
            mutable.object_state_sinks.insert(object_core.clone(), (true, None));
            mutable.object_state_sinks_ready_for_check_out.push_back(object_core.clone());
        }
        self.exchange_core().load.fetch_add(self.composition_cost(), Ordering::Relaxed);
        mutable.wake();
        true
    }

    /// Marks `object_core` as no longer assigned to this contract.
    ///
    /// If a sink for the object is checked out, that sink is told about the
    /// unassignment and the entry stays until the sink is dropped. Otherwise
    /// the object is taken out of the check-out queue and its entry is removed
    /// from the map, so the contract does not keep the `Arc<ObjectCore>` alive
    /// for an object it no longer tracks. The exchange core's load shrinks by
    /// `composition_cost()` and registered wakers are woken. Always returns
    /// `true`.
    ///
    /// # Panics
    /// Panics with "Double unassign (#1)" if the object is tracked but not
    /// assigned, and with "Double unassign (#2)" if it is not tracked at all.
    fn unassign(&self, object_core: &Arc<ObjectCore>) -> bool {
        let mut mutable = self.mutable.lock().unwrap();
        let sink_checked_out = if let Some((assigned, object_state_sink)) = mutable.object_state_sinks.get_mut(object_core) {
            if !*assigned {
                panic!("Double unassign (#1)");
            }
            *assigned = false;
            if let Some(object_state_sink) = object_state_sink {
                object_state_sink.lock().unwrap().unassign();
                true
            } else {
                false
            }
        } else {
            panic!("Double unassign (#2)");
        };
        if !sink_checked_out {
            // Assigned without a sink means the object is waiting in the queue.
            let index = mutable.object_state_sinks_ready_for_check_out.iter().position(|v| v == object_core).unwrap();
            mutable.object_state_sinks_ready_for_check_out.remove(index);
            // Nothing refers to the entry any more; drop it instead of leaving a
            // stale `(false, None)` record behind.
            mutable.object_state_sinks.remove(object_core);
        }
        self.exchange_core().load.fetch_sub(self.composition_cost(), Ordering::Relaxed);
        mutable.wake();
        true
    }

    /// Always `false` for a tag expose contract.
    fn object(&self) -> bool {
        false
    }
}
impl futures::Stream for TagExposeContract {
    type Item = ObjectStateSink;

    /// Yields the next object state sink that is ready for check-out.
    ///
    /// Resolves to `Ready(Some(sink))` when a sink is available and to
    /// `Ready(None)` once the contract has been terminated. Otherwise the
    /// task's waker is registered and `Pending` is returned.
    fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
        let mut shared_mutable = self.drop_guard.shared.mutable.lock().unwrap();
        match shared_mutable.check_out_object_state_sink(&self.drop_guard.shared) {
            Some(object_state_sink) => std::task::Poll::Ready(object_state_sink),
            None => {
                let waker = cx.waker();
                // Repeated polls from the same task (e.g. inside `select!`) must not
                // pile up duplicate wakers; skip registration when an already stored
                // waker would wake the same task.
                let already_registered = shared_mutable.waker.iter().any(|registered| registered.will_wake(waker));
                if !already_registered {
                    shared_mutable.waker.push(waker.clone());
                }
                std::task::Poll::Pending
            }
        }
    }
}
impl std::fmt::Debug for TagExposeContractShared {
    /// Formats the contract as `TagExposeContract:` followed by the tag descriptor.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Write straight into the formatter: no temporary `String`, and a
        // formatting error is returned to the caller instead of panicking.
        write!(formatter, "TagExposeContract:{}", self.tag_core.descriptor)
    }
}