1use std::{
2 borrow::Cow,
3 fmt::{Debug, Display},
4 ops::{Deref, DerefMut},
5 sync::Arc,
6 time::{Duration, Instant},
7};
8
9use opentelemetry::KeyValue;
10use parking_lot::Mutex;
11use tracing::{debug_span, Instrument};
12
13use crate::{
14 error::Error,
15 metrics::{pool_kv, Metrics, POOL_METRICS},
16 r#async::InstrumentablePool,
17 resource::InstrumentedResource,
18};
19
20pub struct InstrumentedPool<P> {
24 label: [KeyValue; 1],
26 metrics: Arc<Metrics>,
28 last_gathered_at: Mutex<Instant>,
30 pool: P,
32}
33
34impl<P, E> InstrumentedPool<P>
35where
36 P: for<'p> InstrumentablePool<'p, Error = E> + Sync,
37 E: std::error::Error + Send + 'static,
38{
39 pub fn instrument<L: Into<Cow<'static, str>>>(
45 label: Option<L>,
46 pool: P,
47 ) -> Result<Self, Error<E>> {
48 let label = pool_kv(label.map(Into::into));
49 let metrics = POOL_METRICS.deref().clone();
50 metrics.record_state(&label, pool.get_state()?);
51 Ok(Self {
52 label,
53 metrics,
54 last_gathered_at: Mutex::new(Instant::now()),
55 pool,
56 })
57 }
58
59 pub fn update_metrics(&self) -> Result<(), Error<E>> {
68 const PROBE_INTERVAL: Duration = Duration::from_secs(15);
70 let mut last_gathered_at = self.last_gathered_at.lock();
71 if last_gathered_at.elapsed() > PROBE_INTERVAL {
72 *last_gathered_at = Instant::now();
73 drop(last_gathered_at);
74 self.metrics
75 .record_state(&self.label, self.pool.get_state()?);
76 }
77 Ok(())
78 }
79
80 #[inline]
82 fn measure_acquire(&self, before: Instant) -> Result<(), Error<E>> {
83 self.metrics
84 .wait_time
85 .record(before.elapsed().as_secs_f64(), &self.label);
86 self.update_metrics()
87 }
88
89 pub async fn get(
95 &self,
96 ) -> Result<InstrumentedResource<<P as InstrumentablePool<'_>>::Resource>, Error<E>> {
97 let now = Instant::now();
98 let span = debug_span!("pool_acquire", name = self.label[0].value.as_str().as_ref());
99 let resource = self.pool.get().instrument(span).await?;
100 self.measure_acquire(now)?;
101 Ok(InstrumentedResource::new(
102 self.metrics.clone(),
103 self.label.clone(),
104 resource,
105 ))
106 }
107
108 pub fn try_get(
115 &self,
116 ) -> Result<InstrumentedResource<<P as InstrumentablePool<'_>>::Resource>, Error<E>> {
117 let now = Instant::now();
118 let span = debug_span!(
119 "pool_try_acquire",
120 name = self.label[0].value.as_str().as_ref()
121 )
122 .entered();
123 let resource = self.pool.try_get()?;
124 drop(span);
125 self.measure_acquire(now)?;
126 Ok(InstrumentedResource::new(
127 self.metrics.clone(),
128 self.label.clone(),
129 resource,
130 ))
131 }
132
133 pub async fn get_timeout(
141 &self,
142 timeout: Duration,
143 ) -> Result<InstrumentedResource<<P as InstrumentablePool<'_>>::Resource>, Error<E>> {
144 let now = Instant::now();
145 let span = debug_span!(
146 "pool_timed_acquire",
147 name = self.label[0].value.as_str().as_ref()
148 );
149 let resource = self.pool.get_timeout(timeout).instrument(span).await?;
150 self.measure_acquire(now)?;
151 Ok(InstrumentedResource::new(
152 self.metrics.clone(),
153 self.label.clone(),
154 resource,
155 ))
156 }
157}
158
159impl<P> Deref for InstrumentedPool<P> {
160 type Target = P;
161
162 fn deref(&self) -> &Self::Target {
163 &self.pool
164 }
165}
166
167impl<P> DerefMut for InstrumentedPool<P> {
168 fn deref_mut(&mut self) -> &mut Self::Target {
169 &mut self.pool
170 }
171}
172
173impl<P> AsRef<P> for InstrumentedPool<P> {
174 fn as_ref(&self) -> &P {
175 &self.pool
176 }
177}
178
179impl<P> AsMut<P> for InstrumentedPool<P> {
180 fn as_mut(&mut self) -> &mut P {
181 &mut self.pool
182 }
183}
184
185impl<P: Clone> Clone for InstrumentedPool<P> {
186 fn clone(&self) -> Self {
187 Self {
188 label: self.label.clone(),
189 metrics: self.metrics.clone(),
190 last_gathered_at: Mutex::new(*self.last_gathered_at.lock()),
191 pool: self.pool.clone(),
192 }
193 }
194}
195
196impl<P: Debug> Debug for InstrumentedPool<P> {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 self.pool.fmt(f)
199 }
200}
201
202impl<P: Display> Display for InstrumentedPool<P> {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 self.pool.fmt(f)
205 }
206}