#[cfg(test)]
use super::options::BackgroundThreadInterval;
use super::{
conn::PendingConnection,
connection_requester,
connection_requester::{
ConnectionRequest,
ConnectionRequestReceiver,
ConnectionRequestResult,
ConnectionRequester,
},
establish::ConnectionEstablisher,
manager,
manager::{ConnectionSucceeded, ManagementRequestReceiver, PoolManagementRequest, PoolManager},
options::ConnectionPoolOptions,
status,
status::{PoolGenerationPublisher, PoolGenerationSubscriber},
Connection,
DEFAULT_MAX_POOL_SIZE,
};
use crate::{
bson::oid::ObjectId,
client::auth::Credential,
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
event::cmap::{
CmapEventEmitter,
ConnectionClosedEvent,
ConnectionClosedReason,
PoolClearedEvent,
PoolClosedEvent,
PoolReadyEvent,
},
options::ServerAddress,
runtime::{self, WorkerHandleListener},
sdam::TopologyUpdater,
};
use std::{
collections::{HashMap, VecDeque},
time::Duration,
};
const MAX_CONNECTING: u32 = 2;
const MAINTENACE_FREQUENCY: Duration = Duration::from_millis(500);
pub(crate) struct ConnectionPoolWorker {
address: ServerAddress,
state: PoolState,
total_connection_count: u32,
pending_connection_count: u32,
next_connection_id: u32,
generation: PoolGeneration,
service_connection_count: HashMap<ObjectId, u32>,
available_connections: VecDeque<Connection>,
establisher: ConnectionEstablisher,
credential: Option<Credential>,
event_emitter: CmapEventEmitter,
maintenance_frequency: Duration,
max_idle_time: Option<Duration>,
min_pool_size: Option<u32>,
max_pool_size: u32,
handle_listener: WorkerHandleListener,
request_receiver: ConnectionRequestReceiver,
wait_queue: VecDeque<ConnectionRequest>,
management_receiver: ManagementRequestReceiver,
generation_publisher: PoolGenerationPublisher,
manager: PoolManager,
server_updater: TopologyUpdater,
}
impl ConnectionPoolWorker {
pub(super) fn start(
address: ServerAddress,
establisher: ConnectionEstablisher,
server_updater: TopologyUpdater,
event_emitter: CmapEventEmitter,
options: Option<ConnectionPoolOptions>,
) -> (PoolManager, ConnectionRequester, PoolGenerationSubscriber) {
let mut max_idle_time = options.as_ref().and_then(|opts| opts.max_idle_time);
if max_idle_time == Some(Duration::from_millis(0)) {
max_idle_time = None;
}
let max_pool_size = options
.as_ref()
.and_then(|opts| opts.max_pool_size)
.unwrap_or(DEFAULT_MAX_POOL_SIZE);
let min_pool_size = options.as_ref().and_then(|opts| opts.min_pool_size);
let (handle, handle_listener) = WorkerHandleListener::channel();
let (connection_requester, request_receiver) = connection_requester::channel(handle);
let (manager, management_receiver) = manager::channel();
let is_load_balanced = options
.as_ref()
.and_then(|opts| opts.load_balanced)
.unwrap_or(false);
let generation = if is_load_balanced {
PoolGeneration::load_balanced()
} else {
PoolGeneration::normal()
};
let (generation_publisher, generation_subscriber) = status::channel(generation.clone());
#[cfg(test)]
let mut state = if options
.as_ref()
.and_then(|opts| opts.ready)
.unwrap_or(false)
{
PoolState::Ready
} else {
PoolState::New
};
#[cfg(test)]
let maintenance_frequency = options
.as_ref()
.and_then(|opts| opts.background_thread_interval)
.map(|i| match i {
BackgroundThreadInterval::Never => Duration::from_secs(31_556_952),
BackgroundThreadInterval::Every(d) => d,
})
.unwrap_or(MAINTENACE_FREQUENCY);
#[cfg(not(test))]
let (mut state, maintenance_frequency) = (PoolState::New, MAINTENACE_FREQUENCY);
if is_load_balanced {
state = PoolState::Ready;
}
let credential = options.and_then(|o| o.credential);
let worker = ConnectionPoolWorker {
address,
event_emitter,
max_idle_time,
min_pool_size,
credential,
establisher,
next_connection_id: 1,
total_connection_count: 0,
pending_connection_count: 0,
generation,
service_connection_count: HashMap::new(),
available_connections: VecDeque::new(),
max_pool_size,
request_receiver,
wait_queue: Default::default(),
management_receiver,
manager: manager.clone(),
handle_listener,
state,
generation_publisher,
maintenance_frequency,
server_updater,
};
runtime::execute(async move {
worker.execute().await;
});
(manager, connection_requester, generation_subscriber)
}
async fn execute(mut self) {
let mut maintenance_interval = runtime::interval(self.maintenance_frequency);
loop {
let task = tokio::select! {
biased;
Some(request) = self.management_receiver.recv() => request.into(),
_ = self.handle_listener.wait_for_all_handle_drops() => {
break
},
Some(request) = self.request_receiver.recv() => {
PoolTask::CheckOut(request)
},
_ = maintenance_interval.tick() => {
PoolTask::Maintenance
},
else => {
break
}
};
match task {
PoolTask::CheckOut(request) => match self.state {
PoolState::Ready => {
self.wait_queue.push_back(request);
}
PoolState::Paused(ref e) => {
let _ = request.fulfill(ConnectionRequestResult::PoolCleared(e.clone()));
}
PoolState::New => {
let _ = request.fulfill(ConnectionRequestResult::PoolCleared(
ErrorKind::Internal {
message: "check out attempted from new pool".to_string(),
}
.into(),
));
}
},
PoolTask::HandleManagementRequest(request) => match *request {
PoolManagementRequest::CheckIn(connection) => {
self.check_in(*connection);
}
PoolManagementRequest::Clear {
cause, service_id, ..
} => {
self.clear(cause, service_id);
}
PoolManagementRequest::MarkAsReady { completion_handler } => {
self.mark_as_ready();
completion_handler.acknowledge(());
}
PoolManagementRequest::HandleConnectionSucceeded(conn) => {
self.handle_connection_succeeded(conn);
}
PoolManagementRequest::HandleConnectionFailed => {
self.handle_connection_failed();
}
#[cfg(test)]
PoolManagementRequest::Sync(tx) => {
let _ = tx.send(());
}
},
PoolTask::Maintenance => {
self.perform_maintenance();
}
}
if self.can_service_connection_request() {
if let Some(request) = self.wait_queue.pop_front() {
self.check_out(request);
}
}
}
while let Some(connection) = self.available_connections.pop_front() {
connection.close_and_drop(ConnectionClosedReason::PoolClosed);
}
self.event_emitter.emit_event(|| {
PoolClosedEvent {
address: self.address.clone(),
}
.into()
});
}
fn below_max_connections(&self) -> bool {
self.total_connection_count < self.max_pool_size
}
fn can_service_connection_request(&self) -> bool {
if !matches!(self.state, PoolState::Ready) {
return false;
}
if !self.available_connections.is_empty() {
return true;
}
self.below_max_connections() && self.pending_connection_count < MAX_CONNECTING
}
fn check_out(&mut self, request: ConnectionRequest) {
while let Some(mut conn) = self.available_connections.pop_back() {
if conn.generation.is_stale(&self.generation) {
self.close_connection(conn, ConnectionClosedReason::Stale);
continue;
}
if conn.is_idle(self.max_idle_time) {
self.close_connection(conn, ConnectionClosedReason::Idle);
continue;
}
conn.mark_as_in_use(self.manager.clone());
if let Err(request) = request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn))) {
let mut connection = request.unwrap_pooled_connection();
connection.mark_as_available();
self.available_connections.push_back(connection);
}
return;
}
if self.below_max_connections() {
let event_emitter = self.event_emitter.clone();
let establisher = self.establisher.clone();
let pending_connection = self.create_pending_connection();
let manager = self.manager.clone();
let server_updater = self.server_updater.clone();
let credential = self.credential.clone();
let handle = runtime::spawn(async move {
let mut establish_result = establish_connection(
establisher,
pending_connection,
server_updater,
&manager,
credential,
event_emitter,
)
.await;
if let Ok(ref mut c) = establish_result {
c.mark_as_in_use(manager.clone());
manager.handle_connection_succeeded(ConnectionSucceeded::Used {
service_id: c.generation.service_id(),
});
}
establish_result
});
let _: std::result::Result<_, _> =
request.fulfill(ConnectionRequestResult::Establishing(handle));
} else {
self.wait_queue.push_front(request);
}
}
fn create_pending_connection(&mut self) -> PendingConnection {
self.total_connection_count += 1;
self.pending_connection_count += 1;
let pending_connection = PendingConnection {
id: self.next_connection_id,
address: self.address.clone(),
generation: self.generation.clone(),
event_emitter: self.event_emitter.clone(),
};
self.next_connection_id += 1;
self.event_emitter
.emit_event(|| pending_connection.created_event().into());
pending_connection
}
fn handle_connection_failed(&mut self) {
self.total_connection_count -= 1;
self.pending_connection_count -= 1;
}
fn handle_connection_succeeded(&mut self, connection: ConnectionSucceeded) {
self.pending_connection_count -= 1;
if let Some(sid) = connection.service_id() {
let count = self.service_connection_count.entry(sid).or_insert(0);
*count += 1;
}
if let ConnectionSucceeded::ForPool(connection) = connection {
let mut connection = *connection;
connection.mark_as_available();
self.available_connections.push_back(connection);
}
}
fn check_in(&mut self, mut conn: Connection) {
self.event_emitter
.emit_event(|| conn.checked_in_event().into());
conn.mark_as_available();
if conn.has_errored() {
self.close_connection(conn, ConnectionClosedReason::Error);
} else if conn.generation.is_stale(&self.generation) {
self.close_connection(conn, ConnectionClosedReason::Stale);
} else if conn.is_executing() {
self.close_connection(conn, ConnectionClosedReason::Dropped);
} else {
self.available_connections.push_back(conn);
}
}
fn clear(&mut self, cause: Error, service_id: Option<ObjectId>) {
let was_ready = match (&mut self.generation, service_id) {
(PoolGeneration::Normal(gen), None) => {
*gen += 1;
let prev = std::mem::replace(&mut self.state, PoolState::Paused(cause.clone()));
matches!(prev, PoolState::Ready)
}
(PoolGeneration::LoadBalanced(gen_map), Some(sid)) => {
let gen = gen_map.entry(sid).or_insert(0);
*gen += 1;
true
}
(..) => load_balanced_mode_mismatch!(),
};
self.generation_publisher.publish(self.generation.clone());
if was_ready {
self.event_emitter.emit_event(|| {
PoolClearedEvent {
address: self.address.clone(),
service_id,
}
.into()
});
if !matches!(self.generation, PoolGeneration::LoadBalanced(_)) {
for request in self.wait_queue.drain(..) {
let _: std::result::Result<_, _> =
request.fulfill(ConnectionRequestResult::PoolCleared(cause.clone()));
}
}
}
}
fn mark_as_ready(&mut self) {
if matches!(self.state, PoolState::Ready) {
return;
}
self.state = PoolState::Ready;
self.event_emitter.emit_event(|| {
PoolReadyEvent {
address: self.address.clone(),
}
.into()
});
}
#[allow(clippy::single_match)]
fn close_connection(&mut self, connection: Connection, reason: ConnectionClosedReason) {
match (&mut self.generation, connection.generation.service_id()) {
(PoolGeneration::LoadBalanced(gen_map), Some(sid)) => {
match self.service_connection_count.get_mut(&sid) {
Some(count) => {
*count -= 1;
if *count == 0 {
gen_map.remove(&sid);
self.service_connection_count.remove(&sid);
}
}
None => load_balanced_mode_mismatch!(),
}
}
(PoolGeneration::Normal(_), None) => {}
_ => load_balanced_mode_mismatch!(),
}
connection.close_and_drop(reason);
self.total_connection_count -= 1;
}
fn perform_maintenance(&mut self) {
self.remove_perished_connections();
if matches!(self.state, PoolState::Ready) {
self.ensure_min_connections();
}
}
fn remove_perished_connections(&mut self) {
while let Some(connection) = self.available_connections.pop_front() {
if connection.generation.is_stale(&self.generation) {
self.close_connection(connection, ConnectionClosedReason::Stale);
} else if connection.is_idle(self.max_idle_time) {
self.close_connection(connection, ConnectionClosedReason::Idle);
} else {
self.available_connections.push_front(connection);
break;
};
}
}
fn ensure_min_connections(&mut self) {
if let Some(min_pool_size) = self.min_pool_size {
while self.total_connection_count < min_pool_size
&& self.pending_connection_count < MAX_CONNECTING
{
let pending_connection = self.create_pending_connection();
let event_handler = self.event_emitter.clone();
let manager = self.manager.clone();
let establisher = self.establisher.clone();
let updater = self.server_updater.clone();
let credential = self.credential.clone();
runtime::execute(async move {
let connection = establish_connection(
establisher,
pending_connection,
updater,
&manager,
credential,
event_handler,
)
.await;
if let Ok(connection) = connection {
manager.handle_connection_succeeded(ConnectionSucceeded::ForPool(Box::new(
connection,
)))
}
});
}
}
}
}
async fn establish_connection(
establisher: ConnectionEstablisher,
pending_connection: PendingConnection,
server_updater: TopologyUpdater,
manager: &PoolManager,
credential: Option<Credential>,
event_emitter: CmapEventEmitter,
) -> Result<Connection> {
let connection_id = pending_connection.id;
let address = pending_connection.address.clone();
let mut establish_result = establisher
.establish_connection(pending_connection, credential.as_ref())
.await;
match establish_result {
Err(ref e) => {
server_updater
.handle_application_error(
address.clone(),
e.cause.clone(),
e.handshake_phase.clone(),
)
.await;
event_emitter.emit_event(|| {
ConnectionClosedEvent {
address,
reason: ConnectionClosedReason::Error,
connection_id,
#[cfg(feature = "tracing-unstable")]
error: Some(e.cause.clone()),
}
.into()
});
manager.handle_connection_failed();
}
Ok(ref mut connection) => {
event_emitter.emit_event(|| connection.ready_event().into());
}
}
establish_result.map_err(|e| e.cause)
}
#[derive(Debug)]
enum PoolState {
New,
Paused(Error),
Ready,
}
#[derive(Debug)]
enum PoolTask {
HandleManagementRequest(Box<PoolManagementRequest>),
CheckOut(ConnectionRequest),
Maintenance,
}
impl From<PoolManagementRequest> for PoolTask {
fn from(request: PoolManagementRequest) -> Self {
PoolTask::HandleManagementRequest(Box::new(request))
}
}
#[derive(Debug, Clone)]
pub(crate) enum PoolGeneration {
Normal(u32),
LoadBalanced(HashMap<ObjectId, u32>),
}
impl PoolGeneration {
pub(crate) fn normal() -> Self {
Self::Normal(0)
}
fn load_balanced() -> Self {
Self::LoadBalanced(HashMap::new())
}
#[cfg(test)]
pub(crate) fn as_normal(&self) -> Option<u32> {
match self {
PoolGeneration::Normal(n) => Some(*n),
_ => None,
}
}
}