elif_orm/query/
execution.rs1use sqlx::Row;
4use serde_json::Value;
5
6use crate::error::ModelResult;
7use crate::model::Model;
8use super::builder::QueryBuilder;
9
10impl<M: Model> QueryBuilder<M> {
12 pub async fn get(self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<Vec<M>> {
14 let sql = self.to_sql();
15 let rows = sqlx::query(&sql)
16 .fetch_all(pool)
17 .await?;
18
19 let mut models = Vec::new();
20 for row in rows {
21 models.push(M::from_row(&row)?);
22 }
23
24 Ok(models)
25 }
26
27 pub async fn chunk<F>(
29 mut self,
30 pool: &sqlx::Pool<sqlx::Postgres>,
31 chunk_size: i64,
32 mut callback: F
33 ) -> ModelResult<()>
34 where
35 F: FnMut(Vec<M>) -> Result<(), crate::error::ModelError>,
36 {
37 let mut offset = 0;
38 loop {
39 let chunk_query = self.clone()
40 .limit(chunk_size)
41 .offset(offset);
42
43 let chunk = chunk_query.get(pool).await?;
44
45 if chunk.is_empty() {
46 break;
47 }
48
49 callback(chunk)?;
50 offset += chunk_size;
51 }
52
53 Ok(())
54 }
55
56 pub async fn get_raw(self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<Vec<serde_json::Value>> {
58 let sql = self.to_sql();
59 let rows = sqlx::query(&sql)
60 .fetch_all(pool)
61 .await?;
62
63 let mut results = Vec::new();
64 for row in rows {
65 let mut json_row = serde_json::Map::new();
66
67 for i in 0..row.len() {
70 if let Ok(column) = row.try_get::<Option<String>, _>(i) {
71 let column_name = format!("column_{}", i); json_row.insert(column_name, serde_json::Value::String(column.unwrap_or_default()));
73 }
74 }
75
76 results.push(serde_json::Value::Object(json_row));
77 }
78
79 Ok(results)
80 }
81
82 pub async fn first(self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<Option<M>> {
84 let query = self.limit(1);
85 let mut results = query.get(pool).await?;
86 Ok(results.pop())
87 }
88
89 pub async fn first_or_fail(self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<M> {
91 self.first(pool)
92 .await?
93 .ok_or_else(|| crate::error::ModelError::NotFound(M::table_name().to_string()))
94 }
95
96 pub async fn count(mut self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<i64> {
98 self.select_fields = vec!["COUNT(*)".to_string()];
99 let sql = self.to_sql();
100
101 let row = sqlx::query(&sql)
102 .fetch_one(pool)
103 .await?;
104
105 let count: i64 = row.try_get(0)?;
106 Ok(count)
107 }
108
109 pub async fn aggregate(self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<Option<serde_json::Value>> {
111 let sql = self.to_sql();
112
113 let row_opt = sqlx::query(&sql)
114 .fetch_optional(pool)
115 .await?;
116
117 if let Some(row) = row_opt {
118 if let Ok(result) = row.try_get::<Option<i64>, _>(0) {
120 return Ok(Some(serde_json::Value::Number(serde_json::Number::from(result.unwrap_or(0)))));
121 } else if let Ok(result) = row.try_get::<Option<f64>, _>(0) {
122 return Ok(Some(serde_json::Number::from_f64(result.unwrap_or(0.0)).map(serde_json::Value::Number).unwrap_or(serde_json::Value::Null)));
123 } else if let Ok(result) = row.try_get::<Option<String>, _>(0) {
124 return Ok(Some(serde_json::Value::String(result.unwrap_or_default())));
125 }
126 }
127
128 Ok(None)
129 }
130}