hakuban 0.7.2

Data-object sharing library
Documentation
use std::{collections::{HashMap, VecDeque}, sync::{atomic::Ordering, Arc, Mutex}, task::Waker};

use super::core::TagCore;
use crate::{contract::Contract, exchange::core::ExchangeCore, expose_contract::ExposeContract, object::{core::ObjectCore, state_sink::{ExposeContractMutable, ObjectStateSink, ObjectStateSinkSharedMutable}}, LocalExchange, TagDescriptor};


/// Represents a wish, a __contract__ to expose any object with a specific tag
///
/// TagExposeContract is a [Stream](futures_util::stream::Stream), emitting [ObjectStateSink] objects.
///
/// A new ObjectStateSink will get emitted if/when [LocalExchange] gets to know at least one observer ([ObjectObserveContract](crate::ObjectObserveContract)) of an object tagged with the tag the contract pertains to.
/// TagExposeContract will not emit more than one [ObjectStateSink] per object at a time. It will emit multiple ObjectStateSinks accepting ObjectStates of different objects.
/// A new [ObjectStateSink] for a specific object will only get emitted when the previous ObjectStateSink of that object gets dropped.
///
/// The [Stream](futures_util::stream::Stream) will end when the contract gets [TagExposeContract::terminate]d.
///
/// # Cloning
/// TagExposeContract can be cloned, but cloning does not create a new contract. Think of it as creating a paper copy of an on-paper contract.
/// It's a new sheet of paper, but not a new deal/agreement. Clones are basically references to a logical agreement.
///
/// Accordingly, terminating any of the clones (or the original), renders all of them terminated.
#[derive(Clone)]
pub struct TagExposeContract {
	// Reference-counted guard shared by all clones of this handle; when the last
	// clone is dropped the guard's `Drop` impl notifies the shared contract state.
	drop_guard: Arc<TagExposeContractDropGuard>,
}

// Guard owned (through an `Arc`) by every clone of `TagExposeContract`.
// Its `Drop` impl calls `TagExposeContractShared::contract_dropped`.
struct TagExposeContractDropGuard {
	// The contract state proper, which is also handed to the tag core and to checked-out sinks.
	shared: Arc<TagExposeContractShared>,
}


// Contract state shared between the public handle, the tag core and the sinks
// handed out by the contract. Implements `Contract` and `ExposeContract`.
pub(crate) struct TagExposeContractShared {
	// Exchange the contract was created on; used to reach the exchange core.
	exchange: LocalExchange,
	// Tag the contract pertains to; the contract links itself here on creation
	// and unlinks itself on termination.
	tag_core: Arc<TagCore>,
	// Value reported by `ExposeContract::composition_cost`; added to the exchange
	// load on `assign` and subtracted from it on `unassign`.
	composition_cost: u32,
	// Everything that changes after construction, guarded by a mutex.
	mutable: Mutex<TagExposeContractSharedMutable>,
}


// Mutex-protected part of the contract state.
struct TagExposeContractSharedMutable {
	// Set once by `terminate`; afterwards check-outs report the end of the stream.
	terminated: bool,
	// FIFO queue of objects for which a new `ObjectStateSink` can be handed out.
	object_state_sinks_ready_for_check_out: VecDeque<Arc<ObjectCore>>,
	// Per-object bookkeeping: (is the object currently assigned to this contract,
	// shared state of the sink currently checked out for it, if any).
	object_state_sinks: HashMap<Arc<ObjectCore>, (bool, Option<Arc<Mutex<ObjectStateSinkSharedMutable>>>)>,
	// Wakers registered by pending `poll_next` calls; drained and woken by `wake`.
	waker: Vec<Waker>,
}

//LockCheck:  scope=TagExposeContract
impl TagExposeContract {
	//LockCheck:  call=TagCore::link_tag_expose_contract
	pub(super) fn new(exchange: LocalExchange, tag_core: Arc<TagCore>, composition_cost: u32) -> TagExposeContract {
		let shared = Arc::new(TagExposeContractShared {
			exchange,
			tag_core,
			composition_cost,
			mutable: Mutex::new(TagExposeContractSharedMutable {
				terminated: false,
				waker: Vec::new(),
				object_state_sinks: HashMap::new(),
				object_state_sinks_ready_for_check_out: VecDeque::new(),
			}),
		});
		shared.tag_core.link_tag_expose_contract(shared.clone());
		TagExposeContract { drop_guard: Arc::new(TagExposeContractDropGuard { shared }) }
	}

	pub fn descriptor(&self) -> &TagDescriptor {
		&self.drop_guard.shared.tag_core.descriptor
	}

	//LockCheck:  lock=mutable  call=TagExposeContractMutable::check_out_object_state_sink  unlock=mutable
	pub fn ready(&mut self) -> Vec<ObjectStateSink> {
		let mut shared_mutable = self.drop_guard.shared.mutable.lock().unwrap();
		let mut ret = Vec::new();
		while let Some(Some(object_state_stream)) = shared_mutable.check_out_object_state_sink(&self.drop_guard.shared) {
			ret.push(object_state_stream);
		}
		ret
	}

