use std::{pin::Pin, sync::Arc};
use futures::{Sink, SinkExt, Stream, StreamExt};
use super::termination::ConnectionTerminationReason;
use crate::{
connection::{
diff,
message::{Message, StateAction},
},
object::ObjectStateSinkParams,
DataBytes, DataSynchronized, ObjectState, ObjectStateSink, ObjectStateStream,
};
/// Receives state updates from the remote side and forwards them to the local object.
///
/// Messages from `input` are merged with the parameter stream obtained by splitting
/// `object_state_sink`. Every [`Message::StateStreamActions`] is processed action by action:
///
/// * `PatchAndSetAndMemorize` patches the previously received data (or an empty buffer when
///   nothing was received yet), builds a new [`ObjectState`], remembers it and sends it to the
///   state sink. When the action carries no format, the format of the previously received
///   state is reused.
/// * `SetSynchronized` passes the new synchronization time to the object core.
///
/// The function returns `Ok(())` as soon as either merged stream ends.
///
/// # Errors
///
/// Returns [`ConnectionTerminationReason::SeriousError`] when
/// * a message other than `Message::StateStreamActions` arrives,
/// * a patch carries no format and no earlier state supplied one,
/// * the patched state cannot be delivered to the state sink.
///
/// Errors produced by `diff::patch` are propagated with `?`.
pub(super) async fn receive_state(
    input: Pin<Box<dyn Stream<Item = Message> + Send + Sync + 'static>>,
    output: Pin<Box<dyn Sink<Message, Error = ConnectionTerminationReason> + Send + Sync + 'static>>,
    object_state_sink: ObjectStateSink,
) -> Result<(), ConnectionTerminationReason> {
    // This function never writes to `output`, so it is released immediately.
    drop(output);

    let object_core = object_state_sink.object_core.clone();
    let object_state_sink_id = object_state_sink.id;
    let (mut state_sink, params_stream) = object_state_sink.split::<ObjectState<DataBytes>>();
    let mut last_received_state: Option<ObjectState<DataBytes>> = None;

    enum WhichStream {
        Input(Option<Message>),
        ObjectStateSink(Option<ObjectStateSinkParams>),
    }

    // Each source is followed by a single `None` marker so that the end of either stream is
    // observed explicitly and terminates the loop below.
    let mut select_stream = futures::stream::select(
        input.map(Some).map(WhichStream::Input).chain(Box::pin(futures::stream::once(async { WhichStream::Input(None) }))),
        params_stream
            .map(Some)
            .map(WhichStream::ObjectStateSink)
            .chain(Box::pin(futures::stream::once(async { WhichStream::ObjectStateSink(None) }))),
    );

    while let Some(item) = select_stream.next().await {
        match item {
            WhichStream::Input(Some(message)) => match message {
                Message::StateStreamActions(actions) => {
                    for action in actions {
                        match action {
                            StateAction::PatchAndSetAndMemorize(version, maybe_format, delta, synchronized_micros_ago) => {
                                // Patch against the last received data, or an empty buffer for the first state.
                                let data = {
                                    let base = last_received_state
                                        .as_ref()
                                        .map(|state| state.data.clone())
                                        .unwrap_or_else(|| Arc::new(vec![]));
                                    Arc::new(diff::patch(&base, &delta)?)
                                };
                                // The format is optional on the wire; fall back to the previous one.
                                let format = maybe_format
                                    .or_else(|| last_received_state.as_ref().map(|state| state.format.clone()))
                                    .ok_or_else(|| {
                                        ConnectionTerminationReason::SeriousError(
                                            "Received patch with no data_format and no previously sent data_format.".to_string(),
                                        )
                                    })?;
                                let new_state = ObjectState {
                                    data,
                                    format,
                                    version,
                                    synchronized: DataSynchronized::from_micros_ago(synchronized_micros_ago),
                                };
                                last_received_state = Some(new_state.clone());
                                // Report a failed delivery as a termination reason instead of panicking.
                                state_sink.send(new_state).await.map_err(|_| {
                                    ConnectionTerminationReason::SeriousError(
                                        "Failed to forward received state to the object state sink.".to_string(),
                                    )
                                })?;
                            }
                            StateAction::SetSynchronized(synchronized_micros_ago) => {
                                object_core.synchronize(
                                    DataSynchronized::from_micros_ago(synchronized_micros_ago),
                                    object_state_sink_id,
                                );
                            }
                        }
                    }
                }
                _ => {
                    return Err(ConnectionTerminationReason::SeriousError(
                        "state receive channel received message different than Message::StateStreamActions".to_string(),
                    ))
                }
            },
            WhichStream::Input(None) => break,
            // Sink parameter updates are currently ignored.
            WhichStream::ObjectStateSink(Some(_object_state_sink_params)) => {}
            WhichStream::ObjectStateSink(None) => break,
        }
    }
    Ok(())
}
/// Sends local object state updates to the remote side.
///
/// States produced by `object_state_stream` are compared with the last state that was sent:
///
/// * When the version differs, a diff is requested from the object core's diff cache and sent
///   as a `PatchAndSetAndMemorize` action. The format is included only when it differs from
///   the format of the last sent state.
/// * When only the synchronization time differs, a `SetSynchronized` action is sent.
/// * Otherwise nothing is sent.
///
/// The function returns `Ok(())` as soon as either `input` or `object_state_stream` ends.
///
/// # Errors
///
/// Returns [`ConnectionTerminationReason::SeriousError`] when any message arrives on `input`
/// or when the diff cache yields no diff. Errors from sending on `output` are propagated.
pub(super) async fn send_state(
    input: Pin<Box<dyn Stream<Item = Message> + Send + Sync + 'static>>,
    mut output: Pin<Box<dyn Sink<Message, Error = ConnectionTerminationReason> + Send + Sync + 'static>>,
    object_state_stream: ObjectStateStream,
) -> Result<(), ConnectionTerminationReason> {
    let object_core = object_state_stream.object_core.clone();
    let mut last_sent_state: Option<ObjectState<DataBytes>> = None;

    enum WhichStream {
        Input(Option<Message>),
        ObjectStateStream(Option<ObjectState<DataBytes>>),
    }

    // Each source is followed by a single `None` marker so that the end of either stream is
    // observed explicitly and terminates the loop below.
    let mut select_stream = futures::stream::select(
        input.map(Some).map(WhichStream::Input).chain(Box::pin(futures::stream::once(async { WhichStream::Input(None) }))),
        object_state_stream
            .map(Some)
            .map(WhichStream::ObjectStateStream)
            .chain(Box::pin(futures::stream::once(async { WhichStream::ObjectStateStream(None) }))),
    );

    while let Some(item) = select_stream.next().await {
        let state = match item {
            // Nothing is expected to arrive on this channel.
            WhichStream::Input(Some(_)) => {
                return Err(ConnectionTerminationReason::SeriousError(
                    "state send channel received a message, weird".to_string(),
                ))
            }
            WhichStream::Input(None) | WhichStream::ObjectStateStream(None) => break,
            WhichStream::ObjectStateStream(Some(state)) => state,
        };

        let previous = last_sent_state.as_ref();
        if previous.map(|sent| &sent.version) != Some(&state.version) {
            let diff = object_core
                .diff_cache
                .get_diff(
                    previous.map(|sent| &sent.version),
                    previous.map(|sent| &sent.data),
                    &state.version,
                    &state.data,
                )
                // Build the error lazily: this runs for every state update.
                .ok_or_else(|| ConnectionTerminationReason::SeriousError("Diffing failed. Weird.".to_string()))?;
            // Only transmit the format when it changed since the last sent state.
            let format = if previous.map(|sent| &sent.format) != Some(&state.format) {
                Some(state.format.clone())
            } else {
                None
            };
            output
                .send(Message::StateStreamActions(vec![StateAction::PatchAndSetAndMemorize(
                    state.version.clone(),
                    format,
                    diff,
                    state.synchronized.micros_ago(),
                )]))
                .await?;
            last_sent_state = Some(state);
        } else if previous.map(|sent| &sent.synchronized) != Some(&state.synchronized) {
            // Same version, newer synchronization time: send the lightweight action only.
            output
                .send(Message::StateStreamActions(vec![StateAction::SetSynchronized(
                    state.synchronized.micros_ago(),
                )]))
                .await?;
            last_sent_state = Some(state);
        }
    }
    Ok(())
}