hakuban 0.8.5

Data-object sharing library
Documentation
use std::{pin::Pin, sync::Arc};

use futures::{Sink, SinkExt, Stream, StreamExt};

use super::termination::ConnectionTerminationReason;
use crate::{
	connection::{
		diff,
		message::{Message, StateAction},
	},
	object::ObjectStateSinkParams,
	DataBytes, DataSynchronized, ObjectState, ObjectStateSink, ObjectStateStream,
};

/// Receives object state updates from the peer and pushes them into a local object state sink.
///
/// `output` is dropped immediately; this function never sends anything on it.
/// `object_state_sink` is split into a sink for states and a stream of sink parameters.
/// Messages from `input` and items from the parameter stream are then processed in one loop:
///
/// * `Message::StateStreamActions` — every contained action is applied in order:
///   * `StateAction::PatchAndSetAndMemorize` — the patch is applied on top of the previously
///     received data (or an empty buffer if nothing was received yet), the resulting state is
///     remembered and sent to the state sink. If the action carries no format, the format of
///     the previously received state is reused.
///   * `StateAction::SetSynchronized` — calls `synchronize` on the object core with the sink id.
/// * Sink parameters are currently ignored.
///
/// The loop ends, and `Ok(())` is returned, as soon as either `input` or the parameter stream ends.
///
/// # Errors
///
/// * Any message other than `Message::StateStreamActions` yields `ConnectionTerminationReason::SeriousError`.
/// * A patch without a format, when no format was received before, yields `ConnectionTerminationReason::SeriousError`.
/// * A failure of `diff::patch` is propagated with `?`.
///
/// # Panics
///
/// Panics if sending the new state to the state sink fails.
pub(super) async fn receive_state(
	input: Pin<Box<dyn Stream<Item = Message> + Send + Sync + 'static>>,
	output: Pin<Box<dyn Sink<Message, Error = ConnectionTerminationReason> + Send + Sync + 'static>>,
	object_state_sink: ObjectStateSink,
) -> Result<(), ConnectionTerminationReason> {
	// Nothing is ever sent back on this channel.
	drop(output);
	let object_core = object_state_sink.object_core.clone();
	let object_state_sink_id = object_state_sink.id;

	let (mut state_sink, params_stream) = object_state_sink.split::<ObjectState<DataBytes>>();

	// Last state applied; it is both the base for the next patch and the fallback source of the format.
	let mut last_received_state: Option<ObjectState<DataBytes>> = None;

	// Tags items by their origin; `None` marks the end of the respective stream.
	enum WhichStream {
		Input(Option<Message>),
		ObjectStateSink(Option<ObjectStateSinkParams>),
	}

	// Each stream gets an explicit end marker appended, so that the loop below can stop
	// as soon as either of them finishes.
	let mut select_stream = futures::stream::select(
		input.map(Some).map(WhichStream::Input).chain(Box::pin(futures::stream::once(async { WhichStream::Input(None) }))),
		params_stream.map(Some).map(WhichStream::ObjectStateSink).chain(Box::pin(futures::stream::once(async { WhichStream::ObjectStateSink(None) }))),
	);

	while let Some(item) = select_stream.next().await {
		match item {
			WhichStream::Input(Some(message)) => match message {
				Message::StateStreamActions(actions) => {
					for action in actions {
						match action {
							StateAction::PatchAndSetAndMemorize(version, maybe_format, patch, synchronized_micros_ago) => {
								let new_state: ObjectState<DataBytes> = ObjectState {
									data: Arc::new(diff::patch(
										&last_received_state.as_ref().map(|state| state.data.clone()).unwrap_or_else(|| Arc::new(vec![])),
										&patch,
									)?),
									// The error message is built lazily, only when there really is no format to use.
									format: maybe_format.or_else(|| last_received_state.as_ref().map(|state| state.format.clone())).ok_or_else(|| {
										ConnectionTerminationReason::SeriousError(
											"Received patch with no data_format and no previously sent data_format.".to_string(),
										)
									})?,
									version,
									synchronized: DataSynchronized::from_micros_ago(synchronized_micros_ago),
								};
								last_received_state = Some(new_state.clone());
								// NOTE(review): a failed send panics here instead of terminating the connection — confirm this is intended.
								state_sink.send(new_state).await.unwrap();
							}
							StateAction::SetSynchronized(synchronized_micros_ago) => {
								object_core.synchronize(DataSynchronized::from_micros_ago(synchronized_micros_ago), object_state_sink_id);
							}
						}
					}
				}
				_ => {
					return Err(ConnectionTerminationReason::SeriousError(
						"state receive channel received message different than Message::StateStreamActions".to_string(),
					))
				}
			},
			WhichStream::Input(None) => break,
			WhichStream::ObjectStateSink(Some(_object_state_sink_params)) => { /* TODO: implement */ }
			WhichStream::ObjectStateSink(None) => break,
		}
	}
	Ok(())
}

