mod command;
mod stream_description;
mod wire;
use std::{
sync::{Arc, Weak},
time::{Duration, Instant},
};
use derivative::Derivative;
use self::wire::Message;
use super::ConnectionPoolInner;
use crate::{
cmap::options::{ConnectionOptions, StreamOptions},
error::{ErrorKind, Result},
event::cmap::{
CmapEventHandler,
ConnectionCheckedInEvent,
ConnectionCheckedOutEvent,
ConnectionClosedEvent,
ConnectionClosedReason,
ConnectionCreatedEvent,
ConnectionReadyEvent,
},
options::{StreamAddress, TlsOptions},
runtime::AsyncStream,
RUNTIME,
};
pub(crate) use command::{Command, CommandResponse};
pub(crate) use stream_description::StreamDescription;
pub(crate) use wire::next_request_id;
/// Identifying information about a connection: its id and the address of the
/// stream it wraps. Built from a `Connection` by `Connection::info`.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
/// The id the connection was created with.
pub id: u32,
/// The address the connection's stream was opened against.
pub address: StreamAddress,
}
/// A stream together with the bookkeeping needed to manage it: identity,
/// generation, checkout state and an optional event handler.
#[derive(Derivative)]
#[derivative(Debug)]
pub(crate) struct Connection {
/// Id supplied to `Connection::new` (`new_monitoring` always passes 0).
pub(super) id: u32,
/// Address the underlying stream was connected to.
pub(super) address: StreamAddress,
/// Generation supplied at creation; `is_stale` compares it against the
/// caller-provided current generation.
pub(super) generation: u32,
/// `None` on a freshly created connection; `stream_description()` reports an
/// error while it is unset. Presumably filled in by the handshake — confirm
/// against the code that assigns it.
pub(super) stream_description: Option<StreamDescription>,
/// Set to the current instant by `mark_checked_in` and cleared by
/// `mark_checked_out`; `is_idle` measures idle time from it.
ready_and_available_time: Option<Instant>,
/// Weak handle to the owning pool, present only between `mark_checked_out`
/// and check-in/close. `Drop` uses it to hand the connection back.
pub(super) pool: Option<Weak<ConnectionPoolInner>>,
/// Set by `send_command` before writing and cleared only after the response
/// has been read successfully.
command_executing: bool,
/// The underlying stream that messages are written to and read from.
stream: AsyncStream,
/// Optional handler notified of connection-closed events; omitted from the
/// `Debug` output.
#[derivative(Debug = "ignore")]
handler: Option<Arc<dyn CmapEventHandler>>,
}
impl Connection {
pub(crate) async fn new(
id: u32,
address: StreamAddress,
generation: u32,
options: Option<ConnectionOptions>,
) -> Result<Self> {
let stream_options = StreamOptions {
address: address.clone(),
connect_timeout: options.as_ref().and_then(|opts| opts.connect_timeout),
tls_options: options.as_ref().and_then(|opts| opts.tls_options.clone()),
};
let conn = Self {
id,
generation,
pool: None,
command_executing: false,
ready_and_available_time: None,
stream: AsyncStream::connect(stream_options).await?,
address,
handler: options.and_then(|options| options.event_handler),
stream_description: None,
};
Ok(conn)
}
/// Creates a connection with id 0, generation 0 and no event handler by
/// delegating to `Connection::new`.
///
/// Presumably intended for monitoring connections that live outside a pool —
/// confirm against callers.
///
/// # Errors
///
/// Propagates any error returned by `Connection::new`.
pub(crate) async fn new_monitoring(
    address: StreamAddress,
    connect_timeout: Option<Duration>,
    tls_options: Option<TlsOptions>,
) -> Result<Self> {
    let options = ConnectionOptions {
        connect_timeout,
        tls_options,
        event_handler: None,
    };
    Self::new(0, address, 0, Some(options)).await
}
/// Returns a `ConnectionInfo` holding this connection's id and a clone of
/// its address.
pub(crate) fn info(&self) -> ConnectionInfo {
    let (id, address) = (self.id, self.address.clone());
    ConnectionInfo { id, address }
}
pub(crate) fn address(&self) -> &StreamAddress {
&self.address
}
/// Records that the connection has been checked in: drops the pool
/// reference and stamps the current instant as the start of its idle time.
pub(super) fn mark_checked_in(&mut self) {
    self.pool = None;
    self.ready_and_available_time = Some(Instant::now());
}
/// Records that the connection has been checked out: clears the idle
/// timestamp and stores a weak reference to `pool`, which `Drop` later uses
/// to return the connection.
pub(super) fn mark_checked_out(&mut self, pool: Weak<ConnectionPoolInner>) {
    self.ready_and_available_time = None;
    self.pool = Some(pool);
}
/// Reports whether the connection has been available for at least
/// `max_idle_time`.
///
/// Returns `false` when `max_idle_time` is `None` (no idle limit) or when the
/// connection has no recorded ready time (it has not been checked in, or is
/// currently checked out).
pub(super) fn is_idle(&self, max_idle_time: Option<Duration>) -> bool {
    match (self.ready_and_available_time, max_idle_time) {
        (Some(ready_at), Some(max_idle_time)) => {
            // `saturating_duration_since` yields zero instead of panicking if
            // the clock ever reports `ready_at` as later than now;
            // `duration_since` panicked in that case on older toolchains.
            Instant::now().saturating_duration_since(ready_at) >= max_idle_time
        }
        _ => false,
    }
}
/// Reports whether this connection's generation differs from
/// `current_generation`.
pub(super) fn is_stale(&self, current_generation: u32) -> bool {
    current_generation != self.generation
}
pub(super) fn is_executing(&self) -> bool {
self.command_executing
}
/// Builds a `ConnectionCheckedOutEvent` carrying this connection's address
/// and id.
pub(super) fn checked_out_event(&self) -> ConnectionCheckedOutEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionCheckedOutEvent {
        address,
        connection_id,
    }
}
/// Builds a `ConnectionCheckedInEvent` carrying this connection's address
/// and id.
pub(super) fn checked_in_event(&self) -> ConnectionCheckedInEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionCheckedInEvent {
        address,
        connection_id,
    }
}
/// Builds a `ConnectionReadyEvent` carrying this connection's address and id.
pub(super) fn ready_event(&self) -> ConnectionReadyEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionReadyEvent {
        address,
        connection_id,
    }
}
/// Builds a `ConnectionCreatedEvent` carrying this connection's address and
/// id.
pub(super) fn created_event(&self) -> ConnectionCreatedEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionCreatedEvent {
        address,
        connection_id,
    }
}
/// Builds a `ConnectionClosedEvent` carrying this connection's address and
/// id together with the given close `reason`.
pub(super) fn closed_event(&self, reason: ConnectionClosedReason) -> ConnectionClosedEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionClosedEvent {
        address,
        connection_id,
        reason,
    }
}
/// Writes `command` to this connection's stream as a `Message`, then reads
/// one `Message` back and wraps it in a `CommandResponse`.
///
/// `request_id` is forwarded to `Message::with_command`; passing `None`
/// presumably lets the message layer pick an id — confirm in `wire`.
///
/// # Errors
///
/// Returns any error from writing the message, reading the reply, or
/// `CommandResponse::new`. If writing or reading fails, `command_executing`
/// is left set to `true`, so `is_executing` keeps reporting `true` for this
/// connection afterwards.
pub(crate) async fn send_command(
&mut self,
command: Command,
request_id: impl Into<Option<i32>>,
) -> Result<CommandResponse> {
let message = Message::with_command(command, request_id.into());
// Raise the flag before touching the stream; it is lowered only once a
// full response has been read, so an early return via `?` leaves it set.
self.command_executing = true;
message.write_to(&mut self.stream).await?;
let response_message = Message::read_from(&mut self.stream).await?;
self.command_executing = false;
CommandResponse::new(self.address.clone(), response_message)
}
/// Borrows the connection's stream description.
///
/// # Errors
///
/// Returns an `ErrorKind::OperationError` if no stream description has been
/// set on this connection.
pub(crate) fn stream_description(&self) -> Result<&StreamDescription> {
    match self.stream_description {
        Some(ref description) => Ok(description),
        None => Err(ErrorKind::OperationError {
            message: "Stream checked out but not handshaked".to_string(),
        }
        .into()),
    }
}
/// Closes the connection for `reason` (emitting the closed event if a
/// handler is set) and then drops it.
///
/// Because `close` clears the pool reference first, the `Drop` impl will not
/// try to check the connection back in.
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
    self.close(reason);
    drop(self);
}
fn close(&mut self, reason: ConnectionClosedReason) {
self.pool.take();
if let Some(ref handler) = self.handler {
handler.handle_connection_closed_event(self.closed_event(reason));
}
}
/// Moves this connection's state into a `DroppedConnectionState`.
///
/// The pool reference is cleared, the stream is swapped for
/// `AsyncStream::Null`, and the handler and stream description are taken,
/// leaving `self` as an empty shell.
fn take(&mut self) -> DroppedConnectionState {
    self.pool = None;
    let stream = std::mem::replace(&mut self.stream, AsyncStream::Null);
    DroppedConnectionState {
        id: self.id,
        address: self.address.clone(),
        generation: self.generation,
        stream,
        handler: self.handler.take(),
        stream_description: self.stream_description.take(),
        command_executing: self.command_executing,
    }
}
}
impl Drop for Connection {
    /// Returns a checked-out connection to its pool when it is dropped.
    ///
    /// * No pool reference: nothing happens.
    /// * Pool reference that can still be upgraded: the connection's state
    ///   is moved out and checked back into the pool from a task spawned on
    ///   `RUNTIME`.
    /// * Pool reference that can no longer be upgraded: the connection is
    ///   closed with `ConnectionClosedReason::PoolClosed`.
    fn drop(&mut self) {
        let upgraded = match self.pool {
            Some(ref weak_pool) => weak_pool.upgrade(),
            None => return,
        };

        match upgraded {
            Some(pool) => {
                let state = self.take();
                RUNTIME.execute(async move {
                    pool.check_in(state.into()).await;
                });
            }
            None => self.close(ConnectionClosedReason::PoolClosed),
        }
    }
}
/// The state moved out of a `Connection` by `Connection::take` when a
/// checked-out connection is dropped, so it can be sent to a check-in task
/// and turned back into a `Connection` via `From`.
///
/// If a value of this type is itself dropped while it still holds a handler,
/// its `Drop` impl emits a connection-closed event with reason `PoolClosed`.
#[derive(Derivative)]
#[derivative(Debug)]
struct DroppedConnectionState {
/// Id of the originating connection.
pub(super) id: u32,
/// Address of the originating connection.
pub(super) address: StreamAddress,
/// Generation of the originating connection.
pub(super) generation: u32,
/// The stream taken from the originating connection.
stream: AsyncStream,
/// Event handler taken from the originating connection; omitted from the
/// `Debug` output.
#[derivative(Debug = "ignore")]
handler: Option<Arc<dyn CmapEventHandler>>,
/// Stream description taken from the originating connection, if any.
stream_description: Option<StreamDescription>,
/// Copy of the originating connection's `command_executing` flag.
command_executing: bool,
}
impl Drop for DroppedConnectionState {
fn drop(&mut self) {
if let Some(ref handler) = self.handler {
handler.handle_connection_closed_event(ConnectionClosedEvent {
address: self.address.clone(),
connection_id: self.id,
reason: ConnectionClosedReason::PoolClosed,
});
}
}
}
impl From<DroppedConnectionState> for Connection {
    /// Rebuilds a `Connection` from state previously moved out of one.
    ///
    /// The stream, handler and stream description are moved out of `state`
    /// (the stream is replaced with `AsyncStream::Null`). Since the handler is
    /// taken, dropping `state` at the end of this function emits no closed
    /// event. The resulting connection has no pool reference and no ready
    /// time.
    fn from(mut state: DroppedConnectionState) -> Self {
        // `DroppedConnectionState` implements `Drop`, so its fields cannot be
        // moved out directly; swap or take them instead.
        let stream = std::mem::replace(&mut state.stream, AsyncStream::Null);
        let handler = state.handler.take();
        let stream_description = state.stream_description.take();

        Self {
            id: state.id,
            address: state.address.clone(),
            generation: state.generation,
            stream_description,
            ready_and_available_time: None,
            pool: None,
            command_executing: state.command_executing,
            stream,
            handler,
        }
    }
}