Skip to main content

dr_metrix_postgres/pool/
deadpool.rs

1use async_trait::async_trait;
2use diesel::deserialize::QueryableByName;
3use diesel::row::NamedRow;
4use diesel::sql_types::Text;
5use diesel::RunQueryDsl;
6use dr_metrix_core::error::{MetricsError, Result};
7use serde_json::{Map, Value};
8
9use super::{PoolAdapter, PoolStatus};
10
11struct JsonRow {
12    row_json: String,
13}
14
15impl<DB> QueryableByName<DB> for JsonRow
16where
17    DB: diesel::backend::Backend,
18    String: diesel::deserialize::FromSql<Text, DB>,
19{
20    fn build<'a>(
21        row: &impl NamedRow<'a, DB>,
22    ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
23        let row_json = NamedRow::get::<Text, String>(row, "row_json")?;
24        Ok(Self { row_json })
25    }
26}
27
28#[async_trait]
29impl PoolAdapter for deadpool_diesel::postgres::Pool {
30    async fn query_json(&self, sql: &str) -> Result<Vec<Map<String, Value>>> {
31        let wrapped = format!("SELECT row_to_json(t)::text AS row_json FROM ({sql}) t");
32        let conn = self.get().await.map_err(MetricsError::database)?;
33
34        let rows: Vec<JsonRow> = conn
35            .interact(move |conn| diesel::sql_query(wrapped).load::<JsonRow>(conn))
36            .await
37            .map_err(MetricsError::database)?
38            .map_err(MetricsError::database)?;
39
40        let mut result = Vec::with_capacity(rows.len());
41        for row in rows {
42            let val: Value =
43                serde_json::from_str(&row.row_json).map_err(MetricsError::database)?;
44            if let Value::Object(map) = val {
45                result.push(map);
46            }
47        }
48        Ok(result)
49    }
50
51    fn pool_status(&self) -> PoolStatus {
52        let status = self.status();
53        let in_use = status.size.saturating_sub(status.available) as u32;
54        PoolStatus {
55            max_size: status.max_size as u32,
56            available: status.available as u32,
57            in_use,
58        }
59    }
60}