use std::{
collections::{HashMap, HashSet},
future::Future,
sync::{Arc, Weak},
time::Duration,
};
use bson::oid::ObjectId;
use futures_util::{
stream::{FuturesUnordered, StreamExt},
FutureExt,
};
use serde::Serialize;
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
watch::{self, Ref},
};
use crate::{
client::options::{ClientOptions, ServerAddress},
cmap::{
conn::ConnectionGeneration,
establish::{ConnectionEstablisher, EstablisherOptions},
Command,
Connection,
PoolGeneration,
},
error::{load_balanced_mode_mismatch, Error, Result},
event::sdam::{
handle_sdam_event,
SdamEvent,
ServerClosedEvent,
ServerDescriptionChangedEvent,
ServerOpeningEvent,
TopologyClosedEvent,
TopologyDescriptionChangedEvent,
TopologyOpeningEvent,
},
runtime::{self, AcknowledgedMessage, WorkerHandle, WorkerHandleListener},
selection_criteria::SelectionCriteria,
ClusterTime,
ServerInfo,
ServerType,
TopologyType,
};
#[cfg(feature = "tracing-unstable")]
use crate::trace::topology::TopologyTracingEventEmitter;
use super::{
monitor::{MonitorManager, MonitorRequestReceiver},
srv_polling::SrvPollingMonitor,
Monitor,
Server,
ServerDescription,
TopologyDescription,
TransactionSupportStatus,
};
/// Handle to the client's SDAM (Server Discovery and Monitoring) machinery.
///
/// Reads go through `watcher` (snapshots of the latest published state) and
/// writes through `updater` (messages to the background `TopologyWorker`).
/// The worker shuts down once every `WorkerHandle` clone has been dropped.
#[derive(Debug)]
pub(crate) struct Topology {
    // Unique id used to correlate SDAM events; only needed for tracing/tests.
    #[cfg(any(feature = "tracing-unstable", test))]
    pub(crate) id: ObjectId,
    // Read-only view of the most recently published `TopologyState`.
    watcher: TopologyWatcher,
    // Sender side of the update channel consumed by the worker task.
    updater: TopologyUpdater,
    // Keeps the background worker alive; dropping the last handle stops it.
    _worker_handle: WorkerHandle,
}
impl Topology {
    /// Creates a new `Topology`, spawning the background worker that owns the
    /// authoritative `TopologyDescription` and all server monitors.
    ///
    /// Returns an error if the `ConnectionEstablisher` cannot be constructed
    /// from the provided options.
    pub(crate) fn new(options: ClientOptions) -> Result<Topology> {
        let description = TopologyDescription::default();
        let id = ObjectId::new();

        // Spawn a consumer task for SDAM events if either a user handler or
        // the tracing emitter needs them. Events are acknowledged so emitters
        // can await delivery ordering.
        let event_emitter =
            if options.sdam_event_handler.is_some() || cfg!(feature = "tracing-unstable") {
                let user_handler = options.sdam_event_handler.clone();
                #[cfg(feature = "tracing-unstable")]
                let tracing_emitter =
                    TopologyTracingEventEmitter::new(options.tracing_max_document_length_bytes, id);
                let (tx, mut rx) = mpsc::unbounded_channel::<AcknowledgedMessage<SdamEvent>>();
                runtime::execute(async move {
                    while let Some(event) = rx.recv().await {
                        let (event, ack) = event.into_parts();
                        if let Some(ref user_handler) = user_handler {
                            // With tracing enabled the event is also consumed
                            // below, so the user handler gets a clone.
                            #[cfg(feature = "tracing-unstable")]
                            handle_sdam_event(user_handler.as_ref(), event.clone());
                            #[cfg(not(feature = "tracing-unstable"))]
                            handle_sdam_event(user_handler.as_ref(), event);
                        }
                        #[cfg(feature = "tracing-unstable")]
                        handle_sdam_event(&tracing_emitter, event);
                        // Unblock the emitter once all handlers have run.
                        ack.acknowledge(());
                    }
                });
                Some(SdamEventEmitter { sender: tx })
            } else {
                None
            };

        let (updater, update_receiver) = TopologyUpdater::channel();
        let (worker_handle, handle_listener) = WorkerHandleListener::channel();
        let state = TopologyState {
            description: description.clone(),
            servers: Default::default(),
        };
        let (watcher, publisher) = TopologyWatcher::channel(state);
        let connection_establisher =
            ConnectionEstablisher::new(EstablisherOptions::from_client_options(&options))?;
        let worker = TopologyWorker {
            id,
            topology_description: description,
            servers: Default::default(),
            update_receiver,
            publisher,
            options,
            topology_watcher: watcher.clone(),
            topology_updater: updater.clone(),
            handle_listener,
            event_emitter,
            connection_establisher,
        };
        // The worker runs until shutdown or until all handles are dropped.
        worker.start();
        Ok(Topology {
            #[cfg(any(feature = "tracing-unstable", test))]
            id,
            watcher,
            updater,
            _worker_handle: worker_handle,
        })
    }

