sql_middleware/postgres/
query.rs1use crate::middleware::{ResultSet, RowValues, SqlMiddlewareDbError};
2use chrono::NaiveDateTime;
3use deadpool_postgres::Transaction;
4use serde_json::Value;
5use tokio_postgres::{Statement, types::ToSql};
6
7pub async fn build_result_set(
12 stmt: &Statement,
13 params: &[&(dyn ToSql + Sync)],
14 transaction: &Transaction<'_>,
15) -> Result<ResultSet, SqlMiddlewareDbError> {
16 let rows = transaction.query(stmt, params).await?;
18
19 let column_names: Vec<String> = stmt
20 .columns()
21 .iter()
22 .map(|col| col.name().to_string())
23 .collect();
24
25 let capacity = rows.len();
27 let mut result_set = ResultSet::with_capacity(capacity);
28 let column_names_rc = std::sync::Arc::new(column_names);
30 result_set.set_column_names(column_names_rc);
31
32 for row in rows {
33 let mut row_values = Vec::new();
34
35 let col_count = result_set
36 .get_column_names()
37 .ok_or_else(|| {
38 SqlMiddlewareDbError::ExecutionError("No column names available".to_string())
39 })?
40 .len();
41
42 for i in 0..col_count {
43 let value = postgres_extract_value(&row, i)?;
44 row_values.push(value);
45 }
46
47 result_set.add_row_values(row_values);
48 }
49
50 Ok(result_set)
51}
52
53fn postgres_extract_value(
55 row: &tokio_postgres::Row,
56 idx: usize,
57) -> Result<RowValues, SqlMiddlewareDbError> {
58 let type_info = row.columns()[idx].type_();
60
61 if type_info.name() == "int2" {
64 let val: Option<i16> = row.try_get(idx)?;
65 Ok(val
66 .map(|v| RowValues::Int(i64::from(v)))
67 .unwrap_or(RowValues::Null))
68 } else if type_info.name() == "int4" {
69 let val: Option<i32> = row.try_get(idx)?;
70 Ok(val
71 .map(|v| RowValues::Int(i64::from(v)))
72 .unwrap_or(RowValues::Null))
73 } else if type_info.name() == "int8" {
74 let val: Option<i64> = row.try_get(idx)?;
75 Ok(val.map_or(RowValues::Null, RowValues::Int))
76 } else if type_info.name() == "float4" || type_info.name() == "float8" {
77 let val: Option<f64> = row.try_get(idx)?;
78 Ok(val.map_or(RowValues::Null, RowValues::Float))
79 } else if type_info.name() == "bool" {
80 let val: Option<bool> = row.try_get(idx)?;
81 Ok(val.map_or(RowValues::Null, RowValues::Bool))
82 } else if type_info.name() == "timestamp" || type_info.name() == "timestamptz" {
83 let val: Option<NaiveDateTime> = row.try_get(idx)?;
84 Ok(val.map_or(RowValues::Null, RowValues::Timestamp))
85 } else if type_info.name() == "json" || type_info.name() == "jsonb" {
86 let val: Option<Value> = row.try_get(idx)?;
87 Ok(val.map_or(RowValues::Null, RowValues::JSON))
88 } else if type_info.name() == "bytea" {
89 let val: Option<Vec<u8>> = row.try_get(idx)?;
90 Ok(val.map_or(RowValues::Null, RowValues::Blob))
91 } else if type_info.name() == "text"
92 || type_info.name() == "varchar"
93 || type_info.name() == "char"
94 {
95 let val: Option<String> = row.try_get(idx)?;
96 Ok(val.map_or(RowValues::Null, RowValues::Text))
97 } else {
98 let val: Option<String> = row.try_get(idx)?;
100 Ok(val.map_or(RowValues::Null, RowValues::Text))
101 }
102}