use std::collections::HashMap;
use std::time::Duration;
use automerge::Automerge;
use crate::{
ConnectionId, DocumentId, StorageKey, UnixTimestamp,
actors::{
document::{
DocActorResult, SyncDirection, SyncMessageStat,
phase::{
loading::Loading,
ready::Ready,
request::{Request, RequestState},
},
},
messages::{Broadcast, DocMessage, SyncMessage},
},
};
use super::{
DocumentStatus,
peer_doc_connection::{AnnouncePolicy, PeerDocConnection},
};
#[derive(Debug)]
pub(super) struct DocState {
phase: Phase,
document_id: DocumentId,
doc: Automerge,
any_dialer_connecting: bool,
}
#[derive(Debug)]
pub enum Phase {
Loading(Loading),
Requesting(Request),
Ready(Ready),
NotFound,
}
#[derive(Debug)]
enum PhaseTransition {
None,
ToReady,
ToNotFound,
ToRequesting(Request),
ToLoading,
}
impl DocState {
pub fn new_loading(
document_id: DocumentId,
doc: Automerge,
any_dialer_connecting: bool,
) -> Self {
Self {
phase: Phase::Loading(Loading::new()),
document_id,
doc,
any_dialer_connecting,
}
}
pub fn new_ready(document_id: DocumentId, doc: Automerge, any_dialer_connecting: bool) -> Self {
Self {
phase: Phase::Ready(Ready::new()),
document_id,
doc,
any_dialer_connecting,
}
}
fn handle_phase_transition(&mut self, out: &mut DocActorResult, transition: PhaseTransition) {
match transition {
PhaseTransition::None => {}
PhaseTransition::ToReady => {
tracing::trace!("transitioning to ready");
out.send_doc_status_update(DocumentStatus::Ready);
out.emit_doc_changed(self.doc.get_heads());
self.phase = Phase::Ready(Ready::new());
}
PhaseTransition::ToNotFound => {
tracing::trace!("transitioning to NotFound");
out.send_doc_status_update(DocumentStatus::NotFound);
if let Phase::Requesting(request) = &self.phase {
for peer in request.peers_waiting_for_us_to_respond() {
out.send_sync_message(
peer,
self.document_id.clone(),
SyncMessage::DocUnavailable,
);
}
}
self.phase = Phase::NotFound;
}
PhaseTransition::ToRequesting(request) => {
tracing::trace!("transitioning to requesting");
out.send_doc_status_update(DocumentStatus::Requesting);
self.phase = Phase::Requesting(request);
}
PhaseTransition::ToLoading => {
tracing::trace!("transitioning to loading");
out.send_doc_status_update(DocumentStatus::Loading);
self.phase = Phase::Loading(Loading::new());
}
}
}
fn check_request_completion(&self) -> PhaseTransition {
if let Phase::Requesting(request) = &self.phase {
let RequestState { finished, found } =
request.status(&self.doc, self.any_dialer_connecting);
if finished {
if found {
PhaseTransition::ToReady
} else {
PhaseTransition::ToNotFound
}
} else {
PhaseTransition::None
}
} else {
PhaseTransition::None
}
}
pub fn handle_load(
&mut self,
now: UnixTimestamp,
out: &mut DocActorResult,
peer_connections: &mut HashMap<ConnectionId, PeerDocConnection>,
snapshots: &HashMap<StorageKey, Vec<u8>>,
incrementals: &HashMap<StorageKey, Vec<u8>>,
) {
tracing::trace!("handling load");
for (key, snapshot) in snapshots {
if let Err(e) = self.doc.load_incremental(snapshot) {
tracing::warn!(err=?e, %key, "error loading snapshot chunk");
}
}
for (key, incremental) in incrementals {
if let Err(e) = self.doc.load_incremental(incremental) {
tracing::warn!(err=?e, %key, "error loading incremental chunk");
}
}
if let Phase::Loading(loading) = &mut self.phase {
let pending_sync_messages = loading.take_pending_sync_messages();
let phase_transition = if self.doc.get_heads().is_empty() {
let eligible_conns = peer_connections
.values()
.any(|p| p.announce_policy() != AnnouncePolicy::DontAnnounce);
if eligible_conns || self.any_dialer_connecting {
tracing::debug!(
eligible_conns,
self.any_dialer_connecting,
"no data found on disk, requesting document"
);
PhaseTransition::ToRequesting(Request::new(
self.document_id.clone(),
peer_connections.values(),
))
} else {
tracing::debug!(
"no data found on disk and no connections available, transitioning to NotFound"
);
PhaseTransition::ToNotFound
}
} else {
tracing::trace!("load complete, transitioning to ready");
PhaseTransition::ToReady
};
self.handle_phase_transition(out, phase_transition);
for (conn_id, msgs) in pending_sync_messages {
for msg in msgs {
self.handle_sync_message(now, out, conn_id, peer_connections, msg, now);
}
}
}
}
pub(crate) fn add_connection(&mut self, conn: &PeerDocConnection) {
if let Phase::Requesting(request) = &mut self.phase {
request.add_connection(conn)
}
}
pub(crate) fn remove_connection(&mut self, conn_id: ConnectionId) {
if let Phase::Requesting(request) = &mut self.phase {
request.remove_connection(conn_id)
}
}
pub(crate) fn handle_doc_message(
&mut self,
now: UnixTimestamp,
out: &mut DocActorResult,
connection_id: ConnectionId,
peer_connections: &mut HashMap<ConnectionId, PeerDocConnection>,
msg: DocMessage,
received_at: UnixTimestamp,
) {
match msg {
DocMessage::Ephemeral(msg) => {
out.emit_ephemeral_message(msg.data.clone());
let targets = peer_connections
.iter()
.filter_map(|(c, conn)| {
if conn.peer_id != msg.sender_id {
Some(*c)
} else {
None
}
})
.collect();
out.send_broadcast(targets, Broadcast::Gossip { msg });
}
DocMessage::Sync(msg) => self.handle_sync_message(
now,
out,
connection_id,
peer_connections,
msg,
received_at,
),
};
}
fn handle_sync_message(
&mut self,
now: UnixTimestamp,
out: &mut DocActorResult,
connection_id: ConnectionId,
peer_connections: &mut HashMap<ConnectionId, PeerDocConnection>,
msg: SyncMessage,
received_at: UnixTimestamp,
) {
let Some(peer_conn) = peer_connections.get_mut(&connection_id) else {
tracing::warn!(?connection_id, "no sync state found for message");
return;
};
tracing::debug!(?connection_id, peer_id=?peer_conn.peer_id, ?msg, "received msg");
if let SyncMessage::Request { .. } = msg {
peer_conn.mark_requested();
}
let bytes = match &msg {
SyncMessage::Request { data } | SyncMessage::Sync { data } => data.len(),
SyncMessage::DocUnavailable => 0,
};
let (transition, duration) = match &mut self.phase {
Phase::Loading(loading) => {
loading.receive_sync_message(connection_id, msg);
(PhaseTransition::None, None)
}
Phase::Requesting(request) => {
let duration = request.receive_message(now, &mut self.doc, peer_conn, msg);
(self.check_request_completion(), duration)
}
Phase::Ready(ready) => {
let heads_before = self.doc.get_heads();
let duration = ready.receive_sync_message(now, &mut self.doc, peer_conn, msg);
let heads_after = self.doc.get_heads();
if heads_before != heads_after {
out.emit_doc_changed(heads_after);
}
(PhaseTransition::None, duration)
}
Phase::NotFound => match msg {
SyncMessage::Request { data } => {
tracing::trace!("received request whilst in notfound, restarting request");
let request = Request::new(self.document_id.clone(), peer_connections.values());
self.handle_phase_transition(out, PhaseTransition::ToRequesting(request));
self.handle_sync_message(
now,
out,
connection_id,
peer_connections,
SyncMessage::Request { data },
received_at,
);
return;
}
SyncMessage::Sync { data } => {
tracing::trace!("received sync whilst in notfound, moving to ready");
self.handle_phase_transition(out, PhaseTransition::ToReady);
self.handle_sync_message(
now,
out,
connection_id,
peer_connections,
SyncMessage::Sync { data },
received_at,
);
return;
}
SyncMessage::DocUnavailable => (PhaseTransition::None, None),
},
};
if let Some(duration) = duration {
let queue_duration = if now >= received_at {
now - received_at
} else {
Duration::ZERO
};
if bytes > 0 {
out.sync_message_stats.push(SyncMessageStat {
connection_id,
direction: SyncDirection::Received,
bytes,
duration,
queue_duration,
});
}
}
self.handle_phase_transition(out, transition);
}
pub fn generate_sync_messages(
&mut self,
now: UnixTimestamp,
out: &mut DocActorResult,
peer_connections: &mut HashMap<ConnectionId, PeerDocConnection>,
) -> HashMap<ConnectionId, Vec<SyncMessage>> {
let mut result: HashMap<ConnectionId, Vec<SyncMessage>> = HashMap::new();
for (conn_id, peer_conn) in peer_connections {
if let Phase::Loading(loading) = &self.phase {
out.pending_sync_messages = loading.pending_msg_count();
continue;
}
let generated = match &mut self.phase {
Phase::Ready(ready) => ready.generate_sync_message(now, &mut self.doc, peer_conn),
Phase::Requesting(request) => request.generate_message(now, &self.doc, peer_conn),
Phase::NotFound | Phase::Loading { .. } => None,
};
if let Some((msg, duration)) = generated {
let bytes = match &msg {
SyncMessage::Request { data } | SyncMessage::Sync { data } => data.len(),
SyncMessage::DocUnavailable => 0,
};
if bytes > 0 {
out.sync_message_stats.push(SyncMessageStat {
connection_id: *conn_id,
direction: SyncDirection::Generated,
bytes,
duration,
queue_duration: Duration::ZERO,
});
}
tracing::debug!(?conn_id, peer_id=?peer_conn.peer_id, ?msg, "sending sync msg");
result.entry(*conn_id).or_default().push(msg);
}
}
result
}
pub(crate) fn request_if_not_already_available(
&mut self,
out: &mut DocActorResult,
peer_connections: &mut HashMap<ConnectionId, PeerDocConnection>,
) {
if let Phase::NotFound = self.phase {
tracing::debug!("reloading document actor");
for peer_conn in peer_connections.values_mut() {
peer_conn.reset_sync_state();
}
self.handle_phase_transition(out, PhaseTransition::ToLoading);
}
}
pub(crate) fn is_ready(&self) -> bool {
matches!(self.phase, Phase::Ready(_))
}
pub(crate) fn document_mut(&mut self) -> &mut automerge::Automerge {
&mut self.doc
}
pub(crate) fn document(&self) -> &automerge::Automerge {
&self.doc
}
pub(crate) fn set_announce_policy(
&mut self,
out: &mut DocActorResult,
connection_id: ConnectionId,
policy: AnnouncePolicy,
) {
let transition = if let Phase::Requesting(request) = &mut self.phase {
request.announce_policy_changed(connection_id, policy);
self.check_request_completion()
} else {
PhaseTransition::None
};
self.handle_phase_transition(out, transition);
}
pub(crate) fn set_any_dialer_connecting(&mut self, out: &mut DocActorResult, value: bool) {
self.any_dialer_connecting = value;
let transition = self.check_request_completion();
self.handle_phase_transition(out, transition);
}
}