    /// Returns a fresh watcher with the current state marked as seen, so its
    /// `wait_for_update` only wakes on *future* changes.
    pub(crate) fn watch(&self) -> TopologyWatcher {
        let mut watcher = self.watcher.clone();
        watcher.receiver.borrow_and_update();
        watcher
    }

    /// Test-only: clones the updater handle for direct injection of updates.
    #[cfg(test)]
    pub(crate) fn clone_updater(&self) -> TopologyUpdater {
        self.updater.clone()
    }

    /// Forwards an application-operation error to the worker and waits for it
    /// to be processed.
    pub(crate) async fn handle_application_error(
        &self,
        address: ServerAddress,
        error: Error,
        phase: HandshakePhase,
    ) {
        self.updater
            .handle_application_error(address, error, phase)
            .await;
    }

    /// The cluster time cached in the latest topology description, if any.
    pub(crate) fn cluster_time(&self) -> Option<ClusterTime> {
        self.watcher
            .peek_latest()
            .description
            .cluster_time()
            .cloned()
    }

    /// Submits `to` to the worker to advance the cached cluster time.
    pub(crate) async fn advance_cluster_time(&self, to: ClusterTime) {
        self.updater.advance_cluster_time(to).await;
    }

    /// The topology type from the latest published description.
    pub(crate) fn topology_type(&self) -> TopologyType {
        self.watcher.peek_latest().description.topology_type
    }

    /// The logical session timeout from the latest published description.
    pub(crate) fn logical_session_timeout(&self) -> Option<Duration> {
        self.watcher
            .peek_latest()
            .description
            .logical_session_timeout
    }

    /// Whether the deployment described by the latest description supports
    /// transactions.
    pub(crate) fn transaction_support_status(&self) -> TransactionSupportStatus {
        self.watcher
            .peek_latest()
            .description
            .transaction_support_status()
    }

    /// Rewrites `command`'s read preference as appropriate for the server at
    /// `server_address`, based on the latest description.
    pub(crate) fn update_command_with_read_pref<T: Serialize>(
        &self,
        server_address: &ServerAddress,
        command: &mut Command<T>,
        criteria: Option<&SelectionCriteria>,
    ) {
        self.watcher
            .peek_latest()
            .description
            .update_command_with_read_pref(server_address, command, criteria)
    }

    /// Shuts down the worker, its monitors, and all connection pools,
    /// returning once cleanup has completed.
    pub(crate) async fn shutdown(&self) {
        self.updater.shutdown().await;
    }

    /// Broadcasts a fill-pool request to every server's connection pool.
    pub(crate) async fn warm_pool(&self) {
        self.updater.fill_pool().await;
    }

    /// Test-only: the addresses of the currently monitored servers.
    #[cfg(test)]
    pub(crate) fn server_addresses(&mut self) -> HashSet<ServerAddress> {
        self.servers().into_keys().collect()
    }

    /// Test-only: strong handles to the currently live servers.
    #[cfg(test)]
    pub(crate) fn servers(&mut self) -> HashMap<ServerAddress, Arc<Server>> {
        self.watcher.peek_latest().servers()
    }

    /// Test-only: a clone of the latest topology description.
    #[cfg(test)]
    pub(crate) fn description(&self) -> TopologyDescription {
        self.watcher.peek_latest().description.clone()
    }

