//! Connection pool worker (`mongodb/cmap/worker.rs`).

1use tokio::sync::broadcast;
2
3#[cfg(test)]
4use super::options::BackgroundThreadInterval;
5use super::{
6    conn::{pooled::PooledConnection, PendingConnection},
7    connection_requester,
8    connection_requester::{
9        ConnectionRequest,
10        ConnectionRequestReceiver,
11        ConnectionRequestResult,
12        ConnectionRequester,
13        WeakConnectionRequester,
14    },
15    establish::ConnectionEstablisher,
16    manager,
17    manager::{ConnectionSucceeded, ManagementRequestReceiver, PoolManagementRequest, PoolManager},
18    options::ConnectionPoolOptions,
19    status,
20    status::{PoolGenerationPublisher, PoolGenerationSubscriber},
21    DEFAULT_MAX_POOL_SIZE,
22};
23use crate::{
24    bson::oid::ObjectId,
25    client::auth::Credential,
26    error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
27    event::cmap::{
28        CmapEventEmitter,
29        ConnectionClosedEvent,
30        ConnectionClosedReason,
31        PoolClearedEvent,
32        PoolClosedEvent,
33        PoolReadyEvent,
34    },
35    options::ServerAddress,
36    runtime::{self, WorkerHandleListener},
37    sdam::{BroadcastMessage, TopologyUpdater},
38};
39
40use std::{
41    collections::{HashMap, VecDeque},
42    time::{Duration, Instant},
43};
44
/// Default cap on the number of connections that may be concurrently establishing
/// (the CMAP `maxConnecting` setting) when the user does not configure one.
const DEFAULT_MAX_CONNECTING: u32 = 2;
/// How often the worker wakes to perform maintenance (prune stale/idle connections,
/// satisfy `min_pool_size`). NOTE(review): identifier is missing an "N"
/// ("MAINTENANCE"); renaming would require touching every usage, so it is left as-is.
const MAINTENACE_FREQUENCY: Duration = Duration::from_millis(500);
47
/// A worker task that manages the shared state of the pool.
pub(crate) struct ConnectionPoolWorker {
    /// The address the pool's connections will connect to.
    address: ServerAddress,

    /// Current state of the pool. Determines if connections may be checked out
    /// and if min_pool_size connection creation should continue.
    state: PoolState,

    /// The total number of connections managed by the pool, including connections which are
    /// currently checked out of the pool or have yet to be established.
    total_connection_count: u32,

    /// The number of connections currently being established by this pool.
    pending_connection_count: u32,

    /// The ID of the next connection created by the pool.
    next_connection_id: u32,

    /// The current generation of the pool. The generation is incremented whenever the pool is
    /// cleared. Connections belonging to a previous generation are considered stale and will be
    /// closed when checked back in or when popped off of the set of available connections.
    generation: PoolGeneration,

    /// The connection count for each serviceId in load-balanced mode.
    service_connection_count: HashMap<ObjectId, u32>,

    /// The established connections that are currently checked into the pool and awaiting usage in
    /// future operations.
    available_connections: VecDeque<PooledConnection>,

    /// Contains the logic for "establishing" a connection. This includes handshaking and
    /// authenticating a connection when it's first created.
    establisher: ConnectionEstablisher,

    /// The credential used to authenticate connections, if any.
    credential: Option<Credential>,

    /// The type responsible for emitting CMAP events both to an optional user-specified handler
    /// and as tracing events.
    event_emitter: CmapEventEmitter,

    /// The time between maintenance tasks.
    maintenance_frequency: Duration,

    /// Connections that have been ready for usage in the pool for longer than `max_idle_time` will
    /// be closed either by the background thread or when popped off of the set of available
    /// connections. If `max_idle_time` is `None`, then connections will not be closed due to being
    /// idle.
    max_idle_time: Option<Duration>,

    /// The minimum number of connections that the pool can have at a given time. This includes
    /// connections which are currently checked out of the pool. If fewer than `min_pool_size`
    /// connections are in the pool, the background thread will create more connections and add
    /// them to the pool.
    min_pool_size: Option<u32>,

