1use shaperail_core::{FieldSchema, FieldType, ResourceDefinition, ShaperailError};
7
8use super::filter::FilterSet;
9use super::pagination::{decode_cursor, encode_cursor, PageRequest};
10use super::search::SearchParam;
11use super::sort::SortParam;
12use super::{ResourceRow, SqlConnection};
13use crate::observability::telemetry;
14use sea_orm::ConnectionTrait;
15
16fn query_result_to_json(
18 row: &sea_orm::QueryResult,
19 resource: &ResourceDefinition,
20) -> Result<serde_json::Value, ShaperailError> {
21 let mut obj = serde_json::Map::new();
22 for (name, field) in &resource.schema {
23 let value = get_column_value(row, name, field)?;
24 obj.insert(name.clone(), value);
25 }
26 Ok(serde_json::Value::Object(obj))
27}
28
29fn get_column_value(
30 row: &sea_orm::QueryResult,
31 name: &str,
32 field: &FieldSchema,
33) -> Result<serde_json::Value, ShaperailError> {
34 let map_err =
35 |e: sea_orm::DbErr| ShaperailError::Internal(format!("Column '{name}' error: {e}"));
36 match field.field_type {
37 FieldType::Uuid => {
38 let v: Option<String> = row.try_get("", name).map_err(map_err)?;
40 Ok(v.map(serde_json::Value::String)
41 .unwrap_or(serde_json::Value::Null))
42 }
43 FieldType::String | FieldType::Enum | FieldType::File => {
44 let v: Option<String> = row.try_get("", name).map_err(map_err)?;
45 Ok(v.map(serde_json::Value::String)
46 .unwrap_or(serde_json::Value::Null))
47 }
48 FieldType::Integer => {
49 let v: Option<i32> = row.try_get("", name).map_err(map_err)?;
50 Ok(v.map(|n| serde_json::Value::Number(n.into()))
51 .unwrap_or(serde_json::Value::Null))
52 }
53 FieldType::Bigint => {
54 let v: Option<i64> = row.try_get("", name).map_err(map_err)?;
55 Ok(v.map(|n| serde_json::Value::Number(n.into()))
56 .unwrap_or(serde_json::Value::Null))
57 }
58 FieldType::Number => {
59 let v: Option<f64> = row.try_get("", name).map_err(map_err)?;
60 Ok(
61 v.and_then(|n| serde_json::Number::from_f64(n).map(serde_json::Value::Number))
62 .unwrap_or(serde_json::Value::Null),
63 )
64 }
65 FieldType::Boolean => {
66 let v: Option<bool> = row.try_get("", name).map_err(map_err)?;
67 Ok(v.map(serde_json::Value::Bool)
68 .unwrap_or(serde_json::Value::Null))
69 }
70 FieldType::Timestamp => {
71 let v: Option<String> = row.try_get("", name).map_err(map_err)?;
72 Ok(v.map(serde_json::Value::String)
73 .unwrap_or(serde_json::Value::Null))
74 }
75 FieldType::Date => {
76 let v: Option<String> = row.try_get("", name).map_err(map_err)?;
77 Ok(v.map(serde_json::Value::String)
78 .unwrap_or(serde_json::Value::Null))
79 }
80 FieldType::Json | FieldType::Array => {
81 let v: Option<serde_json::Value> = row.try_get("", name).map_err(map_err)?;
82 Ok(v.unwrap_or(serde_json::Value::Null))
83 }
84 }
85}
86
87fn json_to_sea_value(value: &serde_json::Value, field: &FieldSchema) -> sea_query::Value {
88 if value.is_null() {
89 return sea_query::Value::String(None);
90 }
91 match field.field_type {
92 FieldType::Uuid => {
93 sea_query::Value::String(Some(Box::new(
95 value.as_str().unwrap_or(&value.to_string()).to_string(),
96 )))
97 }
98 FieldType::String | FieldType::Enum | FieldType::File => sea_query::Value::String(Some(
99 Box::new(value.as_str().unwrap_or(&value.to_string()).to_string()),
100 )),
101 FieldType::Integer => sea_query::Value::Int(Some(value.as_i64().unwrap_or(0) as i32)),
102 FieldType::Bigint => sea_query::Value::BigInt(Some(value.as_i64().unwrap_or(0))),
103 FieldType::Number => sea_query::Value::Double(Some(value.as_f64().unwrap_or(0.0))),
104 FieldType::Boolean => sea_query::Value::Bool(Some(value.as_bool().unwrap_or(false))),
105 FieldType::Timestamp => sea_query::Value::String(Some(Box::new(
106 value.as_str().unwrap_or(&value.to_string()).to_string(),
107 ))),
108 FieldType::Date => sea_query::Value::String(Some(Box::new(
109 value.as_str().unwrap_or(&value.to_string()).to_string(),
110 ))),
111 FieldType::Json | FieldType::Array => sea_query::Value::Json(Some(Box::new(value.clone()))),
112 }
113}
114
115fn coerce_filter_to_sea_value(value: &str, field: &FieldSchema) -> sea_query::Value {
116 match field.field_type {
117 FieldType::Uuid => {
118 sea_query::Value::String(Some(Box::new(value.to_string())))
120 }
121 FieldType::Integer => value
122 .parse::<i32>()
123 .ok()
124 .map(|n| sea_query::Value::Int(Some(n)))
125 .unwrap_or_else(|| sea_query::Value::String(Some(Box::new(value.to_string())))),
126 FieldType::Bigint => value
127 .parse::<i64>()
128 .ok()
129 .map(|n| sea_query::Value::BigInt(Some(n)))
130 .unwrap_or_else(|| sea_query::Value::String(Some(Box::new(value.to_string())))),
131 FieldType::Number => value
132 .parse::<f64>()
133 .ok()
134 .map(|n| sea_query::Value::Double(Some(n)))
135 .unwrap_or_else(|| sea_query::Value::String(Some(Box::new(value.to_string())))),
136 FieldType::Boolean => value
137 .parse::<bool>()
138 .ok()
139 .map(|b| sea_query::Value::Bool(Some(b)))
140 .unwrap_or_else(|| sea_query::Value::String(Some(Box::new(value.to_string())))),
141 _ => sea_query::Value::String(Some(Box::new(value.to_string()))),
142 }
143}
144
145pub struct OrmResourceQuery<'a> {
148 pub resource: &'a ResourceDefinition,
149 pub connection: &'a SqlConnection,
150}
151
152impl<'a> OrmResourceQuery<'a> {
153 pub fn new(resource: &'a ResourceDefinition, connection: &'a SqlConnection) -> Self {
154 Self {
155 resource,
156 connection,
157 }
158 }
159
160 fn table(&self) -> String {
161 self.connection.quote_ident(&self.resource.resource)
162 }
163
164 fn select_columns(&self) -> String {
165 self.resource
166 .schema
167 .keys()
168 .map(|c| self.connection.quote_ident(c))
169 .collect::<Vec<_>>()
170 .join(", ")
171 }
172
173 fn primary_key(&self) -> &str {
174 self.resource
175 .schema
176 .iter()
177 .find(|(_, fs)| fs.primary)
178 .map(|(name, _)| name.as_str())
179 .unwrap_or("id")
180 }
181
182 fn has_soft_delete(&self) -> bool {
183 self.resource
184 .endpoints
185 .as_ref()
186 .map(|eps| eps.values().any(|ep| ep.soft_delete))
187 .unwrap_or(false)
188 }
189
190 fn backend(&self) -> sea_orm::DatabaseBackend {
191 self.connection.backend()
192 }
193
194 fn param(&self, index: usize) -> String {
195 self.connection.param(index)
196 }
197
198 fn qi(&self, name: &str) -> String {
199 self.connection.quote_ident(name)
200 }
201
202 fn supports_returning(&self) -> bool {
204 matches!(
205 self.connection.engine,
206 shaperail_core::DatabaseEngine::Postgres | shaperail_core::DatabaseEngine::SQLite
207 )
208 }
209
210 pub async fn find_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
212 let pk = self.primary_key();
213 let soft = if self.has_soft_delete() {
214 format!(" AND {} IS NULL", self.qi("deleted_at"))
215 } else {
216 String::new()
217 };
218 let p1 = self.param(1);
219 let sql = format!(
220 "SELECT {} FROM {} WHERE {} = {p1}{soft}",
221 self.select_columns(),
222 self.table(),
223 self.qi(pk),
224 );
225 let values = sea_query::Values(vec![sea_query::Value::String(Some(Box::new(
226 id.to_string(),
227 )))]);
228 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
229 let span = telemetry::db_span("orm_find_by_id", &self.resource.resource, "SELECT");
230 let _enter = span.enter();
231 let rows = self
232 .connection
233 .inner
234 .query_all(stmt)
235 .await
236 .map_err(|e| ShaperailError::Internal(format!("ORM find_by_id failed: {e}")))?;
237 let row = rows.into_iter().next().ok_or(ShaperailError::NotFound)?;
238 let json = query_result_to_json(&row, self.resource)?;
239 Ok(ResourceRow(json))
240 }
241
242 pub async fn find_all(
244 &self,
245 filters: &FilterSet,
246 _search: Option<&SearchParam>,
247 sort: &SortParam,
248 page: &PageRequest,
249 ) -> Result<(Vec<ResourceRow>, serde_json::Value), ShaperailError> {
250 let mut sql = format!("SELECT {} FROM {}", self.select_columns(), self.table());
251 let mut values_vec = Vec::new();
252 let mut param = 1usize;
253
254 if self.has_soft_delete() {
255 sql.push_str(&format!(" WHERE {} IS NULL", self.qi("deleted_at")));
256 }
257 for f in &filters.filters {
258 if let Some(field) = self.resource.schema.get(&f.field) {
259 if param == 1 && !self.has_soft_delete() {
260 sql.push_str(" WHERE ");
261 } else {
262 sql.push_str(" AND ");
263 }
264 let p = self.param(param);
265 sql.push_str(&format!("{} = {p}", self.qi(&f.field)));
266 values_vec.push(coerce_filter_to_sea_value(&f.value, field));
267 param += 1;
268 }
269 }
270
271 if !sort.fields.is_empty() {
272 sql.push_str(" ORDER BY ");
273 for (i, s) in sort.fields.iter().enumerate() {
274 if i > 0 {
275 sql.push_str(", ");
276 }
277 let dir = match s.direction {
278 super::sort::SortDirection::Asc => "ASC",
279 super::sort::SortDirection::Desc => "DESC",
280 };
281 sql.push_str(&format!("{} {dir}", self.qi(&s.field)));
282 }
283 }
284
285 match page {
286 PageRequest::Cursor { after, limit } => {
287 if let Some(cursor) = after {
288 let id_str = decode_cursor(cursor)?;
289 if param == 1 && !self.has_soft_delete() && filters.is_empty() {
290 sql.push_str(" WHERE ");
291 } else {
292 sql.push_str(" AND ");
293 }
294 let p = self.param(param);
295 sql.push_str(&format!("{} > {p}", self.qi("id")));
296 values_vec.push(sea_query::Value::String(Some(Box::new(id_str))));
297 }
298 if sort.fields.is_empty() {
299 sql.push_str(&format!(" ORDER BY {} ASC", self.qi("id")));
300 }
301 sql.push_str(&format!(" LIMIT {}", limit + 1));
302 let values = sea_query::Values(values_vec);
303 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
304 let span = telemetry::db_span("orm_find_all", &self.resource.resource, "SELECT");
305 let _enter = span.enter();
306 let rows =
307 self.connection.inner.query_all(stmt).await.map_err(|e| {
308 ShaperailError::Internal(format!("ORM find_all failed: {e}"))
309 })?;
310 let limit_i64 = *limit;
311 let has_more = rows.len() as i64 > limit_i64;
312 let result_rows: Vec<ResourceRow> = rows
313 .into_iter()
314 .take(limit_i64 as usize)
315 .map(|r| query_result_to_json(&r, self.resource).map(ResourceRow))
316 .collect::<Result<Vec<_>, _>>()?;
317 let cursor = result_rows
318 .last()
319 .and_then(|r| r.0.get("id"))
320 .and_then(|v| v.as_str())
321 .map(encode_cursor);
322 let meta = serde_json::json!({ "cursor": cursor, "has_more": has_more });
323 Ok((result_rows, meta))
324 }
325 PageRequest::Offset { offset, limit } => {
326 sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
327 let values = sea_query::Values(values_vec);
328 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
329 let span = telemetry::db_span("orm_find_all", &self.resource.resource, "SELECT");
330 let _enter = span.enter();
331 let rows =
332 self.connection.inner.query_all(stmt).await.map_err(|e| {
333 ShaperailError::Internal(format!("ORM find_all failed: {e}"))
334 })?;
335 let result_rows: Vec<ResourceRow> = rows
336 .into_iter()
337 .map(|r| query_result_to_json(&r, self.resource).map(ResourceRow))
338 .collect::<Result<Vec<_>, _>>()?;
339 let total = result_rows.len() as i64;
340 let meta = serde_json::json!({
341 "offset": offset,
342 "limit": limit,
343 "total": total
344 });
345 Ok((result_rows, meta))
346 }
347 }
348 }
349
350 pub async fn insert(
352 &self,
353 data: &serde_json::Map<String, serde_json::Value>,
354 ) -> Result<ResourceRow, ShaperailError> {
355 let mut columns = Vec::new();
356 let mut placeholders = Vec::new();
357 let mut values_vec = Vec::new();
358 let mut param = 1usize;
359 let mut generated_id: Option<String> = None;
360
361 for (name, field) in &self.resource.schema {
362 if field.generated {
363 match field.field_type {
364 FieldType::Uuid => {
365 let id = uuid::Uuid::new_v4();
366 columns.push(self.qi(name));
367 placeholders.push(self.param(param));
368 values_vec.push(sea_query::Value::String(Some(Box::new(id.to_string()))));
369 if field.primary {
370 generated_id = Some(id.to_string());
371 }
372 param += 1;
373 }
374 FieldType::Timestamp => {
375 let now = chrono::Utc::now().to_rfc3339();
376 columns.push(self.qi(name));
377 placeholders.push(self.param(param));
378 values_vec.push(sea_query::Value::String(Some(Box::new(now))));
379 param += 1;
380 }
381 _ => {}
382 }
383 continue;
384 }
385 if let Some(value) = data.get(name) {
386 columns.push(self.qi(name));
387 placeholders.push(self.param(param));
388 values_vec.push(json_to_sea_value(value, field));
389 param += 1;
390 } else if let Some(default) = &field.default {
391 columns.push(self.qi(name));
392 placeholders.push(self.param(param));
393 values_vec.push(json_to_sea_value(default, field));
394 param += 1;
395 }
396 }
397
398 if self.supports_returning() {
399 let sql = format!(
400 "INSERT INTO {} ({}) VALUES ({}) RETURNING {}",
401 self.table(),
402 columns.join(", "),
403 placeholders.join(", "),
404 self.select_columns(),
405 );
406 let values = sea_query::Values(values_vec);
407 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
408 let span = telemetry::db_span("orm_insert", &self.resource.resource, "INSERT");
409 let _enter = span.enter();
410 let rows = self
411 .connection
412 .inner
413 .query_all(stmt)
414 .await
415 .map_err(|e| ShaperailError::Internal(format!("ORM insert failed: {e}")))?;
416 let row = rows
417 .into_iter()
418 .next()
419 .ok_or_else(|| ShaperailError::Internal("Insert returned no rows".to_string()))?;
420 let json = query_result_to_json(&row, self.resource)?;
421 Ok(ResourceRow(json))
422 } else {
423 let sql = format!(
425 "INSERT INTO {} ({}) VALUES ({})",
426 self.table(),
427 columns.join(", "),
428 placeholders.join(", "),
429 );
430 let values = sea_query::Values(values_vec);
431 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
432 let span = telemetry::db_span("orm_insert", &self.resource.resource, "INSERT");
433 let _enter = span.enter();
434 self.connection
435 .inner
436 .execute(stmt)
437 .await
438 .map_err(|e| ShaperailError::Internal(format!("ORM insert failed: {e}")))?;
439 if let Some(id_str) = generated_id {
441 let id = uuid::Uuid::parse_str(&id_str).map_err(|e| {
442 ShaperailError::Internal(format!("Generated UUID parse error: {e}"))
443 })?;
444 self.find_by_id(&id).await
445 } else {
446 Err(ShaperailError::Internal(
448 "MySQL insert without generated UUID not supported".to_string(),
449 ))
450 }
451 }
452 }
453
454 pub async fn update_by_id(
456 &self,
457 id: &uuid::Uuid,
458 data: &serde_json::Map<String, serde_json::Value>,
459 ) -> Result<ResourceRow, ShaperailError> {
460 let mut set_parts = Vec::new();
461 let mut values_vec = Vec::new();
462 let mut param = 1usize;
463
464 for (name, value) in data {
465 if let Some(field) = self.resource.schema.get(name) {
466 if field.primary || field.generated {
467 continue;
468 }
469 let p = self.param(param);
470 set_parts.push(format!("{} = {p}", self.qi(name)));
471 values_vec.push(json_to_sea_value(value, field));
472 param += 1;
473 }
474 }
475 if self.resource.schema.contains_key("updated_at") {
476 let p = self.param(param);
477 let now = chrono::Utc::now().to_rfc3339();
478 set_parts.push(format!("{} = {p}", self.qi("updated_at")));
479 values_vec.push(sea_query::Value::String(Some(Box::new(now))));
480 param += 1;
481 }
482 if set_parts.is_empty() {
483 return Err(ShaperailError::Validation(vec![
484 shaperail_core::FieldError {
485 field: "body".to_string(),
486 message: "No valid fields to update".to_string(),
487 code: "empty_update".to_string(),
488 },
489 ]));
490 }
491 let pk = self.primary_key();
492 let soft = if self.has_soft_delete() {
493 format!(" AND {} IS NULL", self.qi("deleted_at"))
494 } else {
495 String::new()
496 };
497 let p = self.param(param);
498 values_vec.push(sea_query::Value::String(Some(Box::new(id.to_string()))));
499
500 if self.supports_returning() {
501 let sql = format!(
502 "UPDATE {} SET {} WHERE {} = {p}{soft} RETURNING {}",
503 self.table(),
504 set_parts.join(", "),
505 self.qi(pk),
506 self.select_columns(),
507 );
508 let values = sea_query::Values(values_vec);
509 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
510 let span = telemetry::db_span("orm_update", &self.resource.resource, "UPDATE");
511 let _enter = span.enter();
512 let rows = self
513 .connection
514 .inner
515 .query_all(stmt)
516 .await
517 .map_err(|e| ShaperailError::Internal(format!("ORM update failed: {e}")))?;
518 rows.into_iter()
519 .next()
520 .ok_or(ShaperailError::NotFound)
521 .and_then(|row| query_result_to_json(&row, self.resource).map(ResourceRow))
522 } else {
523 let sql = format!(
525 "UPDATE {} SET {} WHERE {} = {p}{soft}",
526 self.table(),
527 set_parts.join(", "),
528 self.qi(pk),
529 );
530 let values = sea_query::Values(values_vec);
531 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
532 let span = telemetry::db_span("orm_update", &self.resource.resource, "UPDATE");
533 let _enter = span.enter();
534 let result = self
535 .connection
536 .inner
537 .execute(stmt)
538 .await
539 .map_err(|e| ShaperailError::Internal(format!("ORM update failed: {e}")))?;
540 if result.rows_affected() == 0 {
541 return Err(ShaperailError::NotFound);
542 }
543 self.find_by_id(id).await
544 }
545 }
546
547 pub async fn soft_delete_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
549 let pk = self.primary_key();
550 let p1 = self.param(1);
551 let p2 = self.param(2);
552 let now = chrono::Utc::now().to_rfc3339();
553
554 if self.supports_returning() {
555 let sql = format!(
556 "UPDATE {} SET {} = {p1} WHERE {} = {p2} AND {} IS NULL RETURNING {}",
557 self.table(),
558 self.qi("deleted_at"),
559 self.qi(pk),
560 self.qi("deleted_at"),
561 self.select_columns(),
562 );
563 let values = sea_query::Values(vec![
564 sea_query::Value::String(Some(Box::new(now))),
565 sea_query::Value::String(Some(Box::new(id.to_string()))),
566 ]);
567 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
568 let span = telemetry::db_span("orm_soft_delete", &self.resource.resource, "UPDATE");
569 let _enter = span.enter();
570 let rows =
571 self.connection.inner.query_all(stmt).await.map_err(|e| {
572 ShaperailError::Internal(format!("ORM soft_delete failed: {e}"))
573 })?;
574 rows.into_iter()
575 .next()
576 .ok_or(ShaperailError::NotFound)
577 .and_then(|row| query_result_to_json(&row, self.resource).map(ResourceRow))
578 } else {
579 let sql = format!(
581 "UPDATE {} SET {} = {p1} WHERE {} = {p2} AND {} IS NULL",
582 self.table(),
583 self.qi("deleted_at"),
584 self.qi(pk),
585 self.qi("deleted_at"),
586 );
587 let values = sea_query::Values(vec![
588 sea_query::Value::String(Some(Box::new(now))),
589 sea_query::Value::String(Some(Box::new(id.to_string()))),
590 ]);
591 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
592 let span = telemetry::db_span("orm_soft_delete", &self.resource.resource, "UPDATE");
593 let _enter = span.enter();
594 let result =
595 self.connection.inner.execute(stmt).await.map_err(|e| {
596 ShaperailError::Internal(format!("ORM soft_delete failed: {e}"))
597 })?;
598 if result.rows_affected() == 0 {
599 return Err(ShaperailError::NotFound);
600 }
601 self.find_by_id(id).await
602 }
603 }
604
605 pub async fn hard_delete_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
607 let row = self.find_by_id(id).await?;
608 let pk = self.primary_key();
609 let p1 = self.param(1);
610
611 if self.supports_returning() {
612 let sql = format!(
613 "DELETE FROM {} WHERE {} = {p1} RETURNING {}",
614 self.table(),
615 self.qi(pk),
616 self.select_columns(),
617 );
618 let values = sea_query::Values(vec![sea_query::Value::String(Some(Box::new(
619 id.to_string(),
620 )))]);
621 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
622 let span = telemetry::db_span("orm_hard_delete", &self.resource.resource, "DELETE");
623 let _enter = span.enter();
624 self.connection
625 .inner
626 .execute(stmt)
627 .await
628 .map_err(|e| ShaperailError::Internal(format!("ORM hard_delete failed: {e}")))?;
629 } else {
630 let sql = format!("DELETE FROM {} WHERE {} = {p1}", self.table(), self.qi(pk),);
632 let values = sea_query::Values(vec![sea_query::Value::String(Some(Box::new(
633 id.to_string(),
634 )))]);
635 let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
636 let span = telemetry::db_span("orm_hard_delete", &self.resource.resource, "DELETE");
637 let _enter = span.enter();
638 self.connection
639 .inner
640 .execute(stmt)
641 .await
642 .map_err(|e| ShaperailError::Internal(format!("ORM hard_delete failed: {e}")))?;
643 }
644 Ok(row)
645 }
646}