    /// Test-only: waits for all pool workers to drain pending messages.
    #[cfg(test)]
    pub(crate) async fn sync_workers(&self) {
        self.updater.sync_workers().await;
    }
}
/// A point-in-time snapshot of the topology published to watchers: the
/// description plus handles to the monitored servers.
#[derive(Debug, Clone)]
pub(crate) struct TopologyState {
    pub(crate) description: TopologyDescription,
    // Weak references so snapshots held by watchers don't keep removed
    // servers (and their pools) alive after the worker drops them.
    servers: HashMap<ServerAddress, Weak<Server>>,
}
impl TopologyState {
    /// Returns strong handles to every server in this snapshot that is still
    /// alive, keyed by address. Servers already dropped by the worker (whose
    /// weak references no longer upgrade) are silently omitted.
    pub(crate) fn servers(&self) -> HashMap<ServerAddress, Arc<Server>> {
        self.servers
            .iter()
            .filter_map(|(address, weak)| weak.upgrade().map(|server| (address.clone(), server)))
            .collect()
    }
}
/// Messages sent to the `TopologyWorker`. Each is acknowledged with a `bool`
/// indicating whether the topology changed as a result of processing it.
#[derive(Debug)]
pub(crate) enum UpdateMessage {
    /// Advance the topology's cached cluster time.
    AdvanceClusterTime(ClusterTime),
    /// Apply a new description for one server (boxed to keep variants small).
    ServerUpdate(Box<ServerDescription>),
    /// Reconcile the monitored server set with the given host list.
    SyncHosts(HashSet<ServerAddress>),
    /// An error encountered by a server monitor.
    MonitorError {
        address: ServerAddress,
        error: Error,
    },
    /// An error encountered during an application operation.
    ApplicationError {
        address: ServerAddress,
        error: Error,
        // The handshake phase in which the error occurred; used for
        // staleness checks against pool/connection generations.
        phase: HandshakePhase,
    },
    /// Fan a message out to every server's connection pool.
    Broadcast(BroadcastMessage),
}
/// Messages broadcast by the worker to every server's connection pool.
#[derive(Debug, Clone)]
pub(crate) enum BroadcastMessage {
    /// Shut down the pools; also terminates the topology worker itself.
    Shutdown,
    /// Request that pools be filled (sent by `Topology::warm_pool`).
    FillPool,
    /// Test-only: wait until pool workers have drained pending messages.
    #[cfg(test)]
    SyncWorkers,
}
/// Background task owning the authoritative topology state.
///
/// All mutations arrive via `update_receiver`; resulting snapshots are
/// published through `publisher` for `TopologyWatcher` clones to observe.
struct TopologyWorker {
    // Id correlating this topology's SDAM events.
    id: ObjectId,
    update_receiver: TopologyUpdateReceiver,
    // Resolves when every `WorkerHandle` (held by `Topology`) is dropped,
    // triggering implicit shutdown.
    handle_listener: WorkerHandleListener,
    publisher: TopologyPublisher,
    // Strong references: the worker keeps servers alive; watchers only ever
    // receive weak ones via published snapshots.
    servers: HashMap<ServerAddress, MonitoredServer>,
    topology_description: TopologyDescription,
    connection_establisher: ConnectionEstablisher,
    // `None` when neither a user handler nor tracing consumes SDAM events.
    event_emitter: Option<SdamEventEmitter>,
    options: ClientOptions,
    // Handles to this worker's own channels, cloned into monitors it spawns.
    topology_watcher: TopologyWatcher,
    topology_updater: TopologyUpdater,
}
impl TopologyWorker {
    /// One-time setup before the main loop: emits `TopologyOpening`, seeds
    /// the description from the options, special-cases load-balanced mode,
    /// and starts SRV polling when monitoring is enabled.
    async fn initialize(&mut self) {
        self.emit_event(|| {
            SdamEvent::TopologyOpening(TopologyOpeningEvent {
                topology_id: self.id,
            })
        });
        let mut new_description = self.topology_description.clone();
        new_description.initialize(&self.options);
        self.update_topology(new_description).await;

        if self.options.load_balanced == Some(true) {
            // In load-balanced mode there is exactly one "server" (the load
            // balancer itself) and no monitoring; describe it directly.
            let base = ServerDescription::new(self.options.hosts[0].clone());
            self.update_server(ServerDescription {
                server_type: ServerType::LoadBalancer,
                average_round_trip_time: None,
                ..base
            })
            .await;
        }

        if self.monitoring_enabled() {
            SrvPollingMonitor::start(
                self.topology_updater.clone(),
                self.topology_watcher.clone(),
                self.options.clone(),
            );
        }

        // Signal test code that initialization has completed.
        #[cfg(test)]
        let _ = self.publisher.initialized_sender.send(true);
    }

    /// Consumes the worker and spawns its main loop.
    ///
    /// The loop processes update messages until either a `Shutdown` broadcast
    /// arrives or all `WorkerHandle`s are dropped, then closes monitors and
    /// emits the final SDAM events.
    fn start(mut self) {
        runtime::execute(async move {
            self.initialize().await;
            let mut shutdown_ack = None;
            loop {
                tokio::select! {
                    Some(update) = self.update_receiver.recv() => {
                        let (update, ack) = update.into_parts();
                        // `Shutdown` defers its ack until cleanup completes;
                        // every other message is acknowledged right below.
                        let mut ack = Some(ack);
                        let changed = match update {
                            UpdateMessage::AdvanceClusterTime(to) => {
                                self.advance_cluster_time(to);
                                true
                            }
                            UpdateMessage::SyncHosts(hosts) => {
                                self.sync_hosts(hosts).await
                            }
                            UpdateMessage::ServerUpdate(sd) => self.update_server(*sd).await,
                            UpdateMessage::MonitorError { address, error } => {
                                self.handle_monitor_error(address, error).await
                            }
                            UpdateMessage::ApplicationError {
                                address,
                                error,
                                phase,
                            } => self.handle_application_error(address, error, phase).await,
                            UpdateMessage::Broadcast(msg) => {
                                // Forward to every pool and wait until all of
                                // them have processed the message.
                                let rxen: FuturesUnordered<_> = self
                                    .servers
                                    .values()
                                    .map(|v| v.pool.broadcast(msg.clone()))
                                    .collect();
                                let _: Vec<_> = rxen.collect().await;
                                if matches!(msg, BroadcastMessage::Shutdown) {
                                    shutdown_ack = ack.take();
                                    break
                                }
                                false
                            }
                        };
                        if let Some(ack) = ack {
                            ack.acknowledge(changed);
                        }
                    },
                    _ = self.handle_listener.wait_for_all_handle_drops() => {
                        // All `Topology` handles are gone; shut down.
                        break
                    }
                }
            }

            // Close the state channel so watchers observe the topology as
            // no longer alive.
            drop(self.publisher);

            // Close each server's monitor, emitting `ServerClosed` first.
            let mut close_futures = FuturesUnordered::new();
            for (address, server) in self.servers.into_iter() {
                if let Some(ref emitter) = self.event_emitter {
                    emitter
                        .emit(SdamEvent::ServerClosed(ServerClosedEvent {
                            address,
                            topology_id: self.id,
                        }))
                        .await;
                }
                // Drop the server itself before closing its monitor so its
                // resources are released first.
                drop(server.inner);
                close_futures.push(server.monitor_manager.close_monitor());
            }
            while close_futures.next().await.is_some() {}

            if let Some(emitter) = self.event_emitter {
                // Emit a final description-changed event showing an empty
                // server list (skipped in load-balanced mode).
                if !self.topology_description.servers.is_empty()
                    && self.options.load_balanced != Some(true)
                {
                    let previous_description = self.topology_description;
                    let mut new_description = previous_description.clone();
                    new_description.servers.clear();
                    emitter
                        .emit(SdamEvent::TopologyDescriptionChanged(Box::new(
                            TopologyDescriptionChangedEvent {
                                topology_id: self.id,
                                previous_description: previous_description.into(),
                                new_description: new_description.into(),
                            },
                        )))
                        .await;
                }
                emitter
                    .emit(SdamEvent::TopologyClosed(TopologyClosedEvent {
                        topology_id: self.id,
                    }))
                    .await;
            }

            // Cleanup finished: unblock the `shutdown` caller, if any.
            if let Some(ack) = shutdown_ack {
                ack.acknowledge(true);
            }
        });
    }