    /// The maximum number of connections that the pool can manage, including connections checked
    /// out of the pool. If a thread requests a connection and the pool is empty + there are
    /// already max_pool_size connections in use, it will block until one is returned or the
    /// wait_queue_timeout is exceeded.
    max_pool_size: u32,

    /// Receiver used to determine if any threads hold references to this pool. If all the
    /// sender ends of this receiver drop, this worker will be notified and drop too.
    handle_listener: WorkerHandleListener,

    /// Sender for connection check out requests.  Does not keep the worker alive the way
    /// a `ConnectionRequester` would since it doesn't hold a `WorkerHandle`.
    weak_requester: WeakConnectionRequester,

    /// Receiver for incoming connection check out requests.
    request_receiver: ConnectionRequestReceiver,

    /// Ordered queue of incoming requests waiting for connections.
    wait_queue: VecDeque<ConnectionRequest>,

    /// Receiver for incoming pool management requests (e.g. checking in a connection).
    management_receiver: ManagementRequestReceiver,

    /// Sender used to publish the latest generation and notifications for any establishment errors
    /// encountered by the pool.
    generation_publisher: PoolGenerationPublisher,

    /// A pool manager that can be cloned and attached to connections checked out of the pool.
    manager: PoolManager,

    /// A handle used to notify SDAM that a connection establishment error happened. This will
    /// allow the server to transition to Unknown and clear the pool as necessary.
    server_updater: TopologyUpdater,

    /// The maximum number of new connections that can be created concurrently.
    max_connecting: u32,

    /// Sender used to broadcast cancellation notices to checked-out connections.
    /// `None` in load-balanced mode (see `start`).
    cancellation_sender: Option<broadcast::Sender<()>>,
}
145
impl ConnectionPoolWorker {
    /// Starts a worker and returns a manager and connection requester.
    /// Once all connection requesters are dropped, the worker will stop executing
    /// and close the pool.
    pub(super) fn start(
        address: ServerAddress,
        establisher: ConnectionEstablisher,
        server_updater: TopologyUpdater,
        event_emitter: CmapEventEmitter,
        options: Option<ConnectionPoolOptions>,
    ) -> (PoolManager, ConnectionRequester, PoolGenerationSubscriber) {
        // The CMAP spec indicates that a max idle time of zero means that connections should not be
        // closed due to idleness.
        let mut max_idle_time = options.as_ref().and_then(|opts| opts.max_idle_time);
        if max_idle_time == Some(Duration::from_millis(0)) {
            max_idle_time = None;
        }

        let max_pool_size = options
            .as_ref()
            .and_then(|opts| opts.max_pool_size)
            .unwrap_or(DEFAULT_MAX_POOL_SIZE);
        let max_connecting = options
            .as_ref()
            .and_then(|opts| opts.max_connecting)
            .unwrap_or(DEFAULT_MAX_CONNECTING);

        let min_pool_size = options.as_ref().and_then(|opts| opts.min_pool_size);

        // The worker stays alive for as long as some `WorkerHandle` (held by requesters) exists.
        let (handle, handle_listener) = WorkerHandleListener::channel();
        let (connection_requester, request_receiver) = connection_requester::channel(handle);
        let (manager, management_receiver) = manager::channel();

        let is_load_balanced = options
            .as_ref()
            .and_then(|opts| opts.load_balanced)
            .unwrap_or(false);
        let generation = if is_load_balanced {
            PoolGeneration::load_balanced()
        } else {
            PoolGeneration::normal()
        };
        let (generation_publisher, generation_subscriber) = status::channel(generation.clone());

        // In test builds, the initial state and the maintenance cadence are configurable
        // via `ConnectionPoolOptions`.
        #[cfg(test)]
        let mut state = if options
            .as_ref()
            .and_then(|opts| opts.ready)
            .unwrap_or(false)
        {
            PoolState::Ready
        } else {
            PoolState::New
        };
        #[cfg(test)]
        let maintenance_frequency = options
            .as_ref()
            .and_then(|opts| opts.background_thread_interval)
            .map(|i| match i {
                // One year is long enough to count as never for tests, but not so long that it
                // will overflow interval math.
                BackgroundThreadInterval::Never => Duration::from_secs(31_556_952),
                BackgroundThreadInterval::Every(d) => d,
            })
            .unwrap_or(MAINTENACE_FREQUENCY);

        #[cfg(not(test))]
        let (mut state, maintenance_frequency) = (PoolState::New, MAINTENACE_FREQUENCY);

        if is_load_balanced {
            // Because load balancer servers don't have a monitoring connection, the associated
            // connection pool needs to start in the ready state.
            state = PoolState::Ready;
        }

        let credential = options.and_then(|o| o.credential);

        let cancellation_sender = if !is_load_balanced {
            // There's not necessarily an upper bound on the number of messages that could exist in
            // this channel; however, connections use both successfully receiving a message in the
            // channel and receiving a lagged error as an indication that cancellation should occur,
            // so we use an artificial bound of one message.
            Some(broadcast::channel(1).0)
        } else {
            None
        };

        let worker = ConnectionPoolWorker {
            address,
            event_emitter,
            max_idle_time,
            min_pool_size,
            credential,
            establisher,
            next_connection_id: 1,
            total_connection_count: 0,
            pending_connection_count: 0,
            generation,
            service_connection_count: HashMap::new(),
            available_connections: VecDeque::new(),
            max_pool_size,
            weak_requester: connection_requester.weak(),
            request_receiver,
            wait_queue: Default::default(),
            management_receiver,
            manager: manager.clone(),
            handle_listener,
            state,
            generation_publisher,
            maintenance_frequency,
            server_updater,
            max_connecting,
            cancellation_sender,
        };

        runtime::spawn(async move {
            worker.execute().await;
        });

        (manager, connection_requester, generation_subscriber)
    }

