// hakuban 0.8.5

// Data-object sharing library
// Documentation
/*! Communication primitives; only for tests and custom transports
*/

use std::pin::Pin;

use bincode::{Decode, Encode};
use futures::{Sink, Stream};

use crate::{
	object::state::{DataBytes, DataVersion},
	DataFormat,
};

/// Pinned, boxed [`Sink`] trait object that accepts [`Message`] values.
///
/// Its error type is `futures::channel::mpsc::SendError`, and the trait object
/// is required to be `Send + Sync`.
pub type MessageSink = Pin<Box<dyn Sink<Message, Error = futures::channel::mpsc::SendError> + Send + Sync>>;
/// Pinned, boxed [`Stream`] trait object that yields [`Message`] values.
///
/// The trait object is required to be `Send + Sync`.
pub type MessageStream = Pin<Box<dyn Stream<Item = Message> + Send + Sync>>;

// There are 2 types of message channels
//  - Control channel - one per connection direction - handling exchange, descriptor and contract actions
//  - Data channel - one per Sink/Stream - handling object data transmission
//
// Messages within a single channel need to be shipped serially.

//TODO: JSON-validate descriptors

/// Kind of channel being set up; carried by [`UninitializedChannelAction::Initialize`].
///
/// Two variants describe control channels and two describe per-object data
/// channels. The type is bincode-encodable and hashable, so it can be used
/// as a map/set key.
#[derive(Encode, Decode, PartialEq, Eq, Hash, Clone, Debug)]
pub enum ChannelType {
	/// Control channel; carries no payload.
	/// NOTE(review): presumably the direction towards the upstream peer — confirm.
	UpstreamControl,
	/// Control channel; carries no payload.
	/// NOTE(review): presumably the direction towards the downstream peer — confirm.
	DownstreamControl,
	/// Data channel identified by a `Vec<String>` and a `String`.
	/// NOTE(review): presumably the object's tag descriptors and its own
	/// (JSON) descriptor, mirroring `ObjectContractAction` — confirm.
	ObjectSinkToUpstream(Vec<String>, String),
	/// Data channel identified by a `Vec<String>` and a `String`.
	/// NOTE(review): presumably the same identification scheme as
	/// `ObjectSinkToUpstream`, for data flowing the other way — confirm.
	ObjectStreamFromUpstream(Vec<String>, String),
}

/// Top-level unit exchanged over a [`MessageSink`] / [`MessageStream`].
///
/// Every variant wraps a batch (`Vec`) of actions of a single action type,
/// so one message never mixes actions of different kinds. The type is
/// bincode-encodable via the `Encode`/`Decode` derives.
// All variants intentionally share the `Actions` suffix, which trips
// clippy's `enum_variant_names` lint; hence the allow.
#[allow(clippy::enum_variant_names)]
#[derive(Encode, Decode, Debug)]
pub enum Message {
	/// Batch of [`UninitializedChannelAction`]s.
	UninitializedChannelActions(Vec<UninitializedChannelAction>),
	/// Batch of [`UpstreamControlChannelAction`]s.
	UpstreamControlChannelActions(Vec<UpstreamControlChannelAction>),
	/// Batch of [`DownstreamControlChannelAction`]s.
	DownstreamControlChannelActions(Vec<DownstreamControlChannelAction>),
	/// Batch of [`StateAction`]s.
	StateStreamActions(Vec<StateAction>),
}

/// Action carried by [`Message::UninitializedChannelActions`].
///
/// `Eq`/`PartialEq` are derived only in test builds.
#[derive(Encode, Decode, Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum UninitializedChannelAction {
	/// Declares the [`ChannelType`] of the channel.
	/// NOTE(review): presumably must be the first action sent on a fresh channel — confirm.
	Initialize(ChannelType),
}

/// Action carried by [`Message::UpstreamControlChannelActions`].
///
/// Each variant simply wraps the more specific action type of the same name.
// All variants intentionally share the `Action` suffix, which trips
// clippy's `enum_variant_names` lint; hence the allow.
#[allow(clippy::enum_variant_names)]
#[derive(Encode, Decode, Debug)]
pub enum UpstreamControlChannelAction {
	/// Wraps an [`ExchangeAction`].
	ExchangeAction(ExchangeAction),
	/// Wraps an [`ObjectContractAction`].
	ObjectContractAction(ObjectContractAction),
	/// Wraps a [`TagContractAction`].
	TagContractAction(TagContractAction),
}