    /// Publishes the current description and a weak view of the server set.
    fn publish_state(&self) {
        let servers = self
            .servers
            .iter()
            .map(|(k, v)| (k.clone(), Arc::downgrade(&v.inner)))
            .collect();
        self.publisher.publish_new_state(TopologyState {
            description: self.topology_description.clone(),
            servers,
        })
    }

    /// Advances the cached cluster time and republishes the state.
    fn advance_cluster_time(&mut self, to: ClusterTime) {
        self.topology_description.advance_cluster_time(&to);
        self.publish_state()
    }

    /// Reconciles the description's host list with `hosts`; returns whether
    /// the topology changed.
    async fn sync_hosts(&mut self, hosts: HashSet<ServerAddress>) -> bool {
        let mut new_description = self.topology_description.clone();
        new_description.sync_hosts(hosts);
        self.update_topology(new_description).await
    }

    /// Applies a single server's new description; returns whether the
    /// topology changed.
    async fn update_server(&mut self, sd: ServerDescription) -> bool {
        let mut new_description = self.topology_description.clone();
        let _ = new_description.update(sd);
        self.update_topology(new_description).await
    }

    /// Replaces the topology description, then diffs old vs. new to emit
    /// SDAM events, mark pools ready, and add/remove monitored servers.
    ///
    /// Always republishes the state; returns whether anything changed.
    async fn update_topology(&mut self, new_topology_description: TopologyDescription) -> bool {
        let old_description =
            std::mem::replace(&mut self.topology_description, new_topology_description);
        let diff = old_description.diff(&self.topology_description);
        let changed = diff.is_some();
        if let Some(diff) = diff {
            // In tests, iterate in sorted order so emitted events are
            // deterministic and reproducible.
            #[cfg(not(test))]
            let changed_servers = diff.changed_servers;
            #[cfg(test)]
            let changed_servers = {
                let mut servers = diff.changed_servers.into_iter().collect::<Vec<_>>();
                servers.sort_by_key(|(addr, _)| match addr {
                    ServerAddress::Tcp { host, port } => (host, port),
                    #[cfg(unix)]
                    ServerAddress::Unix { .. } => unreachable!(),
                });
                servers
            };
            for (address, (previous_description, new_description)) in changed_servers {
                // Unpause the server's pool once it becomes usable: any
                // data-bearing type, or any known type in a Single topology.
                if new_description.server_type.is_data_bearing()
                    || (new_description.server_type != ServerType::Unknown
                        && self.topology_description.topology_type() == TopologyType::Single)
                {
                    if let Some(s) = self.servers.get(address) {
                        s.pool.mark_as_ready().await;
                    }
                }
                self.emit_event(|| {
                    SdamEvent::ServerDescriptionChanged(Box::new(ServerDescriptionChangedEvent {
                        address: address.clone(),
                        topology_id: self.id,
                        previous_description: ServerInfo::new_owned(previous_description.clone()),
                        new_description: ServerInfo::new_owned(new_description.clone()),
                    }))
                });
            }

            // Sorted in tests for deterministic event ordering (as above).
            #[cfg(not(test))]
            let removed_addresses = diff.removed_addresses;
            #[cfg(test)]
            let removed_addresses = {
                let mut addresses = diff.removed_addresses.into_iter().collect::<Vec<_>>();
                addresses.sort_by_key(|addr| match addr {
                    ServerAddress::Tcp { host, port } => (host, port),
                    #[cfg(unix)]
                    ServerAddress::Unix { .. } => unreachable!(),
                });
                addresses
            };
            for address in removed_addresses {
                // Dropping the `MonitoredServer` releases the worker's strong
                // reference; its monitor shuts down via its handle.
                let removed_server = self.servers.remove(address);
                debug_assert!(
                    removed_server.is_some(),
                    "tried to remove non-existent address from topology: {}",
                    address
                );
                self.emit_event(|| {
                    SdamEvent::ServerClosed(ServerClosedEvent {
                        address: address.clone(),
                        topology_id: self.id,
                    })
                });
            }

            self.emit_event(|| {
                SdamEvent::TopologyDescriptionChanged(Box::new(TopologyDescriptionChangedEvent {
                    topology_id: self.id,
                    previous_description: old_description.clone().into(),
                    new_description: self.topology_description.clone().into(),
                }))
            });

            // Sorted in tests for deterministic event ordering (as above).
            #[cfg(not(test))]
            let added_addresses = diff.added_addresses;
            #[cfg(test)]
            let added_addresses = {
                let mut addresses = diff.added_addresses.into_iter().collect::<Vec<_>>();
                addresses.sort_by_key(|addr| match addr {
                    ServerAddress::Tcp { host, port } => (host, port),
                    #[cfg(unix)]
                    ServerAddress::Unix { .. } => unreachable!(),
                });
                addresses
            };
            for address in added_addresses {
                if self.servers.contains_key(address) {
                    // The diff should never report an already-known address.
                    debug_assert!(
                        false,
                        "adding address that already exists in topology: {}",
                        address
                    );
                    continue;
                }
                // Wire up the new server and its monitor: the monitor stops
                // when the manager's handle is dropped, and also listens for
                // topology-wide immediate-check requests.
                let (monitor_handle, listener) = WorkerHandleListener::channel();
                let monitor_manager = MonitorManager::new(monitor_handle);
                let monitor_request_receiver = MonitorRequestReceiver::new(
                    &monitor_manager,
                    self.topology_watcher.subscribe_to_topology_check_requests(),
                    listener,
                );
                let server = Server::new(
                    address.clone(),
                    self.options.clone(),
                    self.connection_establisher.clone(),
                    self.topology_updater.clone(),
                    self.id,
                );
                self.servers.insert(
                    address.clone(),
                    MonitoredServer {
                        inner: server,
                        monitor_manager,
                    },
                );
                if self.monitoring_enabled() {
                    Monitor::start(
                        address.clone(),
                        self.topology_updater.clone(),
                        self.topology_watcher.clone(),
                        self.event_emitter.clone(),
                        monitor_request_receiver,
                        self.options.clone(),
                        self.connection_establisher.clone(),
                    );
                }
                self.emit_event(|| {
                    SdamEvent::ServerOpening(ServerOpeningEvent {
                        address: address.clone(),
                        topology_id: self.id,
                    })
                });
            }
        }
        self.publish_state();
        changed
    }

