1use tokio::sync::broadcast;
2
3#[cfg(test)]
4use super::options::BackgroundThreadInterval;
5use super::{
6 conn::{pooled::PooledConnection, PendingConnection},
7 connection_requester,
8 connection_requester::{
9 ConnectionRequest,
10 ConnectionRequestReceiver,
11 ConnectionRequestResult,
12 ConnectionRequester,
13 WeakConnectionRequester,
14 },
15 establish::ConnectionEstablisher,
16 manager,
17 manager::{ConnectionSucceeded, ManagementRequestReceiver, PoolManagementRequest, PoolManager},
18 options::ConnectionPoolOptions,
19 status,
20 status::{PoolGenerationPublisher, PoolGenerationSubscriber},
21 DEFAULT_MAX_POOL_SIZE,
22};
23use crate::{
24 bson::oid::ObjectId,
25 client::auth::Credential,
26 error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
27 event::cmap::{
28 CmapEventEmitter,
29 ConnectionClosedEvent,
30 ConnectionClosedReason,
31 PoolClearedEvent,
32 PoolClosedEvent,
33 PoolReadyEvent,
34 },
35 options::ServerAddress,
36 runtime::{self, WorkerHandleListener},
37 sdam::{BroadcastMessage, TopologyUpdater},
38};
39
40use std::{
41 collections::{HashMap, VecDeque},
42 time::{Duration, Instant},
43};
44
45const DEFAULT_MAX_CONNECTING: u32 = 2;
46const MAINTENACE_FREQUENCY: Duration = Duration::from_millis(500);
47
48pub(crate) struct ConnectionPoolWorker {
50 address: ServerAddress,
52
53 state: PoolState,
56
57 total_connection_count: u32,
60
61 pending_connection_count: u32,
63
64 next_connection_id: u32,
66
67 generation: PoolGeneration,
71
72 service_connection_count: HashMap<ObjectId, u32>,
74
75 available_connections: VecDeque<PooledConnection>,
78
79 establisher: ConnectionEstablisher,
82
83 credential: Option<Credential>,
85
86 event_emitter: CmapEventEmitter,
89
90 maintenance_frequency: Duration,
92
93 max_idle_time: Option<Duration>,
98
99 min_pool_size: Option<u32>,
104
105 max_pool_size: u32,
110
111 handle_listener: WorkerHandleListener,
114
115 weak_requester: WeakConnectionRequester,
118
119 request_receiver: ConnectionRequestReceiver,
121
122 wait_queue: VecDeque<ConnectionRequest>,
124
125 management_receiver: ManagementRequestReceiver,
127
128 generation_publisher: PoolGenerationPublisher,
131
132 manager: PoolManager,
134
135 server_updater: TopologyUpdater,
138
139 max_connecting: u32,
141
142 cancellation_sender: Option<broadcast::Sender<()>>,
144}
145
146impl ConnectionPoolWorker {
147 pub(super) fn start(
151 address: ServerAddress,
152 establisher: ConnectionEstablisher,
153 server_updater: TopologyUpdater,
154 event_emitter: CmapEventEmitter,
155 options: Option<ConnectionPoolOptions>,
156 ) -> (PoolManager, ConnectionRequester, PoolGenerationSubscriber) {
157 let mut max_idle_time = options.as_ref().and_then(|opts| opts.max_idle_time);
160 if max_idle_time == Some(Duration::from_millis(0)) {
161 max_idle_time = None;
162 }
163
164 let max_pool_size = options
165 .as_ref()
166 .and_then(|opts| opts.max_pool_size)
167 .unwrap_or(DEFAULT_MAX_POOL_SIZE);
168 let max_connecting = options
169 .as_ref()
170 .and_then(|opts| opts.max_connecting)
171 .unwrap_or(DEFAULT_MAX_CONNECTING);
172
173 let min_pool_size = options.as_ref().and_then(|opts| opts.min_pool_size);
174
175 let (handle, handle_listener) = WorkerHandleListener::channel();
176 let (connection_requester, request_receiver) = connection_requester::channel(handle);
177 let (manager, management_receiver) = manager::channel();
178
179 let is_load_balanced = options
180 .as_ref()
181 .and_then(|opts| opts.load_balanced)
182 .unwrap_or(false);
183 let generation = if is_load_balanced {
184 PoolGeneration::load_balanced()
185 } else {
186 PoolGeneration::normal()
187 };
188 let (generation_publisher, generation_subscriber) = status::channel(generation.clone());
189
190 #[cfg(test)]
191 let mut state = if options
192 .as_ref()
193 .and_then(|opts| opts.ready)
194 .unwrap_or(false)
195 {
196 PoolState::Ready
197 } else {
198 PoolState::New
199 };
200 #[cfg(test)]
201 let maintenance_frequency = options
202 .as_ref()
203 .and_then(|opts| opts.background_thread_interval)
204 .map(|i| match i {
205 BackgroundThreadInterval::Never => Duration::from_secs(31_556_952),
208 BackgroundThreadInterval::Every(d) => d,
209 })
210 .unwrap_or(MAINTENACE_FREQUENCY);
211
212 #[cfg(not(test))]
213 let (mut state, maintenance_frequency) = (PoolState::New, MAINTENACE_FREQUENCY);
214
215 if is_load_balanced {
216 state = PoolState::Ready;
219 }
220
221 let credential = options.and_then(|o| o.credential);
222
223 let cancellation_sender = if !is_load_balanced {
224 Some(broadcast::channel(1).0)
229 } else {
230 None
231 };
232
233 let worker = ConnectionPoolWorker {
234 address,
235 event_emitter,
236 max_idle_time,
237 min_pool_size,
238 credential,
239 establisher,
240 next_connection_id: 1,
241 total_connection_count: 0,
242 pending_connection_count: 0,
243 generation,
244 service_connection_count: HashMap::new(),
245 available_connections: VecDeque::new(),
246 max_pool_size,
247 weak_requester: connection_requester.weak(),
248 request_receiver,
249 wait_queue: Default::default(),
250 management_receiver,
251 manager: manager.clone(),
252 handle_listener,
253 state,
254 generation_publisher,
255 maintenance_frequency,
256 server_updater,
257 max_connecting,
258 cancellation_sender,
259 };
260
261 runtime::spawn(async move {
262 worker.execute().await;
263 });
264
265 (manager, connection_requester, generation_subscriber)
266 }
267
268 async fn execute(mut self) {
272 let mut maintenance_interval = tokio::time::interval(self.maintenance_frequency);
273 let mut shutdown_ack = None;
274
275 loop {
276 let task = tokio::select! {
277 biased;
282
283 Some(request) = self.management_receiver.recv() => request.into(),
284 _ = self.handle_listener.wait_for_all_handle_drops() => {
285 break
288 },
289 Some(request) = self.request_receiver.recv() => {
290 PoolTask::CheckOut(request)
291 },
292 _ = maintenance_interval.tick() => {
293 PoolTask::Maintenance
294 },
295 else => {
296 break
297 }
298 };
299
300 match task {
301 PoolTask::CheckOut(request) => match self.state {
302 PoolState::Ready => {
303 self.wait_queue.push_back(request);
304 }
305 PoolState::Paused(ref e) => {
306 let _ = request.fulfill(ConnectionRequestResult::PoolCleared(e.clone()));
308 }
309 PoolState::New => {
310 let _ = request.fulfill(ConnectionRequestResult::PoolCleared(
311 ErrorKind::Internal {
312 message: "check out attempted from new pool".to_string(),
313 }
314 .into(),
315 ));
316 }
317 },
318 PoolTask::HandleManagementRequest(request) => match *request {
319 PoolManagementRequest::CheckIn(connection) => {
320 self.check_in(*connection);
321 }
322 PoolManagementRequest::Clear {
323 cause, service_id, ..
324 } => {
325 self.clear(cause, service_id);
326 }
327 PoolManagementRequest::MarkAsReady { completion_handler } => {
328 self.mark_as_ready();
329 completion_handler.acknowledge(());
330 }
331 PoolManagementRequest::HandleConnectionSucceeded(conn) => {
332 self.handle_connection_succeeded(conn);
333 }
334 PoolManagementRequest::HandleConnectionFailed => {
335 self.handle_connection_failed();
336 }
337 PoolManagementRequest::Broadcast(msg) => {
338 let (msg, ack) = msg.into_parts();
339 match msg {
340 BroadcastMessage::Shutdown => {
341 shutdown_ack = Some(ack);
342 break;
343 }
344 BroadcastMessage::FillPool => {
345 crate::runtime::spawn(fill_pool(self.weak_requester.clone(), ack));
346 }
347 #[cfg(test)]
348 BroadcastMessage::SyncWorkers => {
349 ack.acknowledge(());
350 }
351 }
352 }
353 },
354 PoolTask::Maintenance => {
355 self.perform_maintenance();
356 }
357 }
358
359 if self.can_service_connection_request() {
360 if let Some(request) = self.wait_queue.pop_front() {
361 self.check_out(request);
362 }
363 }
364 }
365
366 while let Some(connection) = self.available_connections.pop_front() {
367 connection.emit_closed_event(ConnectionClosedReason::PoolClosed);
368 }
369
370 self.event_emitter.emit_event(|| {
371 PoolClosedEvent {
372 address: self.address.clone(),
373 }
374 .into()
375 });
376 if let Some(tx) = shutdown_ack {
377 tx.acknowledge(());
378 }
379 }
380
381 fn below_max_connections(&self) -> bool {
382 self.total_connection_count < self.max_pool_size
383 }
384
385 fn can_service_connection_request(&self) -> bool {
386 if !matches!(self.state, PoolState::Ready) {
387 return false;
388 }
389
390 if !self.available_connections.is_empty() {
391 return true;
392 }
393
394 self.below_max_connections() && self.pending_connection_count < self.max_connecting
395 }
396
397 fn check_out(&mut self, request: ConnectionRequest) {
398 if request.is_warm_pool() {
399 if self.total_connection_count >= self.min_pool_size.unwrap_or(0) {
400 let _ = request.fulfill(ConnectionRequestResult::PoolWarmed);
401 return;
402 }
403 } else {
404 while let Some(mut conn) = self.available_connections.pop_back() {
406 if conn.generation.is_stale(&self.generation) {
408 self.close_connection(conn, ConnectionClosedReason::Stale);
409 continue;
410 }
411
412 if conn.is_idle(self.max_idle_time) {
414 self.close_connection(conn, ConnectionClosedReason::Idle);
415 continue;
416 }
417
418 conn.mark_checked_out(self.manager.clone(), self.get_cancellation_receiver());
419 if let Err(request) =
420 request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn)))
421 {
422 let mut connection = request.unwrap_pooled_connection();
425 connection.mark_checked_in();
426 self.available_connections.push_back(connection);
427 }
428
429 return;
430 }
431 }
432
433 if self.below_max_connections() {
435 let event_emitter = self.event_emitter.clone();
436 let establisher = self.establisher.clone();
437 let pending_connection = self.create_pending_connection();
438 let manager = self.manager.clone();
439 let server_updater = self.server_updater.clone();
440 let credential = self.credential.clone();
441 let cancellation_receiver = self.get_cancellation_receiver();
442
443 let handle = runtime::spawn(async move {
444 let mut establish_result = establish_connection(
445 establisher,
446 pending_connection,
447 server_updater,
448 &manager,
449 credential,
450 event_emitter,
451 )
452 .await;
453
454 if let Ok(ref mut c) = establish_result {
455 c.mark_checked_out(manager.clone(), cancellation_receiver);
456 manager.handle_connection_succeeded(ConnectionSucceeded::Used {
457 service_id: c.generation.service_id(),
458 });
459 }
460
461 establish_result
462 });
463
464 let _: std::result::Result<_, _> =
467 request.fulfill(ConnectionRequestResult::Establishing(handle));
468 } else {
469 self.wait_queue.push_front(request);
472 }
473 }
474
475 fn create_pending_connection(&mut self) -> PendingConnection {
476 self.total_connection_count += 1;
477 self.pending_connection_count += 1;
478
479 let pending_connection = PendingConnection {
480 id: self.next_connection_id,
481 address: self.address.clone(),
482 generation: self.generation.clone(),
483 event_emitter: self.event_emitter.clone(),
484 time_created: Instant::now(),
485 cancellation_receiver: self.get_cancellation_receiver(),
486 };
487 self.next_connection_id += 1;
488 self.event_emitter
489 .emit_event(|| pending_connection.created_event().into());
490
491 pending_connection
492 }
493
494 fn handle_connection_failed(&mut self) {
496 self.total_connection_count -= 1;
499 self.pending_connection_count -= 1;
500 }
501
502 fn handle_connection_succeeded(&mut self, connection: ConnectionSucceeded) {
505 self.pending_connection_count -= 1;
506 if let Some(sid) = connection.service_id() {
507 let count = self.service_connection_count.entry(sid).or_insert(0);
508 *count += 1;
509 }
510 if let ConnectionSucceeded::ForPool(connection) = connection {
511 let mut connection = *connection;
512 connection.mark_checked_in();
513 self.available_connections.push_back(connection);
514 }
515 }
516
517 fn check_in(&mut self, mut conn: PooledConnection) {
518 self.event_emitter
519 .emit_event(|| conn.checked_in_event().into());
520
521 conn.mark_checked_in();
522
523 if conn.has_errored() {
524 self.close_connection(conn, ConnectionClosedReason::Error);
525 } else if conn.generation.is_stale(&self.generation) {
526 self.close_connection(conn, ConnectionClosedReason::Stale);
527 } else if conn.is_executing() {
528 self.close_connection(conn, ConnectionClosedReason::Dropped);
529 } else {
530 self.available_connections.push_back(conn);
531 }
532 }
533
534 fn clear(&mut self, cause: Error, service_id: Option<ObjectId>) {
535 let interrupt_in_use_connections = cause.is_network_timeout();
536 if interrupt_in_use_connections {
537 if let Some(ref cancellation_sender) = self.cancellation_sender {
538 let _ = cancellation_sender.send(());
539 }
540 }
541
542 let was_ready = match (&mut self.generation, service_id) {
543 (PoolGeneration::Normal(gen), None) => {
544 *gen += 1;
545 let prev = std::mem::replace(&mut self.state, PoolState::Paused(cause.clone()));
546 matches!(prev, PoolState::Ready)
547 }
548 (PoolGeneration::LoadBalanced(gen_map), Some(sid)) => {
549 let gen = gen_map.entry(sid).or_insert(0);
550 *gen += 1;
551 true
552 }
553 (..) => load_balanced_mode_mismatch!(),
554 };
555 self.generation_publisher.publish(self.generation.clone());
556
557 if was_ready {
558 self.event_emitter.emit_event(|| {
559 PoolClearedEvent {
560 address: self.address.clone(),
561 service_id,
562 interrupt_in_use_connections,
563 }
564 .into()
565 });
566
567 if !matches!(self.generation, PoolGeneration::LoadBalanced(_)) {
568 for request in self.wait_queue.drain(..) {
569 let _: std::result::Result<_, _> =
572 request.fulfill(ConnectionRequestResult::PoolCleared(cause.clone()));
573 }
574 }
575 }
576 }
577
578 fn mark_as_ready(&mut self) {
579 if matches!(self.state, PoolState::Ready) {
580 return;
581 }
582
583 self.state = PoolState::Ready;
584 self.event_emitter.emit_event(|| {
585 PoolReadyEvent {
586 address: self.address.clone(),
587 }
588 .into()
589 });
590 }
591
592 #[allow(clippy::single_match)]
595 fn close_connection(&mut self, connection: PooledConnection, reason: ConnectionClosedReason) {
596 match (&mut self.generation, connection.generation.service_id()) {
597 (PoolGeneration::LoadBalanced(gen_map), Some(sid)) => {
598 match self.service_connection_count.get_mut(&sid) {
599 Some(count) => {
600 *count -= 1;
601 if *count == 0 {
602 gen_map.remove(&sid);
603 self.service_connection_count.remove(&sid);
604 }
605 }
606 None => load_balanced_mode_mismatch!(),
607 }
608 }
609 (PoolGeneration::Normal(_), None) => {}
610 _ => load_balanced_mode_mismatch!(),
611 }
612 connection.emit_closed_event(reason);
613 self.total_connection_count -= 1;
614 }
615
616 fn perform_maintenance(&mut self) {
619 self.remove_perished_connections();
620 if matches!(self.state, PoolState::Ready) {
621 self.ensure_min_connections();
622 }
623 }
624
625 fn remove_perished_connections(&mut self) {
627 while let Some(connection) = self.available_connections.pop_front() {
628 if connection.generation.is_stale(&self.generation) {
629 self.close_connection(connection, ConnectionClosedReason::Stale);
631 } else if connection.is_idle(self.max_idle_time) {
632 self.close_connection(connection, ConnectionClosedReason::Idle);
633 } else {
634 self.available_connections.push_front(connection);
635 break;
638 };
639 }
640 }
641
642 fn ensure_min_connections(&mut self) {
644 if let Some(min_pool_size) = self.min_pool_size {
645 while self.total_connection_count < min_pool_size
646 && self.pending_connection_count < self.max_connecting
647 {
648 let pending_connection = self.create_pending_connection();
649 let event_handler = self.event_emitter.clone();
650 let manager = self.manager.clone();
651 let establisher = self.establisher.clone();
652 let updater = self.server_updater.clone();
653 let credential = self.credential.clone();
654
655 runtime::spawn(async move {
656 let connection = establish_connection(
657 establisher,
658 pending_connection,
659 updater,
660 &manager,
661 credential,
662 event_handler,
663 )
664 .await;
665
666 if let Ok(connection) = connection {
667 manager.handle_connection_succeeded(ConnectionSucceeded::ForPool(Box::new(
668 connection,
669 )))
670 }
671 });
672 }
673 }
674 }
675
676 fn get_cancellation_receiver(&self) -> Option<broadcast::Receiver<()>> {
679 self.cancellation_sender
680 .as_ref()
681 .map(|sender| sender.subscribe())
682 }
683}
684
685async fn establish_connection(
689 establisher: ConnectionEstablisher,
690 pending_connection: PendingConnection,
691 server_updater: TopologyUpdater,
692 manager: &PoolManager,
693 credential: Option<Credential>,
694 event_emitter: CmapEventEmitter,
695) -> Result<PooledConnection> {
696 let connection_id = pending_connection.id;
697 let address = pending_connection.address.clone();
698
699 let mut establish_result = establisher
700 .establish_connection(pending_connection, credential.as_ref())
701 .await;
702
703 match establish_result {
704 Err(ref e) => {
705 server_updater
706 .handle_application_error(
707 address.clone(),
708 e.cause.clone(),
709 e.handshake_phase.clone(),
710 )
711 .await;
712 event_emitter.emit_event(|| {
713 ConnectionClosedEvent {
714 address,
715 reason: ConnectionClosedReason::Error,
716 connection_id,
717 #[cfg(feature = "tracing-unstable")]
718 error: Some(e.cause.clone()),
719 }
720 .into()
721 });
722 manager.handle_connection_failed();
723 }
724 Ok(ref mut connection) => {
725 event_emitter.emit_event(|| connection.ready_event().into());
726 }
727 }
728
729 establish_result.map_err(|e| e.cause)
730}
731
732async fn fill_pool(
733 requester: WeakConnectionRequester,
734 ack: crate::runtime::AcknowledgmentSender<()>,
735) {
736 let mut establishing = vec![];
737 loop {
738 let result = requester.request_warm_pool().await;
739 match result {
740 None => break,
741 Some(ConnectionRequestResult::Establishing(handle)) => {
742 establishing.push(crate::runtime::spawn(async move {
744 let _ = handle.await;
745 }));
747 }
748 _ => break,
749 };
750 }
751 futures_util::future::join_all(establishing).await;
753 ack.acknowledge(());
754}
755
756#[derive(Debug)]
761enum PoolState {
762 New,
764
765 Paused(Error),
767
768 Ready,
770}
771
772#[derive(Debug)]
774enum PoolTask {
775 HandleManagementRequest(Box<PoolManagementRequest>),
777
778 CheckOut(ConnectionRequest),
780
781 Maintenance,
783}
784
785impl From<PoolManagementRequest> for PoolTask {
786 fn from(request: PoolManagementRequest) -> Self {
787 PoolTask::HandleManagementRequest(Box::new(request))
788 }
789}
790
791#[derive(Debug, Clone)]
792pub(crate) enum PoolGeneration {
793 Normal(u32),
794 LoadBalanced(HashMap<ObjectId, u32>),
795}
796
797impl PoolGeneration {
798 pub(crate) fn normal() -> Self {
799 Self::Normal(0)
800 }
801
802 fn load_balanced() -> Self {
803 Self::LoadBalanced(HashMap::new())
804 }
805
806 #[cfg(test)]
807 pub(crate) fn as_normal(&self) -> Option<u32> {
808 match self {
809 PoolGeneration::Normal(n) => Some(*n),
810 _ => None,
811 }
812 }
813}