use std::collections::HashMap;
use std::time::Duration;
use automerge::{Automerge, ChangeHash, ReadDoc, sync};
use crate::{
ConnectionId, DocumentId, PeerRequestState, UnixTimestamp,
actors::{
document::peer_doc_connection::{AnnouncePolicy, PeerDocConnection},
messages::SyncMessage,
},
};
#[derive(Debug)]
pub(crate) struct Request {
#[allow(dead_code)]
doc_id: DocumentId,
peer_states: HashMap<ConnectionId, Peer>,
}
#[derive(Debug)]
struct Peer {
state: PeerState,
}
#[derive(Debug)]
enum PeerState {
Requesting(Requesting),
RequestedFromUs { unavailable_response_sent: bool },
Unavailable,
Syncing { their_heads: Vec<ChangeHash> },
}
#[derive(Debug)]
enum Requesting {
AwaitingSend,
Sent,
AwaitingAnnouncePolicy,
NotSentDueToAnnouncePolicy,
}
impl From<AnnouncePolicy> for Requesting {
fn from(value: AnnouncePolicy) -> Self {
match value {
AnnouncePolicy::DontAnnounce => Requesting::NotSentDueToAnnouncePolicy,
AnnouncePolicy::Announce => Requesting::AwaitingSend,
AnnouncePolicy::Loading | AnnouncePolicy::Unknown => Requesting::AwaitingAnnouncePolicy,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestOutcome {
Searching,
Found,
Exhausted,
}
impl Request {
pub(crate) fn new<'a, I: Iterator<Item = &'a PeerDocConnection>>(
doc_id: DocumentId,
connections: I,
) -> Self {
Self {
peer_states: connections
.map(|c| {
(
c.connection_id,
Peer {
state: PeerState::Requesting(c.announce_policy().into()),
},
)
})
.collect(),
doc_id,
}
}
pub(crate) fn add_connection(&mut self, conn: &PeerDocConnection) {
self.peer_states.insert(
conn.connection_id,
Peer {
state: PeerState::Requesting(conn.announce_policy().into()),
},
);
}
pub(crate) fn remove_connection(&mut self, id: ConnectionId) {
self.peer_states.remove(&id);
}
pub(crate) fn receive_message(
&mut self,
now: UnixTimestamp,
doc: &mut Automerge,
conn: &mut PeerDocConnection,
msg: SyncMessage,
) -> Option<Duration> {
let Some(peer) = self.peer_states.get_mut(&conn.connection_id) else {
tracing::warn!(connection_id=?conn.connection_id, "received message for unknown connection");
return None;
};
match (msg, &mut peer.state) {
(SyncMessage::Request { .. }, PeerState::Requesting { .. }) => {
peer.state = PeerState::RequestedFromUs {
unavailable_response_sent: false,
};
None
}
(SyncMessage::Request { .. }, PeerState::RequestedFromUs { .. }) => None,
(SyncMessage::Request { .. }, PeerState::Unavailable) => None,
(SyncMessage::Request { .. }, PeerState::Syncing { .. }) => {
peer.state = PeerState::Unavailable;
None
}
(SyncMessage::Sync { data }, PeerState::Requesting { .. }) => {
let duration = apply_sync_data(now, doc, conn, peer, &data)?;
let their_heads = conn.their_heads().unwrap_or_default();
if their_heads.is_empty() {
tracing::trace!("their heads are empty, transitioning to unavailable");
peer.state = PeerState::Unavailable;
} else {
tracing::info!(connection_id=?conn.connection_id, "starting sync with peer");
peer.state = PeerState::Syncing { their_heads };
}
Some(duration)
}
(
SyncMessage::Sync { data },
PeerState::Unavailable | PeerState::RequestedFromUs { .. },
) => {
let duration = apply_sync_data(now, doc, conn, peer, &data)?;
let their_heads = conn.their_heads().unwrap_or_default();
peer.state = PeerState::Syncing { their_heads };
Some(duration)
}
(SyncMessage::Sync { data }, PeerState::Syncing { .. }) => {
apply_sync_data(now, doc, conn, peer, &data)
}
(
SyncMessage::DocUnavailable,
PeerState::Requesting { .. } | PeerState::RequestedFromUs { .. },
) => {
peer.state = PeerState::Unavailable;
None
}
(SyncMessage::DocUnavailable, PeerState::Unavailable) => None,
(SyncMessage::DocUnavailable, PeerState::Syncing { .. }) => {
peer.state = PeerState::Unavailable;
None
}
}
}
pub(crate) fn generate_message(
&mut self,
now: UnixTimestamp,
doc: &Automerge,
conn: &mut PeerDocConnection,
) -> Option<(SyncMessage, Duration)> {
let any_peer_is_syncing = self
.peer_states
.values()
.any(|s| matches!(s.state, PeerState::Syncing { .. }));
let Some(peer) = self.peer_states.get_mut(&conn.connection_id) else {
tracing::warn!(conn_id=?conn.connection_id, "no peer state for connection ID");
return None;
};
match &mut peer.state {
PeerState::Requesting(requesting) => {
if !matches!(requesting, Requesting::AwaitingSend) {
return None;
}
if any_peer_is_syncing {
return None;
}
conn.reset_sync_state();
*requesting = Requesting::Sent;
conn.generate_sync_message(now, doc)
.map(|(msg, duration)| (SyncMessage::Request { data: msg.encode() }, duration))
}
PeerState::Syncing { .. } => conn
.generate_sync_message(now, doc)
.map(|(msg, duration)| (SyncMessage::Sync { data: msg.encode() }, duration)),
PeerState::Unavailable => None,
PeerState::RequestedFromUs {
unavailable_response_sent,
} => {
if *unavailable_response_sent {
None
} else {
*unavailable_response_sent = true;
Some((SyncMessage::DocUnavailable, Duration::default()))
}
}
}
}
pub(crate) fn outcome(&self, doc: &Automerge) -> RequestOutcome {
let all_peers_unavailable = self.peer_states.values().all(|peer| {
matches!(
peer.state,
PeerState::Unavailable
| PeerState::RequestedFromUs { .. }
| PeerState::Requesting(Requesting::NotSentDueToAnnouncePolicy)
)
});
let any_sync_is_done = self.peer_states.values().any(|peer| {
matches!(&peer.state, PeerState::Syncing { their_heads }
if their_heads.iter().all(|head| doc.get_change_by_hash(head).is_some()))
});
if any_sync_is_done {
RequestOutcome::Found
} else if all_peers_unavailable && !self.peer_states.is_empty() {
RequestOutcome::Exhausted
} else {
RequestOutcome::Searching
}
}
pub(crate) fn peer_states(&self) -> HashMap<ConnectionId, PeerRequestState> {
self.peer_states
.iter()
.map(|(conn_id, peer)| {
let state = match &peer.state {
PeerState::Requesting(_) | PeerState::RequestedFromUs { .. } => {
PeerRequestState::Requested
}
PeerState::Unavailable => PeerRequestState::Unavailable,
PeerState::Syncing { .. } => PeerRequestState::Syncing,
};
(*conn_id, state)
})
.collect()
}
pub(crate) fn announce_policy_changed(&mut self, peer: ConnectionId, policy: AnnouncePolicy) {
let Some(peer) = self.peer_states.get_mut(&peer) else {
return;
};
if let PeerState::Requesting(requesting) = &peer.state {
match requesting {
Requesting::AwaitingAnnouncePolicy => match policy {
AnnouncePolicy::Announce => {
peer.state = PeerState::Requesting(Requesting::AwaitingSend);
}
AnnouncePolicy::DontAnnounce => {
peer.state = PeerState::Requesting(Requesting::NotSentDueToAnnouncePolicy);
}
_ => {}
},
Requesting::NotSentDueToAnnouncePolicy if policy == AnnouncePolicy::Announce => {
peer.state = PeerState::Requesting(Requesting::AwaitingSend);
}
_ => {}
}
}
}
}
fn apply_sync_data(
now: UnixTimestamp,
doc: &mut Automerge,
conn: &mut PeerDocConnection,
peer: &mut Peer,
data: &[u8],
) -> Option<Duration> {
let sync_msg = match sync::Message::decode(data) {
Ok(msg) => msg,
Err(e) => {
tracing::warn!(
connection_id=?conn.connection_id, err=?e,
"failed to decode sync message, marking peer as unavailable"
);
peer.state = PeerState::Unavailable;
return None;
}
};
match conn.receive_sync_message(now, doc, sync_msg) {
Ok(duration) => Some(duration),
Err(e) => {
tracing::warn!(
connection_id=?conn.connection_id, err=?e,
"failed to apply sync message, marking peer as unavailable"
);
peer.state = PeerState::Unavailable;
None
}
}
}