    /// Marks the server at `address` as `Unknown` due to `error`; returns
    /// whether the topology changed.
    async fn mark_server_as_unknown(&mut self, address: ServerAddress, error: Error) -> bool {
        let description = ServerDescription::new_from_error(address, error);
        self.update_server(description).await
    }

    /// Handles an error from an application operation against `address`,
    /// taking the handshake phase into account.
    ///
    /// Stale errors are ignored: ones whose topologyVersion is not newer than
    /// the current description's, ones from a stale pool/connection
    /// generation, or ones for a server no longer in the topology.
    /// Returns whether the topology changed.
    pub(crate) async fn handle_application_error(
        &mut self,
        address: ServerAddress,
        error: Error,
        handshake: HandshakePhase,
    ) -> bool {
        // Incompatible-server errors never update the topology.
        if error.is_incompatible_server() {
            return false;
        }
        match self.server_description(&address) {
            Some(sd) => {
                // Ignore errors carrying an outdated topologyVersion.
                if let Some(existing_tv) = sd.topology_version() {
                    if let Some(tv) = error.topology_version() {
                        if !tv.is_more_recent_than(existing_tv) {
                            return false;
                        }
                    }
                }
            }
            None => return false,
        }
        let mut server = match self.server(&address) {
            Some(s) => s,
            None => return false,
        };
        // Generation staleness checks: discard errors that originated on a
        // connection/pool generation older than the server's current one.
        match &handshake {
            HandshakePhase::PreHello { generation } => {
                match (generation, server.pool.generation()) {
                    (PoolGeneration::Normal(hgen), PoolGeneration::Normal(sgen)) => {
                        if *hgen < sgen {
                            return false;
                        }
                    }
                    // Load-balanced pre-hello errors carry no service id, so
                    // staleness can't be determined; ignore them.
                    (PoolGeneration::LoadBalanced(_), PoolGeneration::LoadBalanced(_)) => {
                        return false
                    }
                    _ => load_balanced_mode_mismatch!(false),
                }
            }
            HandshakePhase::PostHello { generation }
            | HandshakePhase::AfterCompletion { generation, .. } => {
                if generation.is_stale(&server.pool.generation()) {
                    return false;
                }
            }
        }
        let is_load_balanced =
            self.topology_description.topology_type() == TopologyType::LoadBalanced;
        if error.is_state_change_error() {
            // In load-balanced mode there is no description to invalidate, so
            // the update is considered applied unconditionally.
            let updated =
                is_load_balanced || self.mark_server_as_unknown(address, error.clone()).await;
            // Clear the pool only for "shutting down" errors or pre-4.2
            // (wire version < 8) servers.
            if updated && (error.is_shutting_down() || handshake.wire_version().unwrap_or(0) < 8) {
                server.pool.clear(error, handshake.service_id()).await;
            }
            server.monitor_manager.request_immediate_check();
            updated
        } else if error.is_non_timeout_network_error()
            || (handshake.is_before_completion()
                && (error.is_auth_error()
                    || error.is_network_timeout()
                    || error.is_command_error()))
        {
            let updated = if is_load_balanced {
                // Only clear the (per-service) pool when a service id is known.
                handshake.service_id().is_some()
            } else {
                self.mark_server_as_unknown(server.address.clone(), error.clone())
                    .await
            };
            if updated {
                server
                    .pool
                    .clear(error.clone(), handshake.service_id())
                    .await;
                // Auth errors don't indicate the in-progress monitor check is
                // doomed, so leave it running in that case.
                if !error.is_auth_error() {
                    server.monitor_manager.cancel_in_progress_check(error);
                }
            }
            updated
        } else {
            false
        }
    }

