use crate::daemon::{DocMessage, DocumentActorHandle};
use crate::types::EphemeralMessage;
use anyhow::{Context, Result};
use async_trait::async_trait;
use automerge::sync::{Message as AutomergeSyncMessage, State as SyncState};
use serde::{Deserialize, Serialize};
use std::mem;
use tokio::sync::{broadcast, oneshot};
use tracing::debug;
/// Message exchanged with a peer over a [`Connection`].
#[derive(Deserialize, Serialize)]
pub enum PeerMessage {
/// An Automerge sync message in its encoded byte form (written with `encode()`,
/// read back with `AutomergeSyncMessage::decode`).
Sync(Vec<u8>),
/// An ephemeral message; it is handed to the document actor as-is rather than
/// going through the Automerge sync state.
Ephemeral(EphemeralMessage),
}
/// Asynchronous message channel to a peer, carrying values of type `T` in both
/// directions.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait Connection<T>: Send + Sync {
/// Sends `message` to the other side.
async fn send(&mut self, message: T) -> Result<()>;
/// Waits for the next incoming message and returns it.
///
/// NOTE(review): presumably a closed connection surfaces as `Err` — confirm
/// against the implementors.
async fn next(&mut self) -> Result<T>;
}
/// Keeps a document in sync with a single peer reached through a [`Connection`].
pub struct SyncActor {
// Automerge sync state for this peer. It is moved out (leaving a default value)
// while the document actor works with it and stored again from the reply.
peer_state: SyncState,
// Handle used to send `DocMessage`s to the document actor and to subscribe to
// its change / ephemeral-message notifications.
document_handle: DocumentActorHandle,
// Transport over which `PeerMessage`s are sent to and received from the peer.
connection: Box<dyn Connection<PeerMessage>>,
}
impl SyncActor {
    /// Creates a sync actor for one peer.
    ///
    /// The actor starts with a fresh [`SyncState`]; `document_handle` is used to
    /// talk to the document actor and `connection` to talk to the peer.
    pub fn new(
        document_handle: DocumentActorHandle,
        connection: Box<dyn Connection<PeerMessage>>,
    ) -> Self {
        Self {
            peer_state: SyncState::new(),
            document_handle,
            connection,
        }
    }

    /// Handles one message received from the peer.
    ///
    /// * `PeerMessage::Sync` is decoded and handed to the document actor together
    ///   with the current peer state; the state from the reply is stored back.
    /// * `PeerMessage::Ephemeral` is forwarded to the document actor unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the sync message cannot be decoded, or if the document
    /// actor drops the reply channel without answering.
    async fn receive_peer_message(&mut self, message: PeerMessage) -> Result<()> {
        match message {
            PeerMessage::Sync(message_buf) => {
                let message = AutomergeSyncMessage::decode(&message_buf)?;
                // Only the sync path needs a reply, so the channel is created here
                // rather than for every incoming message.
                let (response_tx, response_rx) = oneshot::channel();
                self.document_handle
                    .send_message(DocMessage::ReceiveSyncMessage {
                        message,
                        // The state is moved to the document actor; `peer_state`
                        // holds a default value until the reply arrives.
                        state: mem::take(&mut self.peer_state),
                        response_tx,
                    })
                    .await;
                // Report a missing reply as an error (as `generate_sync_message`
                // does) instead of panicking.
                self.peer_state = response_rx
                    .await
                    .context("Couldn't read response from Document channel")?;
            }
            PeerMessage::Ephemeral(cursor) => {
                self.document_handle
                    .send_message(DocMessage::ReceiveEphemeral(cursor))
                    .await;
            }
        }
        Ok(())
    }

    /// Asks the document actor for the next sync message for this peer and, if
    /// there is one, sends it over the connection.
    ///
    /// # Errors
    ///
    /// Returns an error if the document actor drops the reply channel without
    /// answering, or if sending to the peer fails.
    async fn generate_sync_message(&mut self) -> Result<()> {
        let (response_tx, response_rx) = oneshot::channel();
        self.document_handle
            .send_message(DocMessage::GenerateSyncMessage {
                state: mem::take(&mut self.peer_state),
                response_tx,
            })
            .await;
        let (peer_state, message) = response_rx
            .await
            .context("Could not read response from Document channel")?;
        self.peer_state = peer_state;
        // `None` means there is nothing to send to the peer right now.
        if let Some(message) = message {
            self.connection
                .send(PeerMessage::Sync(message.encode()))
                .await?;
        }
        Ok(())
    }

    /// Runs the actor until an error occurs.
    ///
    /// Sends an initial sync message, then loops over three event sources:
    /// document-change pings (each triggers a new sync message), ephemeral
    /// messages from the document actor (forwarded to the peer), and messages
    /// arriving from the peer (handled by `receive_peer_message`).
    ///
    /// # Errors
    ///
    /// Returns an error if the connection fails, if a peer message cannot be
    /// processed, or if one of the document actor's broadcast channels has been
    /// closed.
    pub async fn run(mut self) -> Result<()> {
        let mut doc_changed_ping_rx = self.document_handle.subscribe_document_changes();
        let mut ephemeral_messages_rx = self.document_handle.subscribe_ephemeral_messages();

        // Kick off the exchange without waiting for the first document change.
        self.generate_sync_message().await?;

        loop {
            tokio::select! {
                doc_ping = doc_changed_ping_rx.recv() => {
                    match doc_ping {
                        Ok(()) => self.generate_sync_message().await?,
                        Err(broadcast::error::RecvError::Closed) => {
                            // No sender is left; return an error rather than panic.
                            anyhow::bail!("Doc changed channel has been closed");
                        }
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            debug!("Doc changed ping channel lagged (this is probably fine).");
                        }
                    }
                }
                ephemeral_message = ephemeral_messages_rx.recv() => {
                    match ephemeral_message {
                        Ok(ephemeral_message) => {
                            self.connection
                                .send(PeerMessage::Ephemeral(ephemeral_message))
                                .await?;
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            anyhow::bail!("Ephemeral message channel has been closed");
                        }
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            debug!("Ephemeral message channel lagged (this is unfortunate, but okay).");
                        }
                    }
                }
                // NOTE(review): this future is dropped and recreated whenever another
                // branch completes first, which assumes `Connection::next`
                // implementations are cancel-safe — confirm.
                message = self.connection.next() => {
                    self.receive_peer_message(message?).await?;
                }
            }
        }
    }
}