    /// Run the worker thread, listening on the various receivers until all handles have been
    /// dropped. Once all handles are dropped, the pool will close any available connections and
    /// emit a pool closed event.
    async fn execute(mut self) {
        let mut maintenance_interval = tokio::time::interval(self.maintenance_frequency);
        let mut shutdown_ack = None;

        loop {
            let task = tokio::select! {
                // This marker indicates that the futures will be polled in order from top to
                // bottom in this select! macro. We use this to ensure checkIn, clear,
                // and ready always have priority over checkout requests. The pool
                // exiting also has priority.
                biased;

                Some(request) = self.management_receiver.recv() => request.into(),
                _ = self.handle_listener.wait_for_all_handle_drops() => {
                    // all worker handles have been dropped meaning this
                    // pool has no more references and can be dropped itself.
                    break
                },
                Some(request) = self.request_receiver.recv() => {
                    PoolTask::CheckOut(request)
                },
                _ = maintenance_interval.tick() => {
                    PoolTask::Maintenance
                },
                else => {
                    break
                }
            };

            match task {
                PoolTask::CheckOut(request) => match self.state {
                    PoolState::Ready => {
                        self.wait_queue.push_back(request);
                    }
                    PoolState::Paused(ref e) => {
                        // if receiver doesn't listen to error that's ok.
                        let _ = request.fulfill(ConnectionRequestResult::PoolCleared(e.clone()));
                    }
                    PoolState::New => {
                        let _ = request.fulfill(ConnectionRequestResult::PoolCleared(
                            ErrorKind::Internal {
                                message: "check out attempted from new pool".to_string(),
                            }
                            .into(),
                        ));
                    }
                },
                PoolTask::HandleManagementRequest(request) => match *request {
                    PoolManagementRequest::CheckIn(connection) => {
                        self.check_in(*connection);
                    }
                    PoolManagementRequest::Clear {
                        cause, service_id, ..
                    } => {
                        self.clear(cause, service_id);
                    }
                    PoolManagementRequest::MarkAsReady { completion_handler } => {
                        self.mark_as_ready();
                        completion_handler.acknowledge(());
                    }
                    PoolManagementRequest::HandleConnectionSucceeded(conn) => {
                        self.handle_connection_succeeded(conn);
                    }
                    PoolManagementRequest::HandleConnectionFailed => {
                        self.handle_connection_failed();
                    }
                    PoolManagementRequest::Broadcast(msg) => {
                        let (msg, ack) = msg.into_parts();
                        match msg {
                            BroadcastMessage::Shutdown => {
                                // Defer the acknowledgment until after cleanup below.
                                shutdown_ack = Some(ack);
                                break;
                            }
                            BroadcastMessage::FillPool => {
                                crate::runtime::spawn(fill_pool(self.weak_requester.clone(), ack));
                            }
                            #[cfg(test)]
                            BroadcastMessage::SyncWorkers => {
                                ack.acknowledge(());
                            }
                        }
                    }
                },
                PoolTask::Maintenance => {
                    self.perform_maintenance();
                }
            }

            // After handling the task, try to make progress on the wait queue.
            if self.can_service_connection_request() {
                if let Some(request) = self.wait_queue.pop_front() {
                    self.check_out(request);
                }
            }
        }

        // Pool is shutting down: close all idle connections and emit the closed event.
        while let Some(connection) = self.available_connections.pop_front() {
            connection.emit_closed_event(ConnectionClosedReason::PoolClosed);
        }

        self.event_emitter.emit_event(|| {
            PoolClosedEvent {
                address: self.address.clone(),
            }
            .into()
        });
        if let Some(tx) = shutdown_ack {
            tx.acknowledge(());
        }
    }

