hakuban 0.8.5

Data-object sharing library
Documentation
use std::{
	sync::{
		atomic::{AtomicBool, Ordering::Relaxed},
		Arc,
	},
	task::Poll,
};

use futures::{task::AtomicWaker, Sink};

use super::{core::Object, expose_contract::ObjectExposeContractId, state::ObjectState};
#[cfg(feature = "downstream")]
use crate::connection::downstream::DownstreamConnectionId;
use crate::{connection::upstream::UpstreamConnectionId, tag::TagExposeContractId, DataBytes, ObjectDescriptor};

/// Identifier of a state sink.
///
/// The same id value is stored both in the [ObjectStateSink] and in the [ObjectStateSinkInlet] created together with it.
#[derive(PartialEq, Eq, Hash, Copy, Clone, Ord, PartialOrd)]
pub(crate) struct StateSinkId(pub u64);

/// Identifies the entity an [ObjectStateSink] was created for.
///
/// The owner is kept inside the sink and is passed to the object core (`state_sink_dropped`) when the sink gets dropped.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub(crate) enum ObjectStateSinkOwner {
	/// Sink owned by an object expose contract, referenced by its id.
	ObjectExposeContract(ObjectExposeContractId),
	/// Sink owned by a tag expose contract, referenced by its id.
	TagExposeContract(TagExposeContractId),
	/// Sink owned by an upstream connection, referenced by its id.
	UpstreamConnection(UpstreamConnectionId),
	/// Sink owned by a downstream connection, referenced by its id. Only available with the `downstream` feature.
	#[cfg(feature = "downstream")]
	DownstreamConnection(DownstreamConnectionId),
}

/// Parameters of an [ObjectStateSink], yielded by its [futures::stream::Stream] implementation.
///
/// The struct is currently empty; it exists to hold sink parameters in the future.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ObjectStateSinkParams {}

/// A [futures::sink::Sink], accepting [ObjectState]s
///
/// The basic function of the ObjectStateSink is passing new states of objects to the [Exchange](crate::Exchange).
///
/// The ObjectStateSink, besides being a Sink, also implements a [futures::stream::Stream], carrying [ObjectStateSinkParams] structs.
/// The Stream will end when either the contract which emitted this ObjectStateSink gets dropped, or the contract stops being the one selected for exposing the object.
/// The Hakuban network will, under normal conditions, try to pick only one expose contract per object, to keep that object exposed/up-to-date.
pub struct ObjectStateSink {
	// Id of this sink; passed to the object core together with every submitted state and on drop.
	pub(crate) id: StateSinkId,
	// Entity this sink was created for; reported to the object core when the sink is dropped.
	owner: ObjectStateSinkOwner,
	// Core of the object whose states are submitted through this sink.
	pub(crate) object_core: Arc<Object>,
	// Params most recently yielded by the Stream implementation; `None` until the first item has been produced.
	last_reported_params: Option<Option<ObjectStateSinkParams>>,
	// "Closed" flag and waker, shared with the paired `ObjectStateSinkInlet`.
	shared: Arc<ObjectStateSinkShared>,
	// When false, dropping the sink does not notify the object core.
	notify_object_on_drop: bool,
}

/// Control handle created together with an [ObjectStateSink] by [ObjectStateSink::new].
///
/// It shares the "closed" flag and the waker with the sink, which allows it to end the sink's stream via [ObjectStateSinkInlet::close].
pub(crate) struct ObjectStateSinkInlet {
	// Same id as the paired sink.
	id: StateSinkId,
	// State shared with the paired sink.
	shared: Arc<ObjectStateSinkShared>,
}

/// State shared between an [ObjectStateSink] and its [ObjectStateSinkInlet].
struct ObjectStateSinkShared {
	// Set to true by `ObjectStateSinkInlet::close`; once set, the sink's stream yields `None`.
	closed: AtomicBool,
	// Waker registered by the sink's `poll_next` and woken by `ObjectStateSinkInlet::close`.
	waker: AtomicWaker,
}