/// Sends states produced by a local object state stream to the peer.
///
/// Every state yielded by `object_state_stream` is compared with the last state that was sent:
///
/// * If the version differs (or nothing was sent yet), a diff is obtained from the object core's
///   diff cache and sent as `StateAction::PatchAndSetAndMemorize`. The format is included only
///   when it differs from the format of the last sent state.
/// * If the version is the same but the `synchronized` value differs, only
///   `StateAction::SetSynchronized` is sent.
/// * Otherwise nothing is sent.
///
/// The loop ends, and `Ok(())` is returned, as soon as either `input` or `object_state_stream` ends.
///
/// # Errors
///
/// * Any message arriving on `input` yields `ConnectionTerminationReason::SeriousError`.
/// * A missing diff (`get_diff` returning `None`) yields `ConnectionTerminationReason::SeriousError`.
/// * Errors from sending on `output` are propagated with `?`.
pub(super) async fn send_state(
	input: Pin<Box<dyn Stream<Item = Message> + Send + Sync + 'static>>,
	mut output: Pin<Box<dyn Sink<Message, Error = ConnectionTerminationReason> + Send + Sync + 'static>>,
	object_state_stream: ObjectStateStream,
) -> Result<(), ConnectionTerminationReason> {
	let object_core = object_state_stream.object_core.clone();

	// Last state the peer was told about; it is the base for the next diff.
	let mut last_sent_state: Option<ObjectState<DataBytes>> = None;

	// Tags items by their origin; `None` marks the end of the respective stream.
	enum WhichStream {
		Input(Option<Message>),
		ObjectStateStream(Option<ObjectState<DataBytes>>),
	}

	// Each stream gets an explicit end marker appended, so that the loop below can stop
	// as soon as either of them finishes.
	let mut select_stream = futures::stream::select(
		input.map(Some).map(WhichStream::Input).chain(Box::pin(futures::stream::once(async { WhichStream::Input(None) }))),
		object_state_stream
			.map(Some)
			.map(WhichStream::ObjectStateStream)
			.chain(Box::pin(futures::stream::once(async { WhichStream::ObjectStateStream(None) }))),
	);
	while let Some(item) = select_stream.next().await {
		match item {
			WhichStream::Input(Some(_)) => return Err(ConnectionTerminationReason::SeriousError("state send channel received a message, weird".to_string())),
			WhichStream::Input(None) => break,
			WhichStream::ObjectStateStream(Some(state)) => {
				if last_sent_state.as_ref().map(|sent| &sent.version) != Some(&state.version) {
					// New version: send a patch relative to the last sent state.
					let diff = object_core
						.diff_cache
						.get_diff(
							last_sent_state.as_ref().map(|sent| &sent.version),
							last_sent_state.as_ref().map(|sent| &sent.data),
							&state.version,
							&state.data,
						)
						// The error message is built lazily, only when diffing actually failed.
						.ok_or_else(|| ConnectionTerminationReason::SeriousError("Diffing failed. Weird.".to_string()))?;
					// The format is repeated only when it differs from the last one sent.
					let format_if_changed =
						if last_sent_state.as_ref().map(|sent| &sent.format) != Some(&state.format) { Some(state.format.clone()) } else { None };
					output
						.send(Message::StateStreamActions(vec![StateAction::PatchAndSetAndMemorize(
							state.version.clone(),
							format_if_changed,
							diff,
							state.synchronized.micros_ago(),
						)]))
						.await?;
					last_sent_state = Some(state);
				} else if last_sent_state.as_ref().map(|sent| &sent.synchronized) != Some(&state.synchronized) {
					// Same version, only the synchronization timestamp changed.
					output.send(Message::StateStreamActions(vec![StateAction::SetSynchronized(state.synchronized.micros_ago())])).await?;
					last_sent_state = Some(state);
				}
			}
			WhichStream::ObjectStateStream(None) => {
				break;
			}
		}
	}
	Ok(())
}