    /// Handles an error reported by a server's monitor: marks the server
    /// unknown and clears its pool. Returns whether the topology changed.
    pub(crate) async fn handle_monitor_error(
        &mut self,
        address: ServerAddress,
        error: Error,
    ) -> bool {
        match self.server(&address) {
            Some(server) => {
                let updated = self.mark_server_as_unknown(address, error.clone()).await;
                if updated {
                    server.pool.clear(error, None).await;
                }
                updated
            }
            None => false,
        }
    }

    /// A clone of the monitored server at `address`, if it exists.
    fn server(&self, address: &ServerAddress) -> Option<MonitoredServer> {
        self.servers.get(address).cloned()
    }

    /// A clone of the current description for the server at `address`.
    fn server_description(&self, address: &ServerAddress) -> Option<ServerDescription> {
        self.topology_description
            .get_server_description(address)
            .cloned()
    }

    /// Emits an SDAM event if an emitter is configured; `make_event` is only
    /// invoked in that case.
    fn emit_event(&self, make_event: impl FnOnce() -> SdamEvent) {
        if let Some(ref emitter) = self.event_emitter {
            // The returned future is intentionally dropped: `emit` queues the
            // event synchronously, and the worker needn't wait for handlers.
            #[allow(clippy::let_underscore_future)]
            let _ = emitter.emit(make_event());
        }
    }

