mod command;
mod stream_description;
mod wire;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use derivative::Derivative;
use self::wire::Message;
use super::manager::PoolManager;
use crate::{
cmap::options::{ConnectionOptions, StreamOptions},
error::{ErrorKind, Result},
event::cmap::{
CmapEventHandler,
ConnectionCheckedInEvent,
ConnectionCheckedOutEvent,
ConnectionClosedEvent,
ConnectionClosedReason,
ConnectionCreatedEvent,
ConnectionReadyEvent,
},
options::{StreamAddress, TlsOptions},
runtime::AsyncStream,
};
pub(crate) use command::{Command, CommandResponse};
pub(crate) use stream_description::StreamDescription;
pub(crate) use wire::next_request_id;
/// A small, cloneable summary of a connection: its identifier and the address it belongs to.
///
/// Built from a `Connection` by `Connection::info`.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
/// Identifier of the connection; the same value is reported as `connection_id` in the
/// connection events built in this module.
pub id: u32,
/// Address the connection's stream was opened against.
pub address: StreamAddress,
}
/// A single connection: the underlying stream plus the bookkeeping used while it is handed
/// out and returned (pool manager, generation, idle timestamp and status flags).
#[derive(Derivative)]
#[derivative(Debug)]
pub(crate) struct Connection {
/// Identifier of this connection; reported as `connection_id` in emitted events.
pub(super) id: u32,
/// Address the stream was opened against.
pub(super) address: StreamAddress,
/// Generation this connection was created with; `is_stale` compares it against the
/// current generation supplied by the caller.
pub(super) generation: u32,
/// `None` on a freshly created connection. Not populated in this part of the file;
/// presumably filled in by the handshake (see the error text in `stream_description()`).
pub(super) stream_description: Option<StreamDescription>,
/// Set to "now" by `mark_as_available` and cleared by `mark_as_in_use`; `is_idle`
/// measures idleness from this instant.
ready_and_available_time: Option<Instant>,
/// Present while the connection is marked as in use. On drop, a connection that still
/// holds a manager is handed back through `PoolManager::check_in`.
pub(super) pool_manager: Option<PoolManager>,
/// Set when `send_command` starts and cleared only after the response has been read.
command_executing: bool,
/// Whether the most recent write or read performed by `send_command` failed.
error: bool,
/// The stream commands are written to and responses are read from.
stream: AsyncStream,
/// Optional event handler, notified when the connection is closed. Skipped in `Debug` output.
#[derivative(Debug = "ignore")]
handler: Option<Arc<dyn CmapEventHandler>>,
}
impl Connection {
/// Opens a stream to `address` and wraps it in a fresh connection.
///
/// `options` supplies the connect timeout, the TLS settings and the event handler; when it is
/// `None`, all three are `None`. The returned connection has no stream description, no pool
/// manager, no availability timestamp, and both status flags cleared.
///
/// # Errors
///
/// Propagates the error returned by `AsyncStream::connect` when the stream cannot be opened.
async fn new(
    id: u32,
    address: StreamAddress,
    generation: u32,
    options: Option<ConnectionOptions>,
) -> Result<Self> {
    // `options` is owned, so its parts are moved out by value rather than cloning the TLS
    // configuration out of a borrow.
    let (connect_timeout, tls_options, handler) = match options {
        Some(opts) => (opts.connect_timeout, opts.tls_options, opts.event_handler),
        None => (None, None, None),
    };

    let stream_options = StreamOptions {
        // Cloned because the connection keeps its own copy of the address below.
        address: address.clone(),
        connect_timeout,
        tls_options,
    };
    let stream = AsyncStream::connect(stream_options).await?;

    Ok(Self {
        id,
        address,
        generation,
        stream_description: None,
        ready_and_available_time: None,
        pool_manager: None,
        command_executing: false,
        error: false,
        stream,
        handler,
    })
}
pub(super) async fn connect(pending_connection: PendingConnection) -> Result<Self> {
Self::new(
pending_connection.id,
pending_connection.address.clone(),
pending_connection.generation,
pending_connection.options,
)
.await
}
/// Opens a connection with id `0`, generation `0` and no event handler, using the given
/// connect timeout and TLS settings.
///
/// Judging by the name this is intended for monitoring connections — confirm against callers.
///
/// # Errors
///
/// Propagates any error produced while opening the underlying stream.
pub(crate) async fn connect_monitoring(
    address: StreamAddress,
    connect_timeout: Option<Duration>,
    tls_options: Option<TlsOptions>,
) -> Result<Self> {
    let monitoring_options = ConnectionOptions {
        connect_timeout,
        tls_options,
        event_handler: None,
    };

    Self::new(0, address, 0, Some(monitoring_options)).await
}
/// Test-only entry point that exposes the private constructor to the rest of the crate.
///
/// # Errors
///
/// Propagates any error produced while opening the underlying stream.
#[cfg(test)]
pub(crate) async fn new_testing(
    id: u32,
    address: StreamAddress,
    generation: u32,
    options: Option<ConnectionOptions>,
) -> Result<Self> {
    let connection = Self::new(id, address, generation, options).await?;
    Ok(connection)
}
/// Returns an owned summary (id and a clone of the address) of this connection.
pub(crate) fn info(&self) -> ConnectionInfo {
    let id = self.id;
    let address = self.address.clone();
    ConnectionInfo { id, address }
}
pub(crate) fn address(&self) -> &StreamAddress {
&self.address
}
/// Detaches the connection from its pool manager and records the current instant as the
/// moment it became available, which is the reference point used by `is_idle`.
pub(super) fn mark_as_available(&mut self) {
    // Dropping the manager first means a later drop of this connection will not check it in.
    self.pool_manager = None;
    self.ready_and_available_time = Some(Instant::now());
}
/// Attaches `manager` to the connection and clears the availability timestamp, so the
/// connection is no longer considered for idleness.
pub(super) fn mark_as_in_use(&mut self, manager: PoolManager) {
    self.pool_manager = Some(manager);
    self.ready_and_available_time = None;
}
/// Reports whether the connection has been available for at least `max_idle_time`.
///
/// Returns `false` when no idle limit is given or when the connection carries no
/// availability timestamp (it is cleared by `mark_as_in_use`).
pub(super) fn is_idle(&self, max_idle_time: Option<Duration>) -> bool {
    match (self.ready_and_available_time, max_idle_time) {
        (Some(available_since), Some(limit)) => {
            Instant::now().duration_since(available_since) >= limit
        }
        _ => false,
    }
}
/// Reports whether this connection's generation differs from `current_generation`.
pub(super) fn is_stale(&self, current_generation: u32) -> bool {
    current_generation != self.generation
}
pub(super) fn is_executing(&self) -> bool {
self.command_executing
}
pub(super) fn has_errored(&self) -> bool {
self.error
}
/// Builds the checked-out event for this connection, carrying its id and a clone of its address.
pub(super) fn checked_out_event(&self) -> ConnectionCheckedOutEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionCheckedOutEvent {
        address,
        connection_id,
    }
}
/// Builds the checked-in event for this connection, carrying its id and a clone of its address.
pub(super) fn checked_in_event(&self) -> ConnectionCheckedInEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionCheckedInEvent {
        address,
        connection_id,
    }
}
/// Builds the ready event for this connection, carrying its id and a clone of its address.
pub(super) fn ready_event(&self) -> ConnectionReadyEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionReadyEvent {
        address,
        connection_id,
    }
}
/// Builds the closed event for this connection with the given `reason`, carrying the
/// connection's id and a clone of its address.
pub(super) fn closed_event(&self, reason: ConnectionClosedReason) -> ConnectionClosedEvent {
    let address = self.address.clone();
    let connection_id = self.id;
    ConnectionClosedEvent {
        address,
        connection_id,
        reason,
    }
}
/// Writes `command` to the stream as a message, reads one message back and turns it into a
/// `CommandResponse` tagged with this connection's address.
///
/// Status flags are maintained along the way:
/// * `command_executing` is raised before the write and lowered only once the read has
///   finished, so it stays raised if the write fails (and, presumably, if this future is
///   dropped while awaiting — confirm with the pool's check-in logic).
/// * `error` reflects whether the latest write or read failed.
///
/// # Errors
///
/// Returns the write error, the read error, or whatever `CommandResponse::new` reports.
pub(crate) async fn send_command(
    &mut self,
    command: Command,
    request_id: impl Into<Option<i32>>,
) -> Result<CommandResponse> {
    let outgoing = Message::with_command(command, request_id.into());

    self.command_executing = true;
    let write_outcome = outgoing.write_to(&mut self.stream).await;
    match write_outcome {
        Ok(_) => self.error = false,
        Err(write_error) => {
            // Bail out with the executing flag still raised, exactly as on entry to the write.
            self.error = true;
            return Err(write_error.into());
        }
    }

    let read_outcome = Message::read_from(&mut self.stream).await;
    self.command_executing = false;
    self.error = read_outcome.is_err();

    CommandResponse::new(self.address.clone(), read_outcome?)
}
/// Borrows the stream description of this connection.
///
/// # Errors
///
/// Returns an `ErrorKind::OperationError` when no stream description has been stored yet.
pub(crate) fn stream_description(&self) -> Result<&StreamDescription> {
    match self.stream_description.as_ref() {
        Some(description) => Ok(description),
        None => Err(ErrorKind::OperationError {
            message: "Stream checked out but not handshaked".to_string(),
        }
        .into()),
    }
}
/// Closes the connection for `reason` (emitting the closed event if a handler is set) and
/// then drops it. Because closing removes the pool manager, the drop does not check the
/// connection back in.
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
    self.close(reason);
    drop(self);
}
fn close(&mut self, reason: ConnectionClosedReason) {
self.pool_manager.take();
if let Some(ref handler) = self.handler {
handler.handle_connection_closed_event(self.closed_event(reason));
}
}
/// Moves the contents of this connection into a new `Connection` value.
///
/// The stream is swapped out for `AsyncStream::Null`, and the handler and stream description
/// are taken (left as `None` here). Id, address, generation and both status flags are copied.
/// The new value has no pool manager and no availability timestamp.
fn take(&mut self) -> Connection {
    let stream = std::mem::replace(&mut self.stream, AsyncStream::Null);
    let handler = self.handler.take();
    let stream_description = self.stream_description.take();

    Connection {
        id: self.id,
        address: self.address.clone(),
        generation: self.generation,
        stream_description,
        ready_and_available_time: None,
        pool_manager: None,
        command_executing: self.command_executing,
        error: self.error,
        stream,
        handler,
    }
}
}
/// Hands a connection that is still attached to a pool manager back to that manager when it
/// goes out of scope.
impl Drop for Connection {
    fn drop(&mut self) {
        // Connections without a manager (available or already closed) need no further work.
        let pool_manager = match self.pool_manager.take() {
            Some(manager) => manager,
            None => return,
        };

        // `self` cannot be moved out of `drop`, so its contents are transferred to a new value.
        let dropped_connection = self.take();

        // `check_in` returns the connection in `Err` when it cannot accept it; close it then.
        if let Err(mut rejected) = pool_manager.check_in(dropped_connection) {
            rejected.close(ConnectionClosedReason::PoolClosed);
        }
    }
}
/// The parameters of a connection that has not been established yet; consumed by
/// `Connection::connect` to open the actual stream.
#[derive(Debug)]
pub(super) struct PendingConnection {
/// Identifier the established connection will carry.
pub(super) id: u32,
/// Address the connection will be opened against.
pub(super) address: StreamAddress,
/// Generation the established connection will carry.
pub(super) generation: u32,
/// Optional connect timeout, TLS settings and event handler forwarded to the constructor.
pub(super) options: Option<ConnectionOptions>,
}
impl PendingConnection {
    /// Builds the created event for this pending connection, carrying its id and a clone of
    /// its address.
    pub(super) fn created_event(&self) -> ConnectionCreatedEvent {
        let connection_id = self.id;
        let address = self.address.clone();
        ConnectionCreatedEvent {
            address,
            connection_id,
        }
    }
}