    /// Whether the pool currently manages fewer than `max_pool_size` connections
    /// (checked-out and pending connections included).
    fn below_max_connections(&self) -> bool {
        self.total_connection_count < self.max_pool_size
    }

    /// Whether a queued check-out request could make progress right now: the pool must be
    /// ready, and either a connection must be available or creating a new one must be
    /// permitted by both `max_pool_size` and `max_connecting`.
    fn can_service_connection_request(&self) -> bool {
        if !matches!(self.state, PoolState::Ready) {
            return false;
        }

        if !self.available_connections.is_empty() {
            return true;
        }

        self.below_max_connections() && self.pending_connection_count < self.max_connecting
    }

    /// Attempt to satisfy a single connection request: hand out a fresh available
    /// connection, spawn establishment of a new one, or re-queue the request.
    fn check_out(&mut self, request: ConnectionRequest) {
        if request.is_warm_pool() {
            // Warm-pool requests only drive connection creation up to min_pool_size.
            if self.total_connection_count >= self.min_pool_size.unwrap_or(0) {
                let _ = request.fulfill(ConnectionRequestResult::PoolWarmed);
                return;
            }
        } else {
            // first attempt to check out an available connection
            while let Some(mut conn) = self.available_connections.pop_back() {
                // Close the connection if it's stale.
                if conn.generation.is_stale(&self.generation) {
                    self.close_connection(conn, ConnectionClosedReason::Stale);
                    continue;
                }

                // Close the connection if it's idle.
                if conn.is_idle(self.max_idle_time) {
                    self.close_connection(conn, ConnectionClosedReason::Idle);
                    continue;
                }

                conn.mark_checked_out(self.manager.clone(), self.get_cancellation_receiver());
                if let Err(request) =
                    request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn)))
                {
                    // checking out thread stopped listening, indicating it hit the WaitQueue
                    // timeout, so we put connection back into pool.
                    let mut connection = request.unwrap_pooled_connection();
                    connection.mark_checked_in();
                    self.available_connections.push_back(connection);
                }