/// Action carried by [`Message::DownstreamControlChannelActions`].
///
/// Unlike [`UpstreamControlChannelAction`], this enum has no contract-action
/// variants; it only wraps exchange actions.
#[derive(Encode, Decode, Debug)]
pub enum DownstreamControlChannelAction {
	/// Wraps an [`ExchangeAction`].
	ExchangeAction(ExchangeAction),
}

/// Exchange-level action, usable on both upstream and downstream control
/// channels (it is wrapped by both control-channel action enums).
#[derive(Encode, Decode, Clone, Debug)]
pub enum ExchangeAction {
	/// Carries a name as a `String`.
	/// NOTE(review): presumably the sending exchange's human-readable name — confirm.
	SetName(String),
	/// Carries a list of `(f32, u32)` pairs.
	/// NOTE(review): meaning of the pair elements is not visible here
	/// (possibly a load value and a threshold/limit) — confirm against the
	/// exchange implementation.
	SetLoadLimit(Vec<(f32, u32)>),
}

/// Contract action addressing a single object.
///
/// Every variant starts with a `Vec<String>` and a `String` identifying the
/// object. NOTE(review): presumably the object's tag descriptors and its own
/// (JSON) descriptor — confirm.
///
/// `Eq`/`PartialEq` are derived only in test builds.
#[derive(Encode, Decode, Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum ObjectContractAction {
	/// Object identification only.
	Observe(Vec<String>, String),
	/// Object identification only.
	Unobserve(Vec<String>, String),
	/// Object identification plus a `u32`.
	/// NOTE(review): presumably the initial expose capacity, given
	/// `SetExposeCapacity` takes the same trailing `u32` — confirm.
	Expose(Vec<String>, String, u32),
	/// Object identification only.
	Unexpose(Vec<String>, String),
	/// Object identification plus a `u32`.
	/// NOTE(review): presumably the new expose capacity — confirm.
	SetExposeCapacity(Vec<String>, String, u32),
}

/// Contract action addressing a tag.
///
/// Has the same set of variants as [`ObjectContractAction`], but each one
/// identifies its target with a single `String`.
/// NOTE(review): presumably the tag's (JSON) descriptor — confirm.
///
/// `Eq`/`PartialEq` are derived only in test builds.
#[derive(Encode, Decode, Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum TagContractAction {
	/// Tag identification only.
	Observe(String),
	/// Tag identification only.
	Unobserve(String),
	/// Tag identification plus a `u32`.
	/// NOTE(review): presumably the initial expose capacity — confirm.
	Expose(String, u32),
	/// Tag identification only.
	Unexpose(String),
	/// Tag identification plus a `u32`.
	/// NOTE(review): presumably the new expose capacity — confirm.
	SetExposeCapacity(String, u32),
}

/// Action carried by [`Message::StateStreamActions`].
///
/// `Debug` is deliberately not derived: this file provides a hand-written
/// `Debug` impl that prints only the length of the data bytes instead of
/// their contents. `Eq`/`PartialEq` are derived only in test builds.
#[derive(Encode, Decode)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum StateAction {
	/// Fields, in order (named as in the `Debug` impl): version, optional
	/// format, data bytes, and a `u64` "synchronized" value.
	/// NOTE(review): units/semantics of the `u64` are not visible here — confirm.
	PatchAndSetAndMemorize(DataVersion, Option<DataFormat>, DataBytes, u64),
	/// Carries only the `u64` "synchronized" value.
	SetSynchronized(u64),
}

impl std::fmt::Debug for StateAction {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		match self {
			StateAction::PatchAndSetAndMemorize(version, format, bytes, synchronized) => {
				write!(f, "PatchAndSetAndMemorize({version:?}, {format:?}, {}B, {synchronized})", bytes.len())
			}
			StateAction::SetSynchronized(synchronized) => write!(f, "SetSynchronized({synchronized})"),
		}
	}
}