impl ObjectStateSink {
	/// Creates a sink for `object_core` together with the inlet controlling it.
	///
	/// Both halves carry the same `id` and share one "closed" flag and waker.
	/// The returned sink notifies the object core when dropped.
	pub(crate) fn new(object_core: Arc<Object>, id: StateSinkId, owner: ObjectStateSinkOwner) -> (ObjectStateSink, ObjectStateSinkInlet) {
		let shared = Arc::new(ObjectStateSinkShared { closed: AtomicBool::new(false), waker: AtomicWaker::new() });
		let inlet = ObjectStateSinkInlet { id, shared: Arc::clone(&shared) };
		let sink = ObjectStateSink { id, owner, object_core, last_reported_params: None, shared, notify_object_on_drop: true };
		(sink, inlet)
	}

	/// Consumes the sink without notifying the object core about the drop.
	#[cfg(feature = "downstream")]
	pub(crate) fn drop_without_notification(mut self) {
		// Disarm the notification performed by the `Drop` implementation before letting the sink go.
		self.notify_object_on_drop = false;
		drop(self);
	}

	/// Returns the descriptor of the object this sink belongs to.
	pub fn descriptor(&self) -> &ObjectDescriptor {
		&self.object_core.descriptor
	}
}

impl Drop for ObjectStateSink {
	/// Tells the object core that this sink is gone, unless the notification has been disabled.
	fn drop(&mut self) {
		if !self.notify_object_on_drop {
			return;
		}

		let sink_id = self.id;
		self.object_core.desynchronize(sink_id);
		self.object_core.state_sink_dropped(sink_id, &self.owner);
	}
}

impl ObjectStateSinkInlet {
	/// Returns the id this inlet was created with (the same id as the paired sink's).
	pub(crate) fn id(&self) -> StateSinkId {
		self.id
	}

	//TODO: will get useful for implementation of SinkParams streaming
	/*
	fn wake(&self) {
		self.shared.waker.wake();
	}
	*/

	/// Marks the paired sink's stream as closed and wakes the task registered by its `poll_next`.
	///
	/// After this call the sink's stream yields `None`.
	pub(crate) fn close(&self) {
		// The flag is stored before waking, so that the woken task finds it set when it polls again.
		self.shared.closed.store(true, Relaxed);
		self.shared.waker.wake();
	}
}

impl futures::Stream for ObjectStateSink {
	type Item = ObjectStateSinkParams;

	/// Yields the current [ObjectStateSinkParams] whenever they differ from the value yielded last, and ends the stream (`None`) once the sink has been closed.
	///
	/// When there is nothing to yield, the task's waker is registered so that closing the sink wakes the task up.
	fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
		/// Decides what the stream should produce right now, recording any params it hands out.
		fn next_item(sink: &mut ObjectStateSink) -> Poll<Option<ObjectStateSinkParams>> {
			if sink.shared.closed.load(Relaxed) {
				return Poll::Ready(None);
			}

			let current = ObjectStateSinkParams {};
			let already_reported = matches!(&sink.last_reported_params, Some(Some(previous)) if *previous == current);
			if already_reported {
				return Poll::Pending;
			}

			sink.last_reported_params = Some(Some(current.clone()));
			Poll::Ready(Some(current))
		}

		let sink: &mut ObjectStateSink = &mut self;

		//TODO: measure if calling next_item twice is worse than unconditionally registering the waker
		match next_item(sink) {
			Poll::Pending => {
				// Re-check after registering the waker, so that a close happening in between is not missed.
				sink.shared.waker.register(cx.waker());
				next_item(sink)
			}
			ready => ready,
		}
	}
}

impl<T> Sink<T> for ObjectStateSink
where
	T: Into<ObjectState<DataBytes>>,
{
	type Error = ();

	/// The sink accepts items at any time, so this always reports readiness.
	fn poll_ready(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	/// Converts `item` into an object state and hands it to the object core, tagged with this sink's id.
	fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
		let new_state: ObjectState<DataBytes> = item.into();
		self.object_core.state_set(new_state, self.id);
		Ok(())
	}

	/// Nothing is buffered by the sink, so flushing completes immediately.
	fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	/// Closing performs no work and completes immediately.
	fn poll_close(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}
}