strut_database/
connector.rs

1use crate::repr::handle::Handle;
2use sqlx_core::database::Database;
3use sqlx_core::pool::Pool;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use strut_core::{AppContext, AppSpindown, AppSpindownToken};
7use tracing::info;
8
9/// Runs in the background, holds a copy of `sqlx` database connection [`Pool`]
10/// created from the given [`Handle`], and closes the pooled connections once
11/// the global [`AppContext`] is terminated.
12///
13/// In all fairness, this [`Connector`] does no “connecting” whatsoever: it
14/// merely lazily initializes a [`Pool`], holds a copy of it, and returns
15/// another copy to the caller. All database connectivity logic is implemented
16/// by the pool itself. The main purpose of this connector is to clean up during
17/// [`AppSpindown`].
18pub struct Connector<DB>
19where
20    DB: Database,
21{
22    /// The globally unique name of this connector, for logging/debugging
23    /// purposes.
24    name: Arc<str>,
25    /// The identifier of this connector’s [`Handle`], for logging/debugging
26    /// purposes.
27    identifier: Arc<str>,
28    /// The pool of connections that this connector holds.
29    pool: Pool<DB>,
30    /// The canary token, which (once it goes out of scope) will inform the application
31    /// that this connector gracefully completed.
32    _spindown_token: AppSpindownToken,
33}
34
35impl<DB> Connector<DB>
36where
37    DB: Database,
38{
39    /// Creates a new [`Connector`] for the given [`Handle`] and sends it into
40    /// background to eventually close the pooled database connections once the
41    /// global [`AppContext`] is terminated.
42    ///
43    /// The returned [`Pool`] may be cloned and re-used as necessary.
44    pub fn start<H>(handle: H) -> Pool<DB>
45    where
46        H: Handle<Database = DB>,
47    {
48        let name = Self::compose_name(&handle);
49        let identifier = Arc::from(handle.identifier());
50        let (connect_options, pool_options) = handle.destruct();
51        let pool = pool_options.connect_lazy_with(connect_options);
52        let pool_to_return = pool.clone();
53        let _spindown_token = AppSpindown::register(&name);
54
55        let connector = Self {
56            name,
57            identifier,
58            pool,
59            _spindown_token,
60        };
61
62        tokio::spawn(connector.stand_by());
63
64        pool_to_return
65    }
66
67    /// Composes a human-readable name for this connector.
68    fn compose_name(handle: &impl Handle) -> Arc<str> {
69        static COUNTER: AtomicUsize = AtomicUsize::new(0);
70
71        Arc::from(format!(
72            "database:connector:{}:{}",
73            handle.name(),
74            COUNTER.fetch_add(1, Ordering::Relaxed),
75        ))
76    }
77}
78
79impl<DB> Connector<DB>
80where
81    DB: Database,
82{
83    /// Main, long-running function waits until the global [`AppContext`] is
84    /// terminated. After that it falls into the spindown phase, where it cleans
85    /// up before returning.
86    async fn stand_by(self) {
87        // Wait for the global context to terminate
88        AppContext::terminated().await;
89
90        // Announce spindown
91        info!(
92            name = self.name.as_ref(),
93            identifier = self.identifier.as_ref(),
94            "Closing the database connection pool",
95        );
96
97        // Close database connections
98        self.pool.close().await;
99    }
100}