mod command;
mod stream_description;
mod wire;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use derivative::Derivative;
use self::wire::Message;
use super::manager::PoolManager;
use crate::{
bson::oid::ObjectId,
cmap::{
options::{ConnectionOptions, StreamOptions},
PoolGeneration,
},
error::{load_balanced_mode_mismatch, ErrorKind, Result},
event::cmap::{
CmapEventHandler,
ConnectionCheckedInEvent,
ConnectionCheckedOutEvent,
ConnectionClosedEvent,
ConnectionClosedReason,
ConnectionCreatedEvent,
ConnectionReadyEvent,
},
options::{ServerAddress, TlsOptions},
runtime::AsyncStream,
};
pub(crate) use command::{Command, RawCommand, RawCommandResponse};
pub(crate) use stream_description::StreamDescription;
pub(crate) use wire::next_request_id;
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
pub id: u32,
pub address: ServerAddress,
}
#[derive(Derivative)]
#[derivative(Debug)]
pub(crate) struct Connection {
pub(super) id: u32,
pub(crate) address: ServerAddress,
pub(crate) generation: ConnectionGeneration,
pub(super) stream_description: Option<StreamDescription>,
ready_and_available_time: Option<Instant>,
pub(super) pool_manager: Option<PoolManager>,
command_executing: bool,
error: bool,
stream: AsyncStream,
#[derivative(Debug = "ignore")]
handler: Option<Arc<dyn CmapEventHandler>>,
}
impl Connection {
async fn new(
id: u32,
address: ServerAddress,
generation: u32,
options: Option<ConnectionOptions>,
) -> Result<Self> {
let stream_options = StreamOptions {
address: address.clone(),
connect_timeout: options.as_ref().and_then(|opts| opts.connect_timeout),
tls_options: options.as_ref().and_then(|opts| opts.tls_options.clone()),
};
let conn = Self {
id,
generation: ConnectionGeneration::Normal(generation),
pool_manager: None,
command_executing: false,
ready_and_available_time: None,
stream: AsyncStream::connect(stream_options).await?,
address,
handler: options.and_then(|options| options.event_handler),
stream_description: None,
error: false,
};
Ok(conn)
}
pub(super) async fn connect(pending_connection: PendingConnection) -> Result<Self> {
let generation = match pending_connection.generation {
PoolGeneration::Normal(gen) => gen,
PoolGeneration::LoadBalanced(_) => 0,
};
Self::new(
pending_connection.id,
pending_connection.address.clone(),
generation,
pending_connection.options,
)
.await
}
pub(crate) async fn connect_monitoring(
address: ServerAddress,
connect_timeout: Option<Duration>,
tls_options: Option<TlsOptions>,
) -> Result<Self> {
Self::new(
0,
address,
0,
Some(ConnectionOptions {
connect_timeout,
tls_options,
event_handler: None,
}),
)
.await
}
#[cfg(test)]
pub(crate) async fn new_testing(
id: u32,
address: ServerAddress,
generation: u32,
options: Option<ConnectionOptions>,
) -> Result<Self> {
Self::new(id, address, generation, options).await
}
pub(crate) fn info(&self) -> ConnectionInfo {
ConnectionInfo {
id: self.id,
address: self.address.clone(),
}
}
pub(crate) fn address(&self) -> &ServerAddress {
&self.address
}
pub(super) fn mark_as_available(&mut self) {
self.pool_manager.take();
self.ready_and_available_time = Some(Instant::now());
}
pub(super) fn mark_as_in_use(&mut self, manager: PoolManager) {
self.pool_manager = Some(manager);
self.ready_and_available_time.take();
}
pub(super) fn is_idle(&self, max_idle_time: Option<Duration>) -> bool {
self.ready_and_available_time
.and_then(|ready_and_available_time| {
max_idle_time.map(|max_idle_time| {
Instant::now().duration_since(ready_and_available_time) >= max_idle_time
})
})
.unwrap_or(false)
}
pub(super) fn is_executing(&self) -> bool {
self.command_executing
}
pub(super) fn has_errored(&self) -> bool {
self.error
}
pub(super) fn checked_out_event(&self) -> ConnectionCheckedOutEvent {
ConnectionCheckedOutEvent {
address: self.address.clone(),
connection_id: self.id,
}
}
pub(super) fn checked_in_event(&self) -> ConnectionCheckedInEvent {
ConnectionCheckedInEvent {
address: self.address.clone(),
connection_id: self.id,
}
}
pub(super) fn ready_event(&self) -> ConnectionReadyEvent {
ConnectionReadyEvent {
address: self.address.clone(),
connection_id: self.id,
}
}
pub(super) fn closed_event(&self, reason: ConnectionClosedReason) -> ConnectionClosedEvent {
ConnectionClosedEvent {
address: self.address.clone(),
connection_id: self.id,
reason,
}
}
async fn send_message(&mut self, message: Message) -> Result<RawCommandResponse> {
self.command_executing = true;
let write_result = message.write_to(&mut self.stream).await;
self.error = write_result.is_err();
write_result?;
let response_message_result = Message::read_from(&mut self.stream).await;
self.command_executing = false;
self.error = response_message_result.is_err();
RawCommandResponse::new(self.address.clone(), response_message_result?)
}
pub(crate) async fn send_command(
&mut self,
command: Command,
request_id: impl Into<Option<i32>>,
) -> Result<RawCommandResponse> {
let message = Message::with_command(command, request_id.into())?;
self.send_message(message).await
}
pub(crate) async fn send_raw_command(
&mut self,
command: RawCommand,
request_id: impl Into<Option<i32>>,
) -> Result<RawCommandResponse> {
let message = Message::with_raw_command(command, request_id.into());
self.send_message(message).await
}
pub(crate) fn stream_description(&self) -> Result<&StreamDescription> {
self.stream_description.as_ref().ok_or_else(|| {
ErrorKind::Internal {
message: "Stream checked out but not handshaked".to_string(),
}
.into()
})
}
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
self.close(reason);
}
fn close(&mut self, reason: ConnectionClosedReason) {
self.pool_manager.take();
if let Some(ref handler) = self.handler {
handler.handle_connection_closed_event(self.closed_event(reason));
}
}
fn take(&mut self) -> Connection {
Connection {
id: self.id,
address: self.address.clone(),
generation: self.generation.clone(),
stream: std::mem::replace(&mut self.stream, AsyncStream::Null),
handler: self.handler.take(),
stream_description: self.stream_description.take(),
command_executing: self.command_executing,
error: self.error,
pool_manager: None,
ready_and_available_time: None,
}
}
}
impl Drop for Connection {
fn drop(&mut self) {
if let Some(pool_manager) = self.pool_manager.take() {
let dropped_connection = self.take();
if let Err(mut conn) = pool_manager.check_in(dropped_connection) {
conn.close(ConnectionClosedReason::PoolClosed);
}
}
}
}
#[derive(Debug, Clone)]
pub(crate) enum ConnectionGeneration {
Normal(u32),
LoadBalanced {
generation: u32,
service_id: ObjectId,
},
}
impl ConnectionGeneration {
pub(crate) fn service_id(&self) -> Option<ObjectId> {
match self {
ConnectionGeneration::Normal(_) => None,
ConnectionGeneration::LoadBalanced { service_id, .. } => Some(*service_id),
}
}
pub(crate) fn is_stale(&self, current_generation: &PoolGeneration) -> bool {
match (self, current_generation) {
(ConnectionGeneration::Normal(cgen), PoolGeneration::Normal(pgen)) => cgen != pgen,
(
ConnectionGeneration::LoadBalanced {
generation: cgen,
service_id,
},
PoolGeneration::LoadBalanced(gen_map),
) => cgen != gen_map.get(service_id).unwrap_or(&0),
_ => load_balanced_mode_mismatch!(false),
}
}
}
#[derive(Debug)]
pub(super) struct PendingConnection {
pub(super) id: u32,
pub(super) address: ServerAddress,
pub(super) generation: PoolGeneration,
pub(super) options: Option<ConnectionOptions>,
}
impl PendingConnection {
pub(super) fn created_event(&self) -> ConnectionCreatedEvent {
ConnectionCreatedEvent {
address: self.address.clone(),
connection_id: self.id,
}
}
}