    /// Whether server monitors and SRV polling should run: disabled in
    /// load-balanced mode and, in tests, when explicitly turned off.
    fn monitoring_enabled(&self) -> bool {
        #[cfg(test)]
        {
            self.options
                .test_options
                .as_ref()
                .map(|to| to.disable_monitoring_threads)
                != Some(true)
                && self.options.load_balanced != Some(true)
        }
        #[cfg(not(test))]
        {
            self.options.load_balanced != Some(true)
        }
    }
}
/// Cloneable handle for sending update messages to the `TopologyWorker`.
#[derive(Debug, Clone)]
pub(crate) struct TopologyUpdater {
    // Each message carries an ack channel reporting whether it changed the topology.
    sender: mpsc::UnboundedSender<AcknowledgedMessage<UpdateMessage, bool>>,
}
impl TopologyUpdater {
pub(crate) fn channel() -> (TopologyUpdater, TopologyUpdateReceiver) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let updater = TopologyUpdater { sender: tx };
let update_receiver = TopologyUpdateReceiver {
update_receiver: rx,
};
(updater, update_receiver)
}
async fn send_message(&self, update: UpdateMessage) -> bool {
let (message, receiver) = AcknowledgedMessage::package(update);
match self.sender.send(message) {
Ok(_) => receiver.wait_for_acknowledgment().await.unwrap_or(false),
_ => false,
}
}
pub(crate) async fn handle_monitor_error(&self, address: ServerAddress, error: Error) -> bool {
self.send_message(UpdateMessage::MonitorError { address, error })
.await
}
pub(crate) async fn handle_application_error(
&self,
address: ServerAddress,
error: Error,
phase: HandshakePhase,
) -> bool {
self.send_message(UpdateMessage::ApplicationError {
address,
error,
phase,
})
.await
}
pub(crate) async fn update(&self, sd: ServerDescription) -> bool {
self.send_message(UpdateMessage::ServerUpdate(Box::new(sd)))
.await
}
pub(crate) async fn advance_cluster_time(&self, to: ClusterTime) {
self.send_message(UpdateMessage::AdvanceClusterTime(to))
.await;
}
pub(crate) async fn sync_hosts(&self, hosts: HashSet<ServerAddress>) {
self.send_message(UpdateMessage::SyncHosts(hosts)).await;
}
pub(crate) async fn shutdown(&self) {
self.send_message(UpdateMessage::Broadcast(BroadcastMessage::Shutdown))
.await;
}
pub(crate) async fn fill_pool(&self) {
self.send_message(UpdateMessage::Broadcast(BroadcastMessage::FillPool))
.await;
}
#[cfg(test)]
pub(crate) async fn sync_workers(&self) {
self.send_message(UpdateMessage::Broadcast(BroadcastMessage::SyncWorkers))
.await;
}
}
/// Receiving end of the topology update channel; owned by the worker.
pub(crate) struct TopologyUpdateReceiver {
    update_receiver: UnboundedReceiver<AcknowledgedMessage<UpdateMessage, bool>>,
}
impl TopologyUpdateReceiver {
    /// Receives the next update message, or `None` once every
    /// `TopologyUpdater` handle has been dropped.
    pub(crate) async fn recv(&mut self) -> Option<AcknowledgedMessage<UpdateMessage, bool>> {
        self.update_receiver.recv().await
    }
}
/// Cloneable handle for observing the most recently published topology state
/// and for requesting immediate server checks.
#[derive(Debug, Clone)]
pub(crate) struct TopologyWatcher {
    receiver: watch::Receiver<TopologyState>,
    // Counter of outstanding immediate-check requests; monitors subscribe to
    // it via `subscribe_to_topology_check_requests`.
    sender: Arc<watch::Sender<u32>>,
    // Whether *this* watcher currently has a check request outstanding, so
    // the request is retracted exactly once (see the `Drop` impl).
    requested_check: bool,
    // Test-only: signals when the worker finished `initialize`.
    #[cfg(test)]
    initialized_receiver: watch::Receiver<bool>,
}
impl TopologyWatcher {
    /// Creates a watcher/publisher pair seeded with `initial_state`.
    fn channel(initial_state: TopologyState) -> (TopologyWatcher, TopologyPublisher) {
        #[cfg(test)]
        let (initialized_sender, initialized_receiver) = watch::channel(false);
        let (tx, rx) = watch::channel(initial_state);
        let watcher = TopologyWatcher {
            receiver: rx,
            // Fresh counter channel shared (via Arc) by all watcher clones
            // for broadcasting immediate-check requests to monitors.
            sender: Arc::new(watch::channel(0).0),
            requested_check: false,
            #[cfg(test)]
            initialized_receiver,
        };
        let publisher = TopologyPublisher {
            state_sender: tx,
            #[cfg(test)]
            initialized_sender,
        };
        (watcher, publisher)
    }

    /// Whether the publisher (i.e. the topology worker) is still running.
    pub(crate) fn is_alive(&self) -> bool {
        self.receiver.has_changed().is_ok()
    }

    /// A clone of the latest description for the server at `address`, if any.
    pub(crate) fn server_description(&self, address: &ServerAddress) -> Option<ServerDescription> {
        self.receiver
            .borrow()
            .description
            .get_server_description(address)
            .cloned()
    }

    /// Clones the latest state and marks it as seen, so `wait_for_update`
    /// only wakes on subsequent changes.
    pub(crate) fn observe_latest(&mut self) -> TopologyState {
        self.receiver.borrow_and_update().clone()
    }

    /// Subscribes a monitor to topology-wide immediate-check requests.
    fn subscribe_to_topology_check_requests(&self) -> TopologyCheckRequestReceiver {
        TopologyCheckRequestReceiver {
            receiver: self.sender.subscribe(),
        }
    }

    /// Requests that all monitors check their servers immediately.
    /// Idempotent per watcher until the request is retracted.
    pub(crate) fn request_immediate_check(&mut self) {
        if self.requested_check {
            return;
        }
        self.requested_check = true;
        self.sender.send_modify(|counter| *counter += 1);
    }

    /// Waits for the next state publication, optionally bounded by `timeout`.
    ///
    /// Returns whether a new state was observed; a change also retracts this
    /// watcher's outstanding immediate-check request, if any.
    pub(crate) async fn wait_for_update(&mut self, timeout: impl Into<Option<Duration>>) -> bool {
        let changed = if let Some(timeout) = timeout.into() {
            matches!(
                runtime::timeout(timeout, self.receiver.changed()).await,
                Ok(Ok(()))
            )
        } else {
            self.receiver.changed().await.is_ok()
        };
        if changed {
            self.retract_immediate_check_request();
        }
        changed
    }

    /// Withdraws this watcher's outstanding immediate-check request (no-op if
    /// none is outstanding), decrementing the shared counter.
    fn retract_immediate_check_request(&mut self) {
        if self.requested_check {
            self.requested_check = false;
            self.sender.send_modify(|count| *count -= 1);
        }
    }

    /// Borrows the latest state without marking it as seen.
    ///
    /// NOTE(review): the returned `Ref` holds the watch channel's read lock —
    /// avoid holding it across `.await` points.
    pub(crate) fn peek_latest(&self) -> Ref<TopologyState> {
        self.receiver.borrow()
    }

