dr_metrix_postgres/pool/
deadpool.rs1use async_trait::async_trait;
2use diesel::deserialize::QueryableByName;
3use diesel::row::NamedRow;
4use diesel::sql_types::Text;
5use diesel::RunQueryDsl;
6use dr_metrix_core::error::{MetricsError, Result};
7use serde_json::{Map, Value};
8
9use super::{PoolAdapter, PoolStatus};
10
11struct JsonRow {
12 row_json: String,
13}
14
15impl<DB> QueryableByName<DB> for JsonRow
16where
17 DB: diesel::backend::Backend,
18 String: diesel::deserialize::FromSql<Text, DB>,
19{
20 fn build<'a>(
21 row: &impl NamedRow<'a, DB>,
22 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
23 let row_json = NamedRow::get::<Text, String>(row, "row_json")?;
24 Ok(Self { row_json })
25 }
26}
27
28#[async_trait]
29impl PoolAdapter for deadpool_diesel::postgres::Pool {
30 async fn query_json(&self, sql: &str) -> Result<Vec<Map<String, Value>>> {
31 let wrapped = format!("SELECT row_to_json(t)::text AS row_json FROM ({sql}) t");
32 let conn = self.get().await.map_err(MetricsError::database)?;
33
34 let rows: Vec<JsonRow> = conn
35 .interact(move |conn| diesel::sql_query(wrapped).load::<JsonRow>(conn))
36 .await
37 .map_err(MetricsError::database)?
38 .map_err(MetricsError::database)?;
39
40 let mut result = Vec::with_capacity(rows.len());
41 for row in rows {
42 let val: Value =
43 serde_json::from_str(&row.row_json).map_err(MetricsError::database)?;
44 if let Value::Object(map) = val {
45 result.push(map);
46 }
47 }
48 Ok(result)
49 }
50
51 fn pool_status(&self) -> PoolStatus {
52 let status = self.status();
53 let in_use = status.size.saturating_sub(status.available) as u32;
54 PoolStatus {
55 max_size: status.max_size as u32,
56 available: status.available as u32,
57 in_use,
58 }
59 }
60}