hakuban 0.8.5

Data-object sharing library
Documentation
use std::{
	hash::Hash,
	sync::{
		atomic::{AtomicBool, Ordering::Relaxed},
		Arc,
	},
};

use futures::task::AtomicWaker;

use super::{core::Object, DataBytes, DataSynchronized, DataVersion, ObjectObserveContractId, ObjectState};
#[cfg(feature = "downstream")]
use crate::connection::downstream::DownstreamConnectionId;
use crate::{connection::upstream::UpstreamConnectionId, tag::TagObserveContractId, ObjectDescriptor};

/// Crate-internal identifier of a single state stream, wrapping a `u64`.
///
/// It is `Copy`, comparable and hashable, so it can be passed by value and used as a map/set key.
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
pub(crate) struct StateStreamId(pub u64);

/// Identifies the entity that owns an [ObjectStateStream].
///
/// The owner is stored in the stream and handed to the object core (together with the stream id)
/// when the stream is dropped with notification enabled.
#[derive(Clone)]
pub(crate) enum ObjectStateStreamOwner {
	/// Owner is an object observe contract, identified by its id.
	ObjectObserveContract(ObjectObserveContractId),
	/// Owner is a tag observe contract, identified by its id.
	TagObserveContract(TagObserveContractId),
	/// Owner is an upstream connection, identified by its id.
	UpstreamConnection(UpstreamConnectionId),
	/// Owner is a downstream connection, identified by its id.
	/// Only available when the `downstream` feature is enabled.
	#[cfg(feature = "downstream")]
	DownstreamConnection(DownstreamConnectionId),
}

/// A [futures::stream::Stream] emitting [ObjectState]s.
///
/// The stream ends when either the contract which emitted this ObjectStateStream gets dropped, or when the hakuban network decides to drop the object.
/// The hakuban network only keeps alive those objects which have at least one [ObjectObserveContract](crate::ObjectObserveContract) or [ObjectExposeContract](crate::ObjectExposeContract).
pub struct ObjectStateStream {
	// Identifier of this stream; reported to the object core on drop.
	id: StateStreamId,
	// Entity this stream belongs to; reported to the object core on drop.
	owner: ObjectStateStreamOwner,
	// Object whose state is polled and emitted by this stream.
	pub(crate) object_core: Arc<Object>,
	// `None` until the first state has been emitted; afterwards holds the version of the most recently emitted state.
	last_reported_version: Option<Option<DataVersion>>,
	// `None` until the first state has been emitted; afterwards holds the `synchronized` value of the most recently emitted state.
	last_reported_synchronized: Option<Option<DataSynchronized>>,
	// Closed flag and waker shared with the matching `ObjectStateStreamInlet`.
	shared: Arc<ObjectStateStreamShared>,
	// When `true` (the default), dropping the stream notifies `object_core`.
	notify_object_on_drop: bool,
}

/// Crate-internal counterpart of an [ObjectStateStream].
///
/// It shares the closed flag and waker with the stream it was created alongside,
/// and is used to wake that stream's task or to close the stream.
pub(crate) struct ObjectStateStreamInlet {
	// Same id as the paired `ObjectStateStream`.
	id: StateStreamId,
	// State shared with the paired `ObjectStateStream`.
	shared: Arc<ObjectStateStreamShared>,
}

/// State shared between an [ObjectStateStream] and its [ObjectStateStreamInlet].
struct ObjectStateStreamShared {
	// Set to `true` by the inlet's `close`; once observed by `poll_next`, the stream yields `None`.
	closed: AtomicBool,
	// Waker registered by the stream on every poll; woken by the inlet's `wake` and `close`.
	waker: AtomicWaker,
}

impl ObjectStateStream {
	/// Creates a stream for `object_core` together with the inlet used to wake or close it.
	///
	/// Both halves carry the same `id` and share one `ObjectStateStreamShared`, which starts
	/// out not closed. The returned stream has not reported any state yet and will notify
	/// the object core when dropped.
	pub(crate) fn new(object_core: Arc<Object>, id: StateStreamId, owner: ObjectStateStreamOwner) -> (ObjectStateStream, ObjectStateStreamInlet) {
		let shared = Arc::new(ObjectStateStreamShared { closed: AtomicBool::new(false), waker: AtomicWaker::new() });
		let inlet = ObjectStateStreamInlet { id, shared: Arc::clone(&shared) };
		let stream = ObjectStateStream {
			id,
			owner,
			object_core,
			last_reported_version: None,
			last_reported_synchronized: None,
			shared,
			notify_object_on_drop: true,
		};
		(stream, inlet)
	}

	/// Consumes the stream without calling `state_stream_dropped` on the object core.
	#[cfg(feature = "downstream")]
	pub(crate) fn drop_without_notification(mut self) {
		// `self` is dropped when it goes out of scope at the end of this function;
		// with the flag cleared, the `Drop` impl skips the notification.
		self.notify_object_on_drop = false;
	}

	/// Returns the descriptor of the object this stream reports states of.
	pub fn descriptor(&self) -> &ObjectDescriptor {
		let core: &Object = &self.object_core;
		&core.descriptor
	}
}

impl Drop for ObjectStateStream {
	/// Informs the object core that this stream is gone, passing the stream's id and owner,
	/// unless notification has been disabled via `notify_object_on_drop`.
	fn drop(&mut self) {
		if !self.notify_object_on_drop {
			return;
		}
		let stream_id = self.id;
		self.object_core.state_stream_dropped(stream_id, &self.owner);
	}
}

impl ObjectStateStreamInlet {
	/// Returns the id shared with the paired stream.
	pub(crate) fn id(&self) -> StateStreamId {
		let Self { id, .. } = self;
		*id
	}

	/// Wakes the task whose waker was last registered by the paired stream, if any.
	pub(crate) fn wake(&self) {
		let shared = &self.shared;
		shared.waker.wake();
	}

	/// Marks the paired stream as closed and wakes it, so that its next poll can observe the flag.
	pub(crate) fn close(&self) {
		// The flag must be set before waking, otherwise the woken task could poll and miss it.
		self.shared.closed.store(true, Relaxed);
		self.wake();
	}
}

impl futures::Stream for ObjectStateStream {
	type Item = ObjectState<DataBytes>;

	/// Polls for the next object state.
	///
	/// * Returns `Ready(None)` once the stream has been marked closed.
	/// * Returns `Pending` while the object core has no state, or while the current state's
	///   version and `synchronized` value both equal those of the last emitted state.
	/// * Otherwise records the state's version and `synchronized` value and returns `Ready(Some(state))`.
	fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
		use std::task::Poll;

		let this = self.get_mut();

		// Register before inspecting any state, so that a wake-up arriving between the
		// checks below and returning `Pending` is not lost.
		this.shared.waker.register(cx.waker());

		if this.shared.closed.load(Relaxed) {
			return Poll::Ready(None);
		}

		let state = match this.object_core.state_get() {
			Some(current) => current,
			None => return Poll::Pending,
		};

		// A missing record (nothing emitted yet) compares unequal to any current value,
		// so the very first available state is always emitted.
		let previous_version = this.last_reported_version.as_ref().and_then(|inner| inner.as_ref());
		let previous_synchronized = this.last_reported_synchronized.as_ref().and_then(|inner| inner.as_ref());
		let differs = previous_version != Some(&state.version) || previous_synchronized != Some(&state.synchronized);

		if !differs {
			return Poll::Pending;
		}

		this.last_reported_version = Some(Some(state.version.clone()));
		this.last_reported_synchronized = Some(Some(state.synchronized.clone()));
		Poll::Ready(Some(state))
	}
}