    /// The topology type from the latest state.
    pub(crate) fn topology_type(&self) -> TopologyType {
        self.peek_latest().description.topology_type
    }

    /// Test-only: waits until the worker has finished initializing.
    #[cfg(test)]
    pub(crate) async fn wait_until_initialized(&mut self) {
        while !*self.initialized_receiver.borrow() {
            if self.initialized_receiver.changed().await.is_err() {
                return;
            }
        }
    }
}
// Ensure a dropped watcher does not leave a dangling immediate-check request,
// which would otherwise leave the shared request counter permanently nonzero.
impl Drop for TopologyWatcher {
    fn drop(&mut self) {
        self.retract_immediate_check_request();
    }
}
/// Write side of the topology state channel; owned exclusively by the worker.
/// Dropping it makes all watchers observe the topology as no longer alive.
struct TopologyPublisher {
    state_sender: watch::Sender<TopologyState>,
    // Test-only: set to `true` once the worker finishes `initialize`.
    #[cfg(test)]
    initialized_sender: watch::Sender<bool>,
}
impl TopologyPublisher {
    /// Publishes a new state snapshot. Send errors (no watchers remaining)
    /// are deliberately ignored.
    fn publish_new_state(&self, state: TopologyState) {
        let _ = self.state_sender.send(state);
    }
}
/// Cloneable handle for queuing SDAM events to the consumer task spawned in
/// `Topology::new`.
#[derive(Clone)]
pub(crate) struct SdamEventEmitter {
    sender: UnboundedSender<AcknowledgedMessage<SdamEvent>>,
}
impl SdamEventEmitter {
    /// Queues `event` for delivery to the SDAM event consumer task.
    ///
    /// The event is sent synchronously before returning; the returned future
    /// resolves once the consumer acknowledges it, and may be dropped by
    /// callers that don't need to wait for delivery.
    pub(crate) fn emit(&self, event: impl Into<SdamEvent>) -> impl Future<Output = ()> {
        let (msg, ack) = AcknowledgedMessage::package(event.into());
        // Ignore send errors: if the consumer task has shut down, the ack
        // future below simply completes without an acknowledgment.
        let _ = self.sender.send(msg);
        ack.wait_for_acknowledgment().map(|_| ())
    }
}
/// The point in the connection handshake at which an application error
/// occurred; used to judge whether the error is stale relative to the
/// pool/connection generation and whether the wire version is known.
#[derive(Debug, Clone)]
pub(crate) enum HandshakePhase {
    /// Before the hello: only the pool generation at checkout is available.
    PreHello { generation: PoolGeneration },
    /// After the hello response but before the handshake completed.
    PostHello { generation: ConnectionGeneration },
    /// After the handshake completed successfully.
    AfterCompletion {
        generation: ConnectionGeneration,
        // Negotiated max wire version (0 if unavailable).
        max_wire_version: i32,
    },
}
impl HandshakePhase {
pub(crate) fn after_completion(handshaked_connection: &Connection) -> Self {
Self::AfterCompletion {
generation: handshaked_connection.generation,
max_wire_version: handshaked_connection
.stream_description()
.ok()
.and_then(|sd| sd.max_wire_version)
.unwrap_or(0),
}
}
pub(crate) fn service_id(&self) -> Option<ObjectId> {
match self {
HandshakePhase::PreHello { .. } => None,
HandshakePhase::PostHello { generation, .. } => generation.service_id(),
HandshakePhase::AfterCompletion { generation, .. } => generation.service_id(),
}
}
fn is_before_completion(&self) -> bool {
!matches!(self, HandshakePhase::AfterCompletion { .. })
}
fn wire_version(&self) -> Option<i32> {
match self {
HandshakePhase::AfterCompletion {
max_wire_version, ..
} => Some(*max_wire_version),
_ => None,
}
}
}
/// Per-monitor subscription to the shared immediate-check request counter
/// maintained by `TopologyWatcher`.
#[derive(Debug, Clone)]
pub(crate) struct TopologyCheckRequestReceiver {
    // Nonzero value means at least one immediate check is requested.
    receiver: watch::Receiver<u32>,
}
impl TopologyCheckRequestReceiver {
    /// Waits until at least one immediate-check request is outstanding
    /// (counter nonzero). Also returns if the sender side is dropped,
    /// since no further requests can arrive in that case.
    pub(crate) async fn wait_for_check_request(&mut self) {
        loop {
            if *self.receiver.borrow() != 0 {
                return;
            }
            if self.receiver.changed().await.is_err() {
                return;
            }
        }
    }
}
/// A server paired with the manager for its background monitor; held by the
/// worker as the server's strong (keep-alive) reference.
#[derive(Debug, Clone)]
struct MonitoredServer {
    inner: Arc<Server>,
    // Used to request/cancel checks and to close the monitor on removal.
    monitor_manager: MonitorManager,
}
// Allow a `MonitoredServer` to be used anywhere a `&Server` is expected
// (e.g. `server.pool`, `server.address`).
impl std::ops::Deref for MonitoredServer {
    type Target = Server;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}