use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use super::WeakTopology;
use crate::{
cmap::{options::ConnectionPoolOptions, ConnectionPool, EstablishError},
options::{ClientOptions, ServerAddress},
runtime::{AcknowledgedMessage, HttpClient},
sdam::monitor::Monitor,
};
/// State kept for a single server: its address, the connection pool used to
/// reach it, and a counter that callers bump up and down around operations.
#[derive(Debug)]
pub(crate) struct Server {
/// Address of this server; the same address is handed to the pool on construction.
pub(crate) address: ServerAddress,
/// Connection pool created for `address`.
pub(crate) pool: ConnectionPool,
/// Counter adjusted by `increment_operation_count` / `decrement_operation_count`.
/// NOTE(review): presumably the number of operations currently in flight against
/// this server — confirm against the call sites.
operation_count: AtomicU32,
}
impl Server {
    /// Builds a `Server` around a mocked connection pool, with the operation
    /// counter preset to `operation_count`. Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn new_mocked(address: ServerAddress, operation_count: u32) -> Self {
        Self {
            address: address.clone(),
            pool: ConnectionPool::new_mocked(address),
            operation_count: AtomicU32::new(operation_count),
        }
    }

    /// Creates a shared `Server` for `address` together with the `Monitor`
    /// built for it.
    ///
    /// A fresh update channel is opened: the sending half is given to the
    /// connection pool and the receiving half to the monitor. The pool options
    /// are derived from `options`, and the operation counter starts at zero.
    ///
    /// Returns the server (wrapped in an `Arc`) and its monitor.
    pub(crate) fn create(
        address: ServerAddress,
        options: &ClientOptions,
        topology: WeakTopology,
        http_client: HttpClient,
    ) -> (Arc<Self>, Monitor) {
        let (update_sender, update_receiver) = ServerUpdateSender::channel();
        let server = Arc::new(Self {
            pool: ConnectionPool::new(
                address.clone(),
                http_client,
                update_sender,
                Some(ConnectionPoolOptions::from_client_options(options)),
            ),
            address: address.clone(),
            operation_count: AtomicU32::new(0),
        });
        let monitor = Monitor::new(address, &server, topology, options.clone(), update_receiver);
        (server, monitor)
    }

    /// Atomically adds one to the operation counter.
    pub(crate) fn increment_operation_count(&self) {
        self.operation_count.fetch_add(1, Ordering::SeqCst);
    }

    /// Atomically subtracts one from the operation counter, stopping at zero.
    pub(crate) fn decrement_operation_count(&self) {
        // A plain `fetch_sub` wraps around on underflow, so one unmatched
        // decrement would make the counter read `u32::MAX`. `checked_sub`
        // yields `None` at zero, which makes `fetch_update` leave the value
        // untouched and return `Err`; that outcome is deliberately ignored.
        let _: std::result::Result<u32, u32> = self.operation_count.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            |count| count.checked_sub(1),
        );
    }

    /// Returns the current value of the operation counter.
    pub(crate) fn operation_count(&self) -> u32 {
        self.operation_count.load(Ordering::SeqCst)
    }
}
/// Payload carried over the server update channel (see `ServerUpdateSender`
/// and `ServerUpdateReceiver`).
#[derive(Debug)]
pub(crate) enum ServerUpdate {
/// Reports an `EstablishError`; built by `ServerUpdateSender::handle_error`.
Error { error: EstablishError },
}
/// Receiving half of the server update channel created by
/// `ServerUpdateSender::channel`.
#[derive(Debug)]
pub(crate) struct ServerUpdateReceiver {
/// Underlying tokio mpsc receiver; each item is a `ServerUpdate` wrapped in an
/// `AcknowledgedMessage`.
receiver: tokio::sync::mpsc::Receiver<AcknowledgedMessage<ServerUpdate>>,
}
impl ServerUpdateReceiver {
    /// Waits for the next update on the channel.
    ///
    /// Forwards whatever the underlying tokio receiver yields: `Some(message)`
    /// for a delivered update, or `None` when the receiver reports no more
    /// messages.
    pub(crate) async fn recv(&mut self) -> Option<AcknowledgedMessage<ServerUpdate>> {
        let next_update = self.receiver.recv();
        next_update.await
    }
}
/// Sending half of the server update channel. It is `Clone`, so several
/// handles can feed the same receiver.
#[derive(Clone, Debug)]
pub(crate) struct ServerUpdateSender {
/// Underlying tokio mpsc sender; each item is a `ServerUpdate` wrapped in an
/// `AcknowledgedMessage`.
sender: tokio::sync::mpsc::Sender<AcknowledgedMessage<ServerUpdate>>,
}
impl ServerUpdateSender {
pub(crate) fn channel() -> (Self, ServerUpdateReceiver) {
let (sender, receiver) = tokio::sync::mpsc::channel(1);
(
ServerUpdateSender { sender },
ServerUpdateReceiver { receiver },
)
}
pub(crate) async fn handle_error(&mut self, error: EstablishError) {
let reason = ServerUpdate::Error { error };
let (message, callback) = AcknowledgedMessage::package(reason);
let _: std::result::Result<_, _> = self.sender.send(message).await;
callback.wait_for_acknowledgment().await;
}
}