uxum_pools/async/
pool.rs

1use std::{
2    borrow::Cow,
3    fmt::{Debug, Display},
4    ops::{Deref, DerefMut},
5    sync::Arc,
6    time::{Duration, Instant},
7};
8
9use opentelemetry::KeyValue;
10use parking_lot::Mutex;
11use tracing::{debug_span, Instrument};
12
13use crate::{
14    error::Error,
15    metrics::{pool_kv, Metrics, POOL_METRICS},
16    r#async::InstrumentablePool,
17    resource::InstrumentedResource,
18};
19
20/// Instrumented pool.
21///
22/// Automatically gathers pool-related metrics and provides relevant traces.
23pub struct InstrumentedPool<P> {
24    /// Pool label.
25    label: [KeyValue; 1],
26    /// Linked metrics storage.
27    metrics: Arc<Metrics>,
28    /// Time of last gathering of common pool metrics.
29    last_gathered_at: Mutex<Instant>,
30    /// Original resource pool.
31    pool: P,
32}
33
34impl<P, E> InstrumentedPool<P>
35where
36    P: for<'p> InstrumentablePool<'p, Error = E> + Sync,
37    E: std::error::Error + Send + 'static,
38{
39    /// Instrument provided resource pool.
40    ///
41    /// # Errors
42    ///
43    /// Returns `Err` if there was a problem collecting pool state.
44    pub fn instrument<L: Into<Cow<'static, str>>>(
45        label: Option<L>,
46        pool: P,
47    ) -> Result<Self, Error<E>> {
48        let label = pool_kv(label.map(Into::into));
49        let metrics = POOL_METRICS.deref().clone();
50        metrics.record_state(&label, pool.get_state()?);
51        Ok(Self {
52            label,
53            metrics,
54            last_gathered_at: Mutex::new(Instant::now()),
55            pool,
56        })
57    }
58
59    /// Manually update pool metrics.
60    ///
61    /// Normally you wouldn't need to call this directly, as metrics collection occurs
62    /// automatically as you use the pool.
63    ///
64    /// # Errors
65    ///
66    /// Returns `Err` if there was a problem collecting pool state.
67    pub fn update_metrics(&self) -> Result<(), Error<E>> {
68        // TODO: configure periodic state gathering interval
69        const PROBE_INTERVAL: Duration = Duration::from_secs(15);
70        let mut last_gathered_at = self.last_gathered_at.lock();
71        if last_gathered_at.elapsed() > PROBE_INTERVAL {
72            *last_gathered_at = Instant::now();
73            drop(last_gathered_at);
74            self.metrics
75                .record_state(&self.label, self.pool.get_state()?);
76        }
77        Ok(())
78    }
79
80    /// Internal method to record metrics after resource acquisition.
81    #[inline]
82    fn measure_acquire(&self, before: Instant) -> Result<(), Error<E>> {
83        self.metrics
84            .wait_time
85            .record(before.elapsed().as_secs_f64(), &self.label);
86        self.update_metrics()
87    }
88
89    /// Acquire instrumented resource from resource pool.
90    ///
91    /// # Errors
92    ///
93    /// Returns `Err` if there was a problem acquiring a resource from the pool.
94    pub async fn get(
95        &self,
96    ) -> Result<InstrumentedResource<<P as InstrumentablePool<'_>>::Resource>, Error<E>> {
97        let now = Instant::now();
98        let span = debug_span!("pool_acquire", name = self.label[0].value.as_str().as_ref());
99        let resource = self.pool.get().instrument(span).await?;
100        self.measure_acquire(now)?;
101        Ok(InstrumentedResource::new(
102            self.metrics.clone(),
103            self.label.clone(),
104            resource,
105        ))
106    }
107
108    /// Instantly acquire an instrumented resource from the pool.
109    ///
110    /// # Errors
111    ///
112    /// Returns `Err` if blocking is required, or if this operation is not implemented for this
113    /// pool type.
114    pub fn try_get(
115        &self,
116    ) -> Result<InstrumentedResource<<P as InstrumentablePool<'_>>::Resource>, Error<E>> {
117        let now = Instant::now();
118        let span = debug_span!(
119            "pool_try_acquire",
120            name = self.label[0].value.as_str().as_ref()
121        )
122        .entered();
123        let resource = self.pool.try_get()?;
124        drop(span);
125        self.measure_acquire(now)?;
126        Ok(InstrumentedResource::new(
127            self.metrics.clone(),
128            self.label.clone(),
129            resource,
130        ))
131    }
132
133    /// Try to acquire an instrumented resource from the pool, waiting for a bounded time.
134    ///
135    /// # Errors
136    ///
137    /// Must return [`Error::AcquireTimeout`] if waiting time was exhaused.
138    ///
139    /// Returns [`Error::NotImplemented`] if this operation is not implemented for this pool type.
140    pub async fn get_timeout(
141        &self,
142        timeout: Duration,
143    ) -> Result<InstrumentedResource<<P as InstrumentablePool<'_>>::Resource>, Error<E>> {
144        let now = Instant::now();
145        let span = debug_span!(
146            "pool_timed_acquire",
147            name = self.label[0].value.as_str().as_ref()
148        );
149        let resource = self.pool.get_timeout(timeout).instrument(span).await?;
150        self.measure_acquire(now)?;
151        Ok(InstrumentedResource::new(
152            self.metrics.clone(),
153            self.label.clone(),
154            resource,
155        ))
156    }
157}
158
159impl<P> Deref for InstrumentedPool<P> {
160    type Target = P;
161
162    fn deref(&self) -> &Self::Target {
163        &self.pool
164    }
165}
166
167impl<P> DerefMut for InstrumentedPool<P> {
168    fn deref_mut(&mut self) -> &mut Self::Target {
169        &mut self.pool
170    }
171}
172
173impl<P> AsRef<P> for InstrumentedPool<P> {
174    fn as_ref(&self) -> &P {
175        &self.pool
176    }
177}
178
179impl<P> AsMut<P> for InstrumentedPool<P> {
180    fn as_mut(&mut self) -> &mut P {
181        &mut self.pool
182    }
183}
184
185impl<P: Clone> Clone for InstrumentedPool<P> {
186    fn clone(&self) -> Self {
187        Self {
188            label: self.label.clone(),
189            metrics: self.metrics.clone(),
190            last_gathered_at: Mutex::new(*self.last_gathered_at.lock()),
191            pool: self.pool.clone(),
192        }
193    }
194}
195
196impl<P: Debug> Debug for InstrumentedPool<P> {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        self.pool.fmt(f)
199    }
200}
201
202impl<P: Display> Display for InstrumentedPool<P> {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        self.pool.fmt(f)
205    }
206}