use std::collections::HashMap;
use crate::{
ConnectionId, DialerId, DocSearch, DocumentActorId, DocumentId, ListenerId, PeerId, StorageId,
UnixTimestamp,
actors::{
document::SpawnArgs,
hub::{
Command, HubEvent, HubEventPayload, HubInput, HubResults,
connection::{ConnectionArgs, ReceiveEvent},
dialer::{ConnectionLostOutcome, DialerState},
io::{HubIoAction, HubIoResult},
listener::ListenerState,
},
messages::{Broadcast, DocMessage, DocToHubMsgPayload, HubToDocMsgPayload},
},
doc_search::DocSearchPhase,
ephemera::{EphemeralMessage, EphemeralSession, OutgoingSessionDetails},
network::{
ConnDirection, ConnectionEvent, ConnectionInfo, ConnectionOwner, ConnectionState,
DialRequest, DialerEvent, PeerDocState, PeerInfo, PeerMetadata,
wire_protocol::{WireMessage, WireMessageBuilder},
},
};
mod actor_info;
pub(crate) use actor_info::ActorInfo;
use automerge::Automerge;
use super::{CommandId, CommandResult, RunState, connection::Connection};
mod pending_commands;
mod searches;
use searches::Searches;
pub(crate) struct State {
pub(crate) storage_id: StorageId,
pub(crate) peer_id: PeerId,
actors: HashMap<DocumentActorId, ActorInfo>,
connections: HashMap<ConnectionId, Connection>,
document_to_actor: HashMap<DocumentId, DocumentActorId>,
pending_commands: pending_commands::PendingCommands,
ephemeral_session: EphemeralSession,
run_state: RunState,
dialers: HashMap<DialerId, DialerState>,
listeners: HashMap<ListenerId, ListenerState>,
searches: Searches,
}
impl State {
pub(crate) fn new(
storage_id: StorageId,
peer_id: PeerId,
ephemeral_session: EphemeralSession,
) -> Self {
Self {
storage_id,
peer_id,
actors: HashMap::new(),
connections: HashMap::new(),
document_to_actor: HashMap::new(),
pending_commands: pending_commands::PendingCommands::new(),
ephemeral_session,
run_state: RunState::Running,
dialers: HashMap::new(),
listeners: HashMap::new(),
searches: Searches::new(),
}
}
pub(crate) fn storage_id(&self) -> StorageId {
self.storage_id.clone()
}
pub(crate) fn find_listener_for_url(&self, url: &url::Url) -> Option<ListenerId> {
self.listeners
.iter()
.find(|(_, l)| l.url == *url)
.map(|(id, _)| *id)
}
pub(crate) fn dialer_attempt(&self, dialer_id: DialerId) -> Option<u32> {
self.dialers.get(&dialer_id).map(|d| d.attempts)
}
pub(crate) fn pending_dialer_urls(&self) -> Vec<url::Url> {
use super::dialer::DialerStatus;
self.dialers
.values()
.filter(|d| match d.status {
DialerStatus::Connected { connection_id } => self
.connections
.get(&connection_id)
.map(|c| c.remote_peer_id().is_none())
.unwrap_or(false),
DialerStatus::NeedTransport => true,
DialerStatus::TransportPending => true,
DialerStatus::WaitingToRetry { .. } => false,
DialerStatus::Failed => false,
})
.map(|d| d.url.clone())
.collect()
}
pub(crate) fn add_connection(
&mut self,
connection_id: ConnectionId,
connection_state: Connection,
) {
self.connections.insert(connection_id, connection_state);
}
fn remove_connection<'a, A: Into<RemoveConnArgs<'a>>>(
&mut self,
results: &mut HubResults,
args: A,
) -> Option<Connection> {
let RemoveConnArgs {
connection_id,
notify_doc_actors,
} = args.into();
let conn = self.connections.remove(connection_id)?;
let msg = match conn.owner() {
ConnectionOwner::Dialer(dialer_id) => {
format!("Dialer {:?} connection removed", dialer_id)
}
ConnectionOwner::Listener(listener_id) => {
format!("Listener {:?} connection removed", listener_id)
}
};
results.emit_disconnect_event(*connection_id, conn.owner(), msg);
results.emit_io_action(HubIoAction::Disconnect {
connection_id: *connection_id,
});
if notify_doc_actors {
self.notify_doc_actors_of_removed_connection(results, *connection_id);
}
Some(conn)
}
pub(crate) fn add_document_to_connection(
&mut self,
connection_id: &ConnectionId,
document_id: DocumentId,
) {
if let Some(connection) = self.connections.get_mut(connection_id) {
connection.add_document(document_id);
}
}
pub(crate) fn peer_id(&self) -> &PeerId {
&self.peer_id
}
pub(crate) fn connections(&self) -> Vec<ConnectionInfo> {
self.connections
.iter()
.map(|(conn_id, conn)| {
let (doc_connections, state) =
if let Some(established) = conn.established_connection() {
(
established.document_subscriptions().clone(),
ConnectionState::Connected {
their_peer_id: established.remote_peer_id().clone(),
},
)
} else {
(HashMap::new(), ConnectionState::Handshaking)
};
ConnectionInfo {
id: *conn_id,
last_received: conn.last_received(),
last_sent: conn.last_sent(),
docs: doc_connections,
state,
}
})
.collect()
}
pub(crate) fn established_peers(&self) -> Vec<(ConnectionId, PeerId)> {
self.connections
.iter()
.filter_map(|(connection_id, connection_state)| {
connection_state
.remote_peer_id()
.map(|remote| (*connection_id, remote.clone()))
})
.collect()
}
pub(crate) fn established_connection(
&mut self,
conn_id: ConnectionId,
) -> Option<(&mut Connection, PeerId)> {
let conn = self.connections.get_mut(&conn_id)?;
if let Some(peer_id) = conn.remote_peer_id().cloned() {
Some((conn, peer_id))
} else {
None
}
}
pub(crate) fn is_connected_to(&self, peer_id: &PeerId) -> bool {
self.connections.values().any(|connection_state| {
connection_state
.established_connection()
.map(|established| established.remote_peer_id() == peer_id)
.unwrap_or(false)
})
}
pub(crate) fn add_document_actor(
&mut self,
actor_id: DocumentActorId,
document_id: DocumentId,
) {
let handle = ActorInfo::new_with_id(actor_id, document_id.clone());
self.actors.insert(actor_id, handle);
self.document_to_actor.insert(document_id, actor_id);
}
pub(crate) fn find_actor_for_document(&self, document_id: &DocumentId) -> Option<&ActorInfo> {
self.document_to_actor
.get(document_id)
.and_then(|actor_id| self.actors.get(actor_id))
}
pub(crate) fn find_document_for_actor(&self, actor_id: &DocumentActorId) -> Option<DocumentId> {
self.actors
.get(actor_id)
.map(|actor| actor.document_id.clone())
}
pub(crate) fn pop_completed_commands(&mut self) -> Vec<(CommandId, CommandResult)> {
self.pending_commands.pop_completed_commands()
}
pub(crate) fn document_actors(&self) -> impl Iterator<Item = &ActorInfo> {
self.actors.values()
}
pub(crate) fn update_document_search_phase(
&mut self,
actor_id: DocumentActorId,
new_phase: DocSearchPhase,
) {
let Some(actor_info) = self.actors.get_mut(&actor_id) else {
tracing::warn!("document actor ID not found in actors: {:?}", actor_id);
return;
};
actor_info.search_phase = new_phase.clone();
let doc_id = actor_info.document_id.clone();
if new_phase == DocSearchPhase::Ready {
self.pending_commands
.resolve_pending_create(actor_id, &doc_id);
}
}
pub(crate) fn ensure_connections(&mut self) -> Vec<(DocumentActorId, ConnectionId, PeerId)> {
let mut to_connect = Vec::new();
for (conn_id, conn) in &mut self.connections {
if let Some(established) = conn.established_connection_mut() {
for (doc_id, doc_actor) in &self.document_to_actor {
if !established.document_subscriptions().contains_key(doc_id) {
to_connect.push((
established.remote_peer_id().clone(),
*conn_id,
doc_id.clone(),
doc_actor,
));
established.add_document_subscription(doc_id.clone());
}
}
}
}
let mut result = Vec::new();
for (peer_id, conn_id, doc_id, actor) in to_connect {
let conn = self.connections.get_mut(&conn_id).unwrap();
conn.add_document(doc_id);
result.push((*actor, conn_id, peer_id));
}
result
}
pub(crate) fn update_peer_states(
&mut self,
actor: DocumentActorId,
new_states: HashMap<ConnectionId, PeerDocState>,
) {
let Some(actor) = self.actors.get(&actor) else {
tracing::warn!(
?actor,
"document actor ID not found in actors when updating peer states"
);
return;
};
for (conn, new_state) in new_states {
if let Some(connection) = self.connections.get_mut(&conn) {
connection.update_peer_state(&actor.document_id, new_state);
} else {
tracing::debug!(?conn, "connection not found when updating peer states");
}
}
}
pub(crate) fn pop_closed_connections(&mut self) -> Vec<ConnectionId> {
let closed: Vec<_> = self
.connections
.iter()
.filter_map(|(id, conn)| if conn.is_closed() { Some(*id) } else { None })
.collect();
for id in &closed {
self.connections.remove(id);
}
closed
}
pub(crate) fn pop_new_connection_info(&mut self) -> HashMap<ConnectionId, ConnectionInfo> {
self.connections
.iter_mut()
.filter_map(|(conn_id, conn)| conn.pop_new_info().map(|info| (*conn_id, info)))
.collect()
}
pub(crate) fn next_ephemeral_msg_details(&mut self) -> OutgoingSessionDetails {
self.ephemeral_session.next_message_session_details()
}
pub(crate) fn get_local_metadata(&self) -> PeerMetadata {
PeerMetadata {
is_ephemeral: false,
storage_id: Some(self.storage_id.clone()),
}
}
pub(crate) fn run_state(&self) -> RunState {
self.run_state
}
pub(crate) fn handle_event<R: rand::Rng>(
&mut self,
rng: &mut R,
now: UnixTimestamp,
event: HubEvent,
results: &mut HubResults,
) {
if self.run_state == RunState::Stopped {
tracing::warn!("ignoring event on stopped hub");
results.stopped = true;
return;
}
let event_type = event.event_type_for_metrics();
match event.payload {
HubEventPayload::IoComplete(io_completion) => {
match io_completion.payload {
HubIoResult::Send | HubIoResult::Disconnect => {
}
}
}
HubEventPayload::Input(input) => {
match input {
HubInput::Stop => {
if self.run_state == RunState::Running {
tracing::info!("stopping hub event loop");
self.run_state = RunState::Stopping;
for actor_info in self.document_actors() {
results.send_to_doc_actor(
actor_info.actor_id,
HubToDocMsgPayload::Terminate,
);
}
self.close_all_network_state(results);
}
}
HubInput::Command {
command_id,
command,
} => {
if self.run_state == RunState::Running
&& let Some(result) =
self.handle_command(rng, now, results, command_id, *command)
{
results.completed_commands.insert(command_id, result);
}
}
HubInput::Tick => {
self.handle_tick(rng, now, results);
}
HubInput::ActorMessage { actor_id, message } => match message {
DocToHubMsgPayload::SendSyncMessage {
document_id,
connection_id,
message,
} => {
let sender_id = self.peer_id.clone();
if let Some((conn, target_id)) =
self.established_connection(connection_id)
{
let wire_message = WireMessageBuilder {
sender_id,
target_id,
document_id,
}
.from_sync_message(message);
results.send(conn, wire_message.encode());
} else {
tracing::warn!(
?connection_id,
"received SendSyncMessage for unknown connection ID"
);
}
}
DocToHubMsgPayload::PeerStatesChanged(new_states) => {
self.update_peer_states(actor_id, new_states);
}
DocToHubMsgPayload::DocSearchChanged(new_phase) => {
self.update_document_search_phase(actor_id, new_phase);
}
DocToHubMsgPayload::Broadcast { connections, msg } => {
self.broadcast(results, actor_id, connections, msg);
}
DocToHubMsgPayload::Terminated => {
tracing::debug!(?actor_id, "document actor terminated");
self.actors.remove(&actor_id);
}
},
HubInput::ConnectionLost { connection_id } => {
self.handle_connection_lost(rng, now, results, connection_id);
}
HubInput::AddDialer { command_id, config } => {
let result = self.handle_add_dialer(results, config);
results.completed_commands.insert(command_id, result);
}
HubInput::AddListener { command_id, config } => {
let result = self.handle_add_listener(config);
results.completed_commands.insert(command_id, result);
}
HubInput::CreateDialerConnection {
command_id,
dialer_id,
} => {
let result = self.handle_create_dialer_connection(now, results, dialer_id);
results.completed_commands.insert(command_id, result);
}
HubInput::CreateListenerConnection {
command_id,
listener_id,
} => {
let result =
self.handle_create_listener_connection(now, results, listener_id);
results.completed_commands.insert(command_id, result);
}
HubInput::DialFailed { dialer_id, error } => {
self.handle_dial_failed(rng, now, results, dialer_id, &error);
}
HubInput::RemoveDialer { dialer_id } => {
self.handle_remove_dialer(results, dialer_id);
}
HubInput::RemoveListener { listener_id } => {
self.handle_remove_listener(results, listener_id);
}
}
}
}
for conn_id in self.pop_closed_connections() {
for doc in self.document_actors() {
results.send_to_doc_actor(
doc.actor_id,
HubToDocMsgPayload::ConnectionClosed {
connection_id: conn_id,
},
);
}
}
if self.run_state == RunState::Running {
for (actor_id, conn_id, peer_id) in self.ensure_connections() {
results.send_to_doc_actor(
actor_id,
HubToDocMsgPayload::NewConnection {
connection_id: conn_id,
peer_id,
},
);
}
}
let conns = self.connections.len();
let docs = self.document_to_actor.len();
tracing::trace!(
event_type,
connections = conns,
documents = docs,
"hub event processed"
);
for (conn_id, new_state) in self.pop_new_connection_info() {
if let Some(conn) = self.connections.get(&conn_id) {
let owner = conn.owner();
results.emit_connection_event(ConnectionEvent::StateChanged {
connection_id: conn_id,
owner,
new_state,
});
}
}
for (command_id, result) in self.pop_completed_commands() {
results.completed_commands.insert(command_id, result);
}
self.emit_search_state_updates(results);
if self.run_state == RunState::Stopping {
if self.actors.is_empty() {
tracing::debug!("hub stopped");
self.run_state = RunState::Stopped;
} else {
tracing::debug!(remaining_actors = self.actors.len(), "hub still stopping");
}
}
results.stopped = self.run_state == RunState::Stopped;
results.event_type = event_type;
results.connections_count = conns;
results.documents_count = docs;
}
fn handle_command<R: rand::Rng>(
&mut self,
rng: &mut R,
now: UnixTimestamp,
out: &mut HubResults,
command_id: CommandId,
command: Command,
) -> Option<CommandResult> {
match command {
Command::Receive { connection_id, msg } => {
Some(self.handle_receive(now, out, connection_id, msg))
}
Command::ActorReady { document_id: _ } => Some(CommandResult::ActorReady),
Command::CreateDocument { content } => {
self.handle_create_document(rng, out, command_id, *content);
None
}
Command::SearchForDoc { document_id } => {
self.handle_search_for_doc(out, command_id, document_id)
}
}
}
fn handle_receive(
&mut self,
now: UnixTimestamp,
out: &mut HubResults,
connection_id: ConnectionId,
msg: Vec<u8>,
) -> CommandResult {
tracing::trace!(?connection_id, msg_bytes = msg.len(), "receive");
let Some(conn) = self.connections.get_mut(&connection_id) else {
tracing::warn!(?connection_id, "receive command for nonexistent connection");
return CommandResult::Receive {
connection_id,
error: Some("Connection not found".to_string()),
};
};
let msg = match WireMessage::decode(&msg) {
Ok(msg) => msg,
Err(e) => {
tracing::warn!(
?connection_id,
err=?e,
"failed to decode message: {}",
e
);
let error_msg = format!("Message decode error: {e}");
if let Some(conn) = self.connections.get(&connection_id) {
tracing::debug!(error=?error_msg, remote_peer_id=?conn.remote_peer_id(), "failing connection");
self.remove_connection(out, &connection_id);
}
return CommandResult::Receive {
connection_id,
error: Some(format!("Decode error: {e}")),
};
}
};
for evt in conn.receive_msg(out, now, msg) {
match evt {
ReceiveEvent::HandshakeComplete { remote_peer_id } => {
tracing::debug!(?connection_id, ?remote_peer_id, "handshake completed");
self.reset_dialer_backoff_for_connection(connection_id);
let peer_info = PeerInfo {
peer_id: remote_peer_id.clone(),
metadata: Some(self.get_local_metadata()),
protocol_version: "1".to_string(),
};
let owner = self
.connections
.get(&connection_id)
.expect("connection must exist during receive handling")
.owner();
out.emit_connection_event(ConnectionEvent::HandshakeCompleted {
connection_id,
owner,
peer_info: peer_info.clone(),
})
}
ReceiveEvent::SyncMessage {
doc_id,
sender_id: _,
target_id,
msg,
} => self.handle_doc_message(
now,
out,
connection_id,
target_id,
doc_id,
DocMessage::Sync(msg),
),
ReceiveEvent::EphemeralMessage {
doc_id,
sender_id,
target_id,
count,
session_id,
msg,
} => {
let msg = EphemeralMessage {
sender_id,
session_id,
count,
data: msg,
};
if let Some(msg) = self.ephemeral_session.receive_message(msg) {
self.handle_doc_message(
now,
out,
connection_id,
target_id,
doc_id,
DocMessage::Ephemeral(msg),
)
}
}
}
}
CommandResult::Receive {
connection_id,
error: None,
}
}
fn handle_doc_message(
&mut self,
now: UnixTimestamp,
out: &mut HubResults,
connection_id: ConnectionId,
target_id: PeerId,
doc_id: DocumentId,
msg: DocMessage,
) {
if target_id != self.peer_id {
tracing::trace!(?connection_id, ?msg, "ignoring message for another peer");
}
if let Some(existing_actor) = self.find_actor_for_document(&doc_id) {
out.send_to_doc_actor(
existing_actor.actor_id,
HubToDocMsgPayload::HandleDocMessage {
connection_id,
message: msg,
received_at: now,
},
);
} else {
self.spawn_actor(out, doc_id, None, Some((connection_id, msg)));
}
}
#[tracing::instrument(skip(self, init_doc, rng), fields(command_id = %command_id))]
fn handle_create_document<R: rand::Rng>(
&mut self,
rng: &mut R,
out: &mut HubResults,
command_id: CommandId,
init_doc: Automerge,
) {
let document_id = DocumentId::new(rng);
tracing::debug!(%document_id, "creating new document");
let actor_id = self.spawn_actor(out, document_id, Some(init_doc), None);
self.searches.add_search(actor_id, DocSearchPhase::Ready);
self.pending_commands
.add_pending_create_command(actor_id, command_id);
}
#[tracing::instrument(skip(self, out), fields(document_id = %document_id))]
fn handle_search_for_doc(
&mut self,
out: &mut HubResults,
command_id: CommandId,
document_id: DocumentId,
) -> Option<CommandResult> {
tracing::debug!("search document command received");
let actor_id = if let Some(existing) = self.find_actor_for_document(&document_id) {
tracing::trace!(actor_id=%existing.actor_id, status=?existing.search_phase, "found existing actor for document");
existing.actor_id
} else {
tracing::trace!("no existing actor found for document, spawning new actor");
self.spawn_actor(out, document_id, None, None)
};
let phase = self
.actors
.get(&actor_id)
.map(|info| info.search_phase.clone())
.unwrap_or(DocSearchPhase::Loading);
let search_state = DocSearch {
phase,
pending_connections: self.pending_dialer_urls(),
};
Some(CommandResult::SearchForDoc {
actor_id,
search_state,
})
}
fn notify_doc_actors_of_removed_connection(
&mut self,
out: &mut HubResults,
connection_id: crate::ConnectionId,
) {
for actor_info in self.document_actors() {
out.send_to_doc_actor(
actor_info.actor_id,
HubToDocMsgPayload::ConnectionClosed { connection_id },
);
}
}
fn close_all_network_state(&mut self, results: &mut HubResults) {
let conn_ids: Vec<_> = self.connections.keys().copied().collect();
for conn_id in conn_ids {
self.remove_connection(
results,
RemoveConnArgs {
connection_id: &conn_id,
notify_doc_actors: false,
},
);
}
let dialer_ids: Vec<_> = self.dialers.keys().copied().collect();
for dialer_id in dialer_ids {
self.dialers.remove(&dialer_id);
tracing::debug!(?dialer_id, "dialer removed during shutdown");
}
let listener_ids: Vec<_> = self.listeners.keys().copied().collect();
for listener_id in listener_ids {
self.listeners.remove(&listener_id);
tracing::debug!(?listener_id, "listener removed during shutdown");
}
}
fn spawn_actor(
&mut self,
out: &mut HubResults,
document_id: DocumentId,
initial_doc: Option<Automerge>,
from_sync_msg: Option<(ConnectionId, DocMessage)>,
) -> DocumentActorId {
let actor_id = DocumentActorId::new();
self.add_document_actor(actor_id, document_id.clone());
let mut initial_connections: HashMap<ConnectionId, (PeerId, Option<DocMessage>)> = self
.established_peers()
.iter()
.map(|(c, p)| (*c, (p.clone(), None)))
.collect();
for conn in initial_connections.keys() {
self.add_document_to_connection(conn, document_id.clone());
}
if let Some((conn_id, msg)) = from_sync_msg
&& let Some((_, sync_msg)) = initial_connections.get_mut(&conn_id)
{
*sync_msg = Some(msg);
}
out.emit_spawn_actor(SpawnArgs {
actor_id,
local_peer_id: self.peer_id.clone(),
document_id,
initial_content: initial_doc,
initial_connections,
});
actor_id
}
fn broadcast(
&mut self,
out: &mut HubResults,
from_actor: DocumentActorId,
to_connections: Vec<ConnectionId>,
msg: Broadcast,
) {
let Some(doc_id) = self.find_document_for_actor(&from_actor) else {
tracing::warn!(
?from_actor,
"attempting to broadcast from an actor that does not exist"
);
return;
};
let OutgoingSessionDetails {
counter,
session_id,
} = self.next_ephemeral_msg_details();
for conn_id in to_connections {
let Some(conn) = self.connections.get_mut(&conn_id) else {
continue;
};
let Some(their_peer_id) = conn
.established_connection()
.map(|c| c.remote_peer_id().clone())
else {
continue;
};
let msg = match &msg {
Broadcast::New { msg } => WireMessage::Ephemeral {
sender_id: self.peer_id.clone(),
target_id: their_peer_id,
count: counter,
session_id: session_id.to_string(),
document_id: doc_id.clone(),
data: msg.clone(),
},
Broadcast::Gossip {
msg:
EphemeralMessage {
sender_id,
session_id,
count,
data,
},
} => WireMessage::Ephemeral {
sender_id: sender_id.clone(),
target_id: their_peer_id,
count: *count,
session_id: session_id.to_string(),
document_id: doc_id.clone(),
data: data.clone(),
},
};
out.send(conn, msg.encode());
}
}
fn handle_add_dialer(
&mut self,
out: &mut HubResults,
config: crate::network::DialerConfig,
) -> CommandResult {
let dialer_id = DialerId::new();
let url = config.url.clone();
let mut dialer = DialerState::new(dialer_id, config.url, config.backoff);
dialer.mark_transport_pending();
out.emit_dial_request(DialRequest {
dialer_id,
url: url.clone(),
});
tracing::debug!(
?dialer_id,
%url,
"dialer registered"
);
self.dialers.insert(dialer_id, dialer);
CommandResult::AddDialer { dialer_id }
}
fn handle_add_listener(&mut self, config: crate::network::ListenerConfig) -> CommandResult {
let listener_id = ListenerId::new();
let url = config.url.clone();
let listener = ListenerState::new(listener_id, config.url);
tracing::debug!(
?listener_id,
%url,
"listener registered"
);
self.listeners.insert(listener_id, listener);
CommandResult::AddListener { listener_id }
}
fn handle_connection_lost<R: rand::Rng>(
&mut self,
rng: &mut R,
now: UnixTimestamp,
results: &mut HubResults,
connection_id: ConnectionId,
) {
let Some(connection) = self.remove_connection(results, &connection_id) else {
return;
};
match connection.owner() {
ConnectionOwner::Dialer(dialer_id) => {
if let Some(dialer) = self.dialers.get_mut(&dialer_id) {
let url = dialer.url.clone();
match dialer.handle_connection_lost(rng, now, connection_id) {
ConnectionLostOutcome::WillRetry { retry_at } => {
tracing::debug!(
?dialer_id,
%url,
?retry_at,
"dialer will retry"
);
}
ConnectionLostOutcome::MaxRetriesReached => {
tracing::warn!(
?dialer_id,
%url,
"dialer max retries reached"
);
results.emit_connector_event(DialerEvent::MaxRetriesReached {
dialer_id,
url,
});
}
ConnectionLostOutcome::NotOurs => {}
}
}
}
ConnectionOwner::Listener(listener_id) => {
if let Some(listener) = self.listeners.get_mut(&listener_id) {
listener.remove_connection(&connection_id);
tracing::debug!(
?listener_id,
%listener.url,
?connection_id,
"connection removed from listener"
);
}
}
}
}
fn handle_create_dialer_connection(
&mut self,
now: UnixTimestamp,
out: &mut HubResults,
dialer_id: DialerId,
) -> CommandResult {
let dialer_exists = self.dialers.contains_key(&dialer_id);
if !dialer_exists {
tracing::warn!(?dialer_id, "create_dialer_connection for unknown dialer");
}
let owner = ConnectionOwner::Dialer(dialer_id);
let local_metadata = self.get_local_metadata();
let conn = Connection::new_handshaking(
out,
ConnectionArgs {
direction: ConnDirection::Outgoing,
owner,
local_peer_id: self.peer_id.clone(),
local_metadata: Some(local_metadata),
created_at: now,
},
);
let connection_id = conn.id();
if let Some(dialer) = self.dialers.get_mut(&dialer_id)
&& !dialer.set_connected(connection_id)
{
tracing::warn!(
?dialer_id,
"create_dialer_connection called but dialer not in TransportPending state"
);
}
self.add_connection(connection_id, conn);
tracing::debug!(?dialer_id, ?connection_id, "dialer connection created");
out.emit_connection_event(ConnectionEvent::StateChanged {
connection_id,
owner,
new_state: self.connections.get(&connection_id).unwrap().info(),
});
CommandResult::CreateConnection { connection_id }
}
fn handle_create_listener_connection(
&mut self,
now: UnixTimestamp,
out: &mut HubResults,
listener_id: ListenerId,
) -> CommandResult {
let listener_exists = self.listeners.contains_key(&listener_id);
if !listener_exists {
tracing::warn!(
?listener_id,
"create_listener_connection for unknown listener"
);
}
let owner = ConnectionOwner::Listener(listener_id);
let local_metadata = self.get_local_metadata();
let conn = Connection::new_handshaking(
out,
ConnectionArgs {
direction: ConnDirection::Incoming,
owner,
local_peer_id: self.peer_id.clone(),
local_metadata: Some(local_metadata),
created_at: now,
},
);
let connection_id = conn.id();
if let Some(listener) = self.listeners.get_mut(&listener_id) {
listener.add_connection(connection_id);
}
self.add_connection(connection_id, conn);
tracing::debug!(?listener_id, ?connection_id, "listener connection created");
out.emit_connection_event(ConnectionEvent::StateChanged {
connection_id,
owner,
new_state: self.connections.get(&connection_id).unwrap().info(),
});
CommandResult::CreateConnection { connection_id }
}
fn handle_dial_failed<R: rand::Rng>(
&mut self,
rng: &mut R,
now: UnixTimestamp,
results: &mut HubResults,
dialer_id: DialerId,
error: &str,
) {
let Some(dialer) = self.dialers.get_mut(&dialer_id) else {
tracing::warn!(
?dialer_id,
%error,
"dial_failed for unknown dialer"
);
return;
};
let url = dialer.url.clone();
tracing::warn!(
?dialer_id,
%url,
%error,
"dial failed"
);
match dialer.handle_dial_failed(rng, now) {
ConnectionLostOutcome::WillRetry { retry_at } => {
tracing::debug!(
?dialer_id,
%url,
?retry_at,
"dialer will retry after dial failure"
);
}
ConnectionLostOutcome::MaxRetriesReached => {
tracing::warn!(
?dialer_id,
%url,
"dialer max retries reached after dial failure"
);
results.emit_connector_event(DialerEvent::MaxRetriesReached { dialer_id, url });
}
ConnectionLostOutcome::NotOurs => {}
}
}
fn handle_remove_dialer(&mut self, results: &mut HubResults, dialer_id: DialerId) {
let Some(dialer) = self.dialers.remove(&dialer_id) else {
tracing::warn!(?dialer_id, "remove_dialer for unknown dialer");
return;
};
tracing::debug!(
?dialer_id,
%dialer.url,
"removing dialer"
);
if let Some(conn_id) = dialer.active_connection() {
self.remove_connection(results, &conn_id);
}
}
fn handle_remove_listener(&mut self, results: &mut HubResults, listener_id: ListenerId) {
let Some(listener) = self.listeners.remove(&listener_id) else {
tracing::warn!(?listener_id, "remove_listener for unknown listener");
return;
};
tracing::debug!(
?listener_id,
%listener.url,
"removing listener"
);
for conn_id in listener.active_connections.iter() {
self.remove_connection(results, conn_id);
}
}
fn handle_tick(
&mut self,
_rng: &mut impl rand::Rng,
now: UnixTimestamp,
results: &mut HubResults,
) {
let mut need_dial = Vec::new();
for (dialer_id, dialer) in &mut self.dialers {
if dialer.check_retry(now) {
need_dial.push((*dialer_id, dialer.url.clone()));
}
}
for (dialer_id, url) in need_dial {
if let Some(dialer) = self.dialers.get_mut(&dialer_id) {
dialer.mark_transport_pending();
tracing::debug!(
?dialer_id,
%url,
"retry timer expired, requesting dial"
);
results.emit_dial_request(DialRequest { dialer_id, url });
}
}
}
fn reset_dialer_backoff_for_connection(&mut self, connection_id: ConnectionId) {
let Some(conn) = self.connections.get(&connection_id) else {
return;
};
let ConnectionOwner::Dialer(dialer_id) = conn.owner() else {
return;
};
if let Some(dialer) = self.dialers.get_mut(&dialer_id) {
dialer.reset_backoff();
}
}
fn emit_search_state_updates(&mut self, results: &mut HubResults) {
let updates = self.searches.pop_state_updates(&self.actors, &self.dialers);
if updates.is_empty() {
return;
}
let pending_dialer_urls = self.pending_dialer_urls();
updates.into_iter().for_each(|(actor_id, new_phase)| {
let Some(actor) = self.actors.get(&actor_id) else {
return;
};
results.search_state_updates.push((
actor.document_id.clone(),
DocSearch {
phase: new_phase,
pending_connections: pending_dialer_urls.clone(),
},
));
});
}
}
struct RemoveConnArgs<'a> {
connection_id: &'a ConnectionId,
notify_doc_actors: bool,
}
impl<'a> From<&'a ConnectionId> for RemoveConnArgs<'a> {
fn from(connection_id: &'a ConnectionId) -> Self {
Self {
connection_id,
notify_doc_actors: true,
}
}
}