	pub fn terminate(&mut self) {
		self.drop_guard.shared.terminate();
	}
}


//LockCheck:  scope=TagExposeContract
impl Drop for TagExposeContractDropGuard {
	//LockCheck:  call=contract_dropped
	fn drop(&mut self) {
		self.shared.contract_dropped();
	}
}

// Operations on the shared contract state driven by the lifetime of handles and sinks.
//LockCheck:  scope=TagExposeContract
impl TagExposeContractShared {
	/// Called by the drop guard: terminates the contract, then asserts that the
	/// caller's `Arc` is the only strong reference left.
	//LockCheck:  call=terminate
	fn contract_dropped(self: &Arc<Self>) {
		self.terminate();
		let remaining_references = Arc::strong_count(self);
		assert_eq!(remaining_references, 1);
	}

	/// Records that the sink checked out for `object_core` no longer exists.
	///
	/// If the object is still assigned it is queued for a fresh check-out,
	/// otherwise its bookkeeping entry is removed. Registered wakers are woken in both cases.
	/// Panics if the contract has no entry for `object_core`.
	//LockCheck:  lock=mutable  unlock=mutable
	pub(crate) fn object_state_sink_dropped(self: &Arc<Self>, object_core: &Arc<ObjectCore>) {
		let mut state = self.mutable.lock().unwrap();
		let slot = state.object_state_sinks.get_mut(object_core).unwrap();
		slot.1 = None;
		let still_assigned = slot.0;
		if still_assigned {
			state.object_state_sinks_ready_for_check_out.push_back(object_core.clone());
		} else {
			state.object_state_sinks.remove(object_core);
		}
		state.wake();
	}

	/// Terminates the contract once; later calls are no-ops.
	//LockCheck:  lock=mutable  call=TagExposeContractMutable::terminate  unlock=mutable  call=TagCore::unlink_tag_expose_contract
	fn terminate(self: &Arc<Self>) {
		let mut state = self.mutable.lock().unwrap();
		if state.terminated {
			return;
		}
		state.terminate();
		// The contract lock is released before calling into the tag core.
		drop(state);
		let dyn_self: Arc<dyn ExposeContract> = self.clone();
		self.tag_core.unlink_tag_expose_contract(&dyn_self);
		drop(dyn_self);
	}
}


// Identity and exchange accessors required by the `Contract` trait.
impl Contract for TagExposeContractShared {
	/// Uses the address of the shared state as the contract's identifier.
	fn id(&self) -> usize {
		let address: *const Self = self;
		address as usize
	}

	/// Returns the exchange core of the exchange this contract was created on; panics if it is absent.
	fn exchange_core(&self) -> &Arc<ExchangeCore> {
		let exchange_shared = &self.exchange.shared;
		exchange_shared.exchange_core.as_ref().unwrap()
	}
}


// Helpers operating on the mutex-protected state; callers hold the contract lock.
//LockCheck:  scope=TagExposeContractMutable
impl TagExposeContractSharedMutable {
	/// Wakes and forgets every registered waker.
	fn wake(&mut self) {
		self.waker.drain(..).for_each(Waker::wake);
	}

	/// Tries to hand out the next sink.
	///
	/// * `Some(None)` - the contract is terminated (end of stream),
	/// * `Some(Some(sink))` - a sink for the next queued object,
	/// * `None` - nothing is ready right now.
	//LockCheck:  call=ObjectStateSink::new
	fn check_out_object_state_sink(&mut self, shared: &Arc<TagExposeContractShared>) -> Option<Option<ObjectStateSink>> {
		if self.terminated {
			return Some(None);
		}
		let object_core = self.object_state_sinks_ready_for_check_out.pop_front()?;
		let sink = ObjectStateSink::new(object_core.clone(), ExposeContractMutable::Tag(shared.clone()));
		// Remember the sink's shared state so assign/unassign/terminate can reach it.
		let slot = self.object_state_sinks.get_mut(&object_core).unwrap();
		slot.1 = Some(sink.drop_guard.shared.clone());
		Some(Some(sink))
	}

	/// Marks the contract as terminated, wakes pending pollers and notifies every checked-out sink.
	/// Does nothing if the contract is already terminated.
	//LockCheck: lock=ObjectStateSink::mutable  call=ObjectStateSink::contract_terminated  unlock=ObjectStateSink::mutable
	fn terminate(&mut self) {
		if self.terminated {
			return;
		}
		self.terminated = true;
		self.wake();
		//TODO: anything else here?
		for sink in self.object_state_sinks.values().filter_map(|entry| entry.1.as_ref()) {
			sink.lock().unwrap().contract_terminated();
		}
	}
}

// Callbacks through which the exchange assigns objects to, and takes them away from, this contract.
//LockCheck:  scope=TagExposeContract
impl ExposeContract for TagExposeContractShared {
	/// Returns the composition cost given at construction.
	fn composition_cost(&self) -> u32 {
		self.composition_cost
	}