                return;
            }
        }

        // otherwise, attempt to create a connection.
        if self.below_max_connections() {
            let event_emitter = self.event_emitter.clone();
            let establisher = self.establisher.clone();
            let pending_connection = self.create_pending_connection();
            let manager = self.manager.clone();
            let server_updater = self.server_updater.clone();
            let credential = self.credential.clone();
            let cancellation_receiver = self.get_cancellation_receiver();

            let handle = runtime::spawn(async move {
                let mut establish_result = establish_connection(
                    establisher,
                    pending_connection,
                    server_updater,
                    &manager,
                    credential,
                    event_emitter,
                )
                .await;

                if let Ok(ref mut c) = establish_result {
                    c.mark_checked_out(manager.clone(), cancellation_receiver);
                    manager.handle_connection_succeeded(ConnectionSucceeded::Used {
                        service_id: c.generation.service_id(),
                    });
                }

                establish_result
            });

            // this only fails if the other end stopped listening (e.g. due to timeout), in
            // which case we just let the connection establish in the background.
            let _: std::result::Result<_, _> =
                request.fulfill(ConnectionRequestResult::Establishing(handle));
        } else {
            // put the request to the front of the wait queue so that it will be processed
            // next time a request can be processed.
            self.wait_queue.push_front(request);
        }
    }

    /// Reserve a connection slot: bump the total/pending counts, allocate the next
    /// connection id, and emit the connection-created event.
    fn create_pending_connection(&mut self) -> PendingConnection {
        self.total_connection_count += 1;
        self.pending_connection_count += 1;

        let pending_connection = PendingConnection {
            id: self.next_connection_id,
            address: self.address.clone(),
            generation: self.generation.clone(),
            event_emitter: self.event_emitter.clone(),
            time_created: Instant::now(),
            cancellation_receiver: self.get_cancellation_receiver(),
        };
        self.next_connection_id += 1;
        self.event_emitter
            .emit_event(|| pending_connection.created_event().into());

        pending_connection
    }

    /// Process a connection establishment failure.
    fn handle_connection_failed(&mut self) {
        // Establishing a pending connection failed, so that must be reflected in the total
        // connection count.
        self.total_connection_count -= 1;
        self.pending_connection_count -= 1;
    }

    /// Process a successful connection establishment, optionally populating the pool with the
    /// resulting connection.
    fn handle_connection_succeeded(&mut self, connection: ConnectionSucceeded) {
        self.pending_connection_count -= 1;
        // Track per-service counts in load-balanced mode.
        if let Some(sid) = connection.service_id() {
            let count = self.service_connection_count.entry(sid).or_insert(0);
            *count += 1;
        }
        if let ConnectionSucceeded::ForPool(connection) = connection {
            let mut connection = *connection;
            connection.mark_checked_in();
            self.available_connections.push_back(connection);
        }
    }

    /// Return a checked-out connection to the pool, closing it instead if it has
    /// errored, is stale, or is still executing.
    fn check_in(&mut self, mut conn: PooledConnection) {
        self.event_emitter
            .emit_event(|| conn.checked_in_event().into());

        conn.mark_checked_in();

        if conn.has_errored() {
            self.close_connection(conn, ConnectionClosedReason::Error);
        } else if conn.generation.is_stale(&self.generation) {
            self.close_connection(conn, ConnectionClosedReason::Stale);
        } else if conn.is_executing() {
            self.close_connection(conn, ConnectionClosedReason::Dropped);
        } else {
            self.available_connections.push_back(conn);
        }
    }

    /// Clear the pool in response to `cause`: bump the generation (per-service in
    /// load-balanced mode), pause the pool (non-load-balanced only), and fail any
    /// queued check-out requests.
    fn clear(&mut self, cause: Error, service_id: Option<ObjectId>) {
        // Network timeouts additionally interrupt in-use connections via the
        // cancellation channel.
        let interrupt_in_use_connections = cause.is_network_timeout();
        if interrupt_in_use_connections {
            if let Some(ref cancellation_sender) = self.cancellation_sender {
                let _ = cancellation_sender.send(());
            }
        }

        let was_ready = match (&mut self.generation, service_id) {
            (PoolGeneration::Normal(gen), None) => {
                *gen += 1;
                let prev = std::mem::replace(&mut self.state, PoolState::Paused(cause.clone()));
                matches!(prev, PoolState::Ready)
            }
            (PoolGeneration::LoadBalanced(gen_map), Some(sid)) => {
                // Load-balanced pools are never paused; only the service's generation advances.
                let gen = gen_map.entry(sid).or_insert(0);
                *gen += 1;
                true
            }
            (..) => load_balanced_mode_mismatch!(),
        };
        self.generation_publisher.publish(self.generation.clone());

        if was_ready {
            self.event_emitter.emit_event(|| {
                PoolClearedEvent {
                    address: self.address.clone(),
                    service_id,
                    interrupt_in_use_connections,
                }
                .into()
            });

            if !matches!(self.generation, PoolGeneration::LoadBalanced(_)) {
                for request in self.wait_queue.drain(..) {
                    // an error means the other end hung up already, which is okay because we were
                    // returning an error anyways
                    let _: std::result::Result<_, _> =
                        request.fulfill(ConnectionRequestResult::PoolCleared(cause.clone()));
                }
            }
        }
    }

    /// Transition the pool to `Ready`, emitting a pool-ready event. No-op if already ready.
    fn mark_as_ready(&mut self) {
        if matches!(self.state, PoolState::Ready) {
            return;
        }

        self.state = PoolState::Ready;
        self.event_emitter.emit_event(|| {
            PoolReadyEvent {
                address: self.address.clone(),
            }
            .into()
        });
    }

    /// Close a connection, emit the event for it being closed, and decrement the
    /// total connection count.
    #[allow(clippy::single_match)]
    fn close_connection(&mut self, connection: PooledConnection, reason: ConnectionClosedReason) {
        match (&mut self.generation, connection.generation.service_id()) {
            (PoolGeneration::LoadBalanced(gen_map), Some(sid)) => {
                match self.service_connection_count.get_mut(&sid) {
                    Some(count) => {
                        *count -= 1;
                        // Drop per-service bookkeeping once its last connection closes.
                        if *count == 0 {
                            gen_map.remove(&sid);
                            self.service_connection_count.remove(&sid);
                        }
                    }
                    None => load_balanced_mode_mismatch!(),
                }
            }
            (PoolGeneration::Normal(_), None) => {}
            _ => load_balanced_mode_mismatch!(),
        }
        connection.emit_closed_event(reason);
        self.total_connection_count -= 1;
    }

    /// Ensure all connections in the pool are valid and that the pool is managing at least
    /// min_pool_size connections.
    fn perform_maintenance(&mut self) {
        self.remove_perished_connections();
        if matches!(self.state, PoolState::Ready) {
            self.ensure_min_connections();
        }
    }

    /// Iterate over the connections and remove any that are stale or idle.
    fn remove_perished_connections(&mut self) {
        while let Some(connection) = self.available_connections.pop_front() {
            if connection.generation.is_stale(&self.generation) {
                // Stale connections are closed and removed from the total count.
                self.close_connection(connection, ConnectionClosedReason::Stale);
            } else if connection.is_idle(self.max_idle_time) {
                self.close_connection(connection, ConnectionClosedReason::Idle);
            } else {
                self.available_connections.push_front(connection);
                // All subsequent connections are either not idle or not stale since they were
                // checked into the pool later, so we can just quit early.
                break;
            };
        }
    }

    /// Populate the pool with enough connections to meet the min_pool_size requirement.
    fn ensure_min_connections(&mut self) {
        if let Some(min_pool_size) = self.min_pool_size {
            while self.total_connection_count < min_pool_size
                && self.pending_connection_count < self.max_connecting
            {
                let pending_connection = self.create_pending_connection();
                let event_handler = self.event_emitter.clone();
                let manager = self.manager.clone();
                let establisher = self.establisher.clone();
                let updater = self.server_updater.clone();
                let credential = self.credential.clone();

                // Establish in the background; success is routed back to the worker via
                // the manager channel as `HandleConnectionSucceeded`.
                runtime::spawn(async move {
                    let connection = establish_connection(
                        establisher,
                        pending_connection,
                        updater,
                        &manager,
                        credential,
                        event_handler,
                    )
                    .await;

                    if let Ok(connection) = connection {
                        manager.handle_connection_succeeded(ConnectionSucceeded::ForPool(Box::new(
                            connection,
                        )))
                    }
                });
            }
        }
    }

    /// Returns a receiver for the pool's cancellation sender if this pool is not in load-balanced
    /// mode. The returned receiver will only receive messages sent after this method is called.
    fn get_cancellation_receiver(&self) -> Option<broadcast::Receiver<()>> {
        self.cancellation_sender
            .as_ref()
            .map(|sender| sender.subscribe())
    }
}
684
685/// Helper covering the common connection establishment behavior between
686/// connections established in check_out and those established as part of
687/// satisfying min_pool_size.
688async fn establish_connection(
689    establisher: ConnectionEstablisher,
690    pending_connection: PendingConnection,
691    server_updater: TopologyUpdater,
692    manager: &PoolManager,
693    credential: Option<Credential>,
694    event_emitter: CmapEventEmitter,
695) -> Result<PooledConnection> {
696    let connection_id = pending_connection.id;
697    let address = pending_connection.address.clone();
698
699    let mut establish_result = establisher
700        .establish_connection(pending_connection, credential.as_ref())
701        .await;
702
703    match establish_result {
704        Err(ref e) => {
705            server_updater
706                .handle_application_error(
707                    address.clone(),
708                    e.cause.clone(),
709                    e.handshake_phase.clone(),
710                )
711                .await;
712            event_emitter.emit_event(|| {
713                ConnectionClosedEvent {
714                    address,
715                    reason: ConnectionClosedReason::Error,
716                    connection_id,
717                    #[cfg(feature = "tracing-unstable")]
718                    error: Some(e.cause.clone()),
719                }
720                .into()
721            });
722            manager.handle_connection_failed();
723        }
724        Ok(ref mut connection) => {
725            event_emitter.emit_event(|| connection.ready_event().into());
726        }
727    }
728
729    establish_result.map_err(|e| e.cause)
730}
731
732async fn fill_pool(
733    requester: WeakConnectionRequester,
734    ack: crate::runtime::AcknowledgmentSender<()>,
735) {
736    let mut establishing = vec![];
737    loop {
738        let result = requester.request_warm_pool().await;
739        match result {
740            None => break,
741            Some(ConnectionRequestResult::Establishing(handle)) => {
742                // Let connections finish establishing in parallel.
743                establishing.push(crate::runtime::spawn(async move {
744                    let _ = handle.await;
745                    // The connection is dropped here, returning it to the pool.
746                }));
747            }
748            _ => break,
749        };
750    }
751    // Wait for all connections to finish establishing before reporting completion.
752    futures_util::future::join_all(establishing).await;
753    ack.acknowledge(());
754}
755
/// Enum modeling the possible pool states as described in the CMAP spec.
///
/// The "closed" state is omitted here because the pool is considered closed only
/// once it goes out of scope and cannot be manually closed before then.
#[derive(Debug)]
enum PoolState {
    /// Same as Paused, but only for a new pool, not one that has been cleared due to an error.
    New,

