elif_orm/query/
execution.rs1use sqlx::Row;
4
5use super::builder::QueryBuilder;
6use crate::error::ModelResult;
7use crate::model::Model;
8
9impl<M: Model> QueryBuilder<M> {
11 pub async fn get(self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<Vec<M>> {
13 let sql = self.to_sql();
14 let rows = sqlx::query(&sql).fetch_all(pool).await?;
15
16 let mut models = Vec::new();
17 for row in rows {
18 models.push(M::from_row(&row)?);
19 }
20
21 Ok(models)
22 }
23
24 pub async fn chunk<F>(
26 self,
27 pool: &sqlx::Pool<sqlx::Postgres>,
28 chunk_size: i64,
29 mut callback: F,
30 ) -> ModelResult<()>
31 where
32 F: FnMut(Vec<M>) -> Result<(), crate::error::ModelError>,
33 {
34 let mut offset = 0;
35 loop {
36 let chunk_query = self.clone().limit(chunk_size).offset(offset);
37
38 let chunk = chunk_query.get(pool).await?;
39
40 if chunk.is_empty() {
41 break;
42 }
43
44 callback(chunk)?;
45 offset += chunk_size;
46 }
47
48 Ok(())
49 }
50
51 pub async fn get_raw(
53 self,
54 pool: &sqlx::Pool<sqlx::Postgres>,
55 ) -> ModelResult<Vec<serde_json::Value>> {
56 let sql = self.to_sql();
57 let rows = sqlx::query(&sql).fetch_all(pool).await?;
58
59 let mut results = Vec::new();
60 for row in rows {
61 let mut json_row = serde_json::Map::new();
62
63 for i in 0..row.len() {
66 if let Ok(column) = row.try_get::<Option<String>, _>(i) {
67 let column_name = format!("column_{}", i); json_row.insert(
69 column_name,
70 serde_json::Value::String(column.unwrap_or_default()),
71 );
72 }
73 }
74
75 results.push(serde_json::Value::Object(json_row));
76 }
77
78 Ok(results)
79 }
80
81 pub async fn first(self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<Option<M>> {
83 let query = self.limit(1);
84 let mut results = query.get(pool).await?;
85 Ok(results.pop())
86 }
87
88 pub async fn first_or_fail(self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<M> {
90 self.first(pool)
91 .await?
92 .ok_or_else(|| crate::error::ModelError::NotFound(M::table_name().to_string()))
93 }
94
95 pub async fn count(mut self, pool: &sqlx::Pool<sqlx::Postgres>) -> ModelResult<i64> {
97 self.select_fields = vec!["COUNT(*)".to_string()];
98 let sql = self.to_sql();
99
100 let row = sqlx::query(&sql).fetch_one(pool).await?;
101
102 let count: i64 = row.try_get(0)?;
103 Ok(count)
104 }
105
106 pub async fn aggregate(
108 self,
109 pool: &sqlx::Pool<sqlx::Postgres>,
110 ) -> ModelResult<Option<serde_json::Value>> {
111 let sql = self.to_sql();
112
113 let row_opt = sqlx::query(&sql).fetch_optional(pool).await?;
114
115 if let Some(row) = row_opt {
116 if let Ok(result) = row.try_get::<Option<i64>, _>(0) {
118 return Ok(Some(serde_json::Value::Number(serde_json::Number::from(
119 result.unwrap_or(0),
120 ))));
121 } else if let Ok(result) = row.try_get::<Option<f64>, _>(0) {
122 return Ok(Some(
123 serde_json::Number::from_f64(result.unwrap_or(0.0))
124 .map(serde_json::Value::Number)
125 .unwrap_or(serde_json::Value::Null),
126 ));
127 } else if let Ok(result) = row.try_get::<Option<String>, _>(0) {
128 return Ok(Some(serde_json::Value::String(result.unwrap_or_default())));
129 }
130 }
131
132 Ok(None)
133 }
134}