	/// Marks `object_core` as assigned to this contract.
	///
	/// If a sink is currently checked out for the object it is notified, otherwise the
	/// object is queued so that a sink can be checked out. The contract's composition cost
	/// is added to the exchange load and registered wakers are woken.
	///
	/// Always returns `true`.
	///
	/// # Panics
	/// Panics with "Double assign" if the object is already assigned.
	//TODO: take ref instead; no return? is return for contract drop?
	//LockCheck:  lock=mutable  lock=ObjectStateSink::mutable  call=ObjectStateSink::assign  unlock=ObjectStateSink::mutable  unlock=mutable
	fn assign(&self, object_core: &Arc<ObjectCore>) -> bool {
		let mut mutable = self.mutable.lock().unwrap();
		if let Some((assigned, object_state_sink)) = mutable.object_state_sinks.get_mut(object_core) {
			if *assigned {
				panic!("Double assign");
			};
			*assigned = true;
			if let Some(object_state_sink) = object_state_sink {
				object_state_sink.lock().unwrap().assign();
			} else {
				mutable.object_state_sinks_ready_for_check_out.push_back(object_core.clone());
			}
		} else {
			mutable.object_state_sinks.insert(object_core.clone(), (true, None));
			mutable.object_state_sinks_ready_for_check_out.push_back(object_core.clone());
		}

		self.exchange_core().load.fetch_add(self.composition_cost(), Ordering::Relaxed);

		mutable.wake();
		true
	}

	/// Marks `object_core` as no longer assigned to this contract.
	///
	/// If a sink is currently checked out for the object it is notified and the bookkeeping
	/// entry is kept until that sink is dropped. Otherwise the object is taken out of the
	/// check-out queue and its entry is removed right away, so that no stale
	/// `(false, None)` entry (and the `Arc<ObjectCore>` it holds) lingers in the map.
	/// The contract's composition cost is subtracted from the exchange load and
	/// registered wakers are woken.
	///
	/// Always returns `true`.
	///
	/// # Panics
	/// Panics with "Double unassign (#1)" / "Double unassign (#2)" if the object is not assigned.
	//LockCheck:  lock=mutable  lock=ObjectStateSink::mutable  call=ObjectStateSink::unassign  unlock=ObjectStateSink::mutable  unlock=mutable
	fn unassign(&self, object_core: &Arc<ObjectCore>) -> bool {
		let mut mutable = self.mutable.lock().unwrap();
		let sink_checked_out = match mutable.object_state_sinks.get_mut(object_core) {
			Some((assigned, object_state_sink)) => {
				if !*assigned {
					panic!("Double unassign (#1)");
				};
				*assigned = false;
				match object_state_sink {
					Some(object_state_sink) => {
						object_state_sink.lock().unwrap().unassign();
						true
					}
					None => false,
				}
			}
			None => panic!("Double unassign (#2)"),
		};
		if !sink_checked_out {
			// An assigned object without a checked-out sink is always waiting in the queue.
			let index = mutable.object_state_sinks_ready_for_check_out.iter().position(|v| v == object_core).unwrap();
			mutable.object_state_sinks_ready_for_check_out.remove(index);
			// Nothing refers to the entry any more; `assign` re-creates it when needed.
			mutable.object_state_sinks.remove(object_core);
		}

		self.exchange_core().load.fetch_sub(self.composition_cost(), Ordering::Relaxed);

		mutable.wake();
		true
	}

	/// Always `false` for this contract type (presumably distinguishes tag contracts
	/// from object contracts - confirm against the trait definition).
	fn object(&self) -> bool {
		false
	}
}


// Stream of sinks: yields one `ObjectStateSink` per check-out and ends once the contract is terminated.
//LockCheck:  scope=TagExposeContract
impl futures::Stream for TagExposeContract {
	type Item = ObjectStateSink;

	/// Yields the next available sink, `None` once the contract is terminated, or
	/// registers the task's waker and returns `Pending` when nothing is ready.
	//LockCheck:  lock=mutable  call=TagExposeContractMutable::check_out_object_state_sink  unlock=mutable
	fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
		let mut shared_mutable = self.drop_guard.shared.mutable.lock().unwrap();
		match shared_mutable.check_out_object_state_sink(&self.drop_guard.shared) {
			Some(object_state_sink) => std::task::Poll::Ready(object_state_sink),
			None => {
				// Register the waker only if an equivalent one is not already stored;
				// otherwise repeated polls from the same task would keep growing the
				// list and cause redundant wake-ups.
				let current = cx.waker();
				if !shared_mutable.waker.iter().any(|registered| registered.will_wake(current)) {
					shared_mutable.waker.push(current.clone());
				}
				std::task::Poll::Pending
			}
		}
	}
}



// Debug representation: the literal "TagExposeContract:" followed by the tag descriptor.
impl std::fmt::Debug for TagExposeContractShared {
	/// Writes `TagExposeContract:<descriptor>` straight into the formatter,
	/// without building an intermediate `String`.
	fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		write!(formatter, "TagExposeContract:{}", self.tag_core.descriptor)
	}
}