    /// Connections may not be checked out nor created in the background to satisfy minPoolSize.
    /// Carries the error that caused the pool to be cleared.
    Paused(Error),

    /// Pool is operational.
    Ready,
}
771
/// Task to process by the worker.
#[derive(Debug)]
enum PoolTask {
    /// Handle a management request from a `PoolManager`.
    /// Boxed because `PoolManagementRequest` is the largest variant payload.
    HandleManagementRequest(Box<PoolManagementRequest>),

    /// Fulfill the given connection request.
    CheckOut(ConnectionRequest),

    /// Perform pool maintenance (ensure min connections, remove stale or idle connections).
    Maintenance,
}
784
785impl From<PoolManagementRequest> for PoolTask {
786    fn from(request: PoolManagementRequest) -> Self {
787        PoolTask::HandleManagementRequest(Box::new(request))
788    }
789}
790
/// The pool's generation tracking, used to detect stale connections after a clear.
#[derive(Debug, Clone)]
pub(crate) enum PoolGeneration {
    /// A single counter, used by non-load-balanced pools.
    Normal(u32),
    /// A per-service counter keyed by serviceId, used in load-balanced mode.
    LoadBalanced(HashMap<ObjectId, u32>),
}
796
797impl PoolGeneration {
798    pub(crate) fn normal() -> Self {
799        Self::Normal(0)
800    }
801
802    fn load_balanced() -> Self {
803        Self::LoadBalanced(HashMap::new())
804    }
805
806    #[cfg(test)]
807    pub(crate) fn as_normal(&self) -> Option<u32> {
808        match self {
809            PoolGeneration::Normal(n) => Some(*n),
810            _ => None,
811        }
812    }
813}