Skip to main content

shaperail_runtime/db/
orm_query.rs

1//! ORM-backed resource query (M14). Builds dialect-aware SQL and executes via SeaORM.
2//!
3//! Supports Postgres, MySQL, and SQLite. Uses each connection's engine to determine
4//! identifier quoting (`"` vs `` ` ``), parameter syntax (`$N` vs `?`), and backend.
5
6use shaperail_core::{FieldSchema, FieldType, ResourceDefinition, ShaperailError};
7
8use super::filter::FilterSet;
9use super::pagination::{decode_cursor, encode_cursor, PageRequest};
10use super::search::SearchParam;
11use super::sort::SortParam;
12use super::{ResourceRow, SqlConnection};
13use crate::observability::telemetry;
14use sea_orm::ConnectionTrait;
15
16/// Converts a SeaORM QueryResult row to a JSON object using the resource schema.
17fn query_result_to_json(
18    row: &sea_orm::QueryResult,
19    resource: &ResourceDefinition,
20) -> Result<serde_json::Value, ShaperailError> {
21    let mut obj = serde_json::Map::new();
22    for (name, field) in &resource.schema {
23        let value = get_column_value(row, name, field)?;
24        obj.insert(name.clone(), value);
25    }
26    Ok(serde_json::Value::Object(obj))
27}
28
29fn get_column_value(
30    row: &sea_orm::QueryResult,
31    name: &str,
32    field: &FieldSchema,
33) -> Result<serde_json::Value, ShaperailError> {
34    let map_err =
35        |e: sea_orm::DbErr| ShaperailError::Internal(format!("Column '{name}' error: {e}"));
36    match field.field_type {
37        FieldType::Uuid => {
38            // MySQL/SQLite store UUIDs as strings; Postgres as native UUID.
39            let v: Option<String> = row.try_get("", name).map_err(map_err)?;
40            Ok(v.map(serde_json::Value::String)
41                .unwrap_or(serde_json::Value::Null))
42        }
43        FieldType::String | FieldType::Enum | FieldType::File => {
44            let v: Option<String> = row.try_get("", name).map_err(map_err)?;
45            Ok(v.map(serde_json::Value::String)
46                .unwrap_or(serde_json::Value::Null))
47        }
48        FieldType::Integer => {
49            let v: Option<i32> = row.try_get("", name).map_err(map_err)?;
50            Ok(v.map(|n| serde_json::Value::Number(n.into()))
51                .unwrap_or(serde_json::Value::Null))
52        }
53        FieldType::Bigint => {
54            let v: Option<i64> = row.try_get("", name).map_err(map_err)?;
55            Ok(v.map(|n| serde_json::Value::Number(n.into()))
56                .unwrap_or(serde_json::Value::Null))
57        }
58        FieldType::Number => {
59            let v: Option<f64> = row.try_get("", name).map_err(map_err)?;
60            Ok(
61                v.and_then(|n| serde_json::Number::from_f64(n).map(serde_json::Value::Number))
62                    .unwrap_or(serde_json::Value::Null),
63            )
64        }
65        FieldType::Boolean => {
66            let v: Option<bool> = row.try_get("", name).map_err(map_err)?;
67            Ok(v.map(serde_json::Value::Bool)
68                .unwrap_or(serde_json::Value::Null))
69        }
70        FieldType::Timestamp => {
71            let v: Option<String> = row.try_get("", name).map_err(map_err)?;
72            Ok(v.map(serde_json::Value::String)
73                .unwrap_or(serde_json::Value::Null))
74        }
75        FieldType::Date => {
76            let v: Option<String> = row.try_get("", name).map_err(map_err)?;
77            Ok(v.map(serde_json::Value::String)
78                .unwrap_or(serde_json::Value::Null))
79        }
80        FieldType::Json | FieldType::Array => {
81            let v: Option<serde_json::Value> = row.try_get("", name).map_err(map_err)?;
82            Ok(v.unwrap_or(serde_json::Value::Null))
83        }
84    }
85}
86
87fn json_to_sea_value(value: &serde_json::Value, field: &FieldSchema) -> sea_query::Value {
88    if value.is_null() {
89        return sea_query::Value::String(None);
90    }
91    match field.field_type {
92        FieldType::Uuid => {
93            // Store as string for cross-engine compat (MySQL/SQLite use CHAR/TEXT).
94            sea_query::Value::String(Some(Box::new(
95                value.as_str().unwrap_or(&value.to_string()).to_string(),
96            )))
97        }
98        FieldType::String | FieldType::Enum | FieldType::File => sea_query::Value::String(Some(
99            Box::new(value.as_str().unwrap_or(&value.to_string()).to_string()),
100        )),
101        FieldType::Integer => sea_query::Value::Int(Some(value.as_i64().unwrap_or(0) as i32)),
102        FieldType::Bigint => sea_query::Value::BigInt(Some(value.as_i64().unwrap_or(0))),
103        FieldType::Number => sea_query::Value::Double(Some(value.as_f64().unwrap_or(0.0))),
104        FieldType::Boolean => sea_query::Value::Bool(Some(value.as_bool().unwrap_or(false))),
105        FieldType::Timestamp => sea_query::Value::String(Some(Box::new(
106            value.as_str().unwrap_or(&value.to_string()).to_string(),
107        ))),
108        FieldType::Date => sea_query::Value::String(Some(Box::new(
109            value.as_str().unwrap_or(&value.to_string()).to_string(),
110        ))),
111        FieldType::Json | FieldType::Array => sea_query::Value::Json(Some(Box::new(value.clone()))),
112    }
113}
114
115fn coerce_filter_to_sea_value(value: &str, field: &FieldSchema) -> sea_query::Value {
116    match field.field_type {
117        FieldType::Uuid => {
118            // Store as string for cross-engine compat.
119            sea_query::Value::String(Some(Box::new(value.to_string())))
120        }
121        FieldType::Integer => value
122            .parse::<i32>()
123            .ok()
124            .map(|n| sea_query::Value::Int(Some(n)))
125            .unwrap_or_else(|| sea_query::Value::String(Some(Box::new(value.to_string())))),
126        FieldType::Bigint => value
127            .parse::<i64>()
128            .ok()
129            .map(|n| sea_query::Value::BigInt(Some(n)))
130            .unwrap_or_else(|| sea_query::Value::String(Some(Box::new(value.to_string())))),
131        FieldType::Number => value
132            .parse::<f64>()
133            .ok()
134            .map(|n| sea_query::Value::Double(Some(n)))
135            .unwrap_or_else(|| sea_query::Value::String(Some(Box::new(value.to_string())))),
136        FieldType::Boolean => value
137            .parse::<bool>()
138            .ok()
139            .map(|b| sea_query::Value::Bool(Some(b)))
140            .unwrap_or_else(|| sea_query::Value::String(Some(Box::new(value.to_string())))),
141        _ => sea_query::Value::String(Some(Box::new(value.to_string()))),
142    }
143}
144
145/// ORM-backed resource query. Executes dialect-aware SQL via SeaORM.
146/// Supports Postgres, MySQL, and SQLite backends.
147pub struct OrmResourceQuery<'a> {
148    pub resource: &'a ResourceDefinition,
149    pub connection: &'a SqlConnection,
150}
151
152impl<'a> OrmResourceQuery<'a> {
153    pub fn new(resource: &'a ResourceDefinition, connection: &'a SqlConnection) -> Self {
154        Self {
155            resource,
156            connection,
157        }
158    }
159
160    fn table(&self) -> String {
161        self.connection.quote_ident(&self.resource.resource)
162    }
163
164    fn select_columns(&self) -> String {
165        self.resource
166            .schema
167            .keys()
168            .map(|c| self.connection.quote_ident(c))
169            .collect::<Vec<_>>()
170            .join(", ")
171    }
172
173    fn primary_key(&self) -> &str {
174        self.resource
175            .schema
176            .iter()
177            .find(|(_, fs)| fs.primary)
178            .map(|(name, _)| name.as_str())
179            .unwrap_or("id")
180    }
181
182    fn has_soft_delete(&self) -> bool {
183        self.resource
184            .endpoints
185            .as_ref()
186            .map(|eps| eps.values().any(|ep| ep.soft_delete))
187            .unwrap_or(false)
188    }
189
190    fn backend(&self) -> sea_orm::DatabaseBackend {
191        self.connection.backend()
192    }
193
194    fn param(&self, index: usize) -> String {
195        self.connection.param(index)
196    }
197
198    fn qi(&self, name: &str) -> String {
199        self.connection.quote_ident(name)
200    }
201
202    /// Whether this engine supports RETURNING clause (Postgres, SQLite 3.35+).
203    fn supports_returning(&self) -> bool {
204        matches!(
205            self.connection.engine,
206            shaperail_core::DatabaseEngine::Postgres | shaperail_core::DatabaseEngine::SQLite
207        )
208    }
209
210    /// Find a single record by primary key.
211    pub async fn find_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
212        let pk = self.primary_key();
213        let soft = if self.has_soft_delete() {
214            format!(" AND {} IS NULL", self.qi("deleted_at"))
215        } else {
216            String::new()
217        };
218        let p1 = self.param(1);
219        let sql = format!(
220            "SELECT {} FROM {} WHERE {} = {p1}{soft}",
221            self.select_columns(),
222            self.table(),
223            self.qi(pk),
224        );
225        let values = sea_query::Values(vec![sea_query::Value::String(Some(Box::new(
226            id.to_string(),
227        )))]);
228        let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
229        let span = telemetry::db_span("orm_find_by_id", &self.resource.resource, "SELECT");
230        let _enter = span.enter();
231        let rows = self
232            .connection
233            .inner
234            .query_all(stmt)
235            .await
236            .map_err(|e| ShaperailError::Internal(format!("ORM find_by_id failed: {e}")))?;
237        let row = rows.into_iter().next().ok_or(ShaperailError::NotFound)?;
238        let json = query_result_to_json(&row, self.resource)?;
239        Ok(ResourceRow(json))
240    }
241
242    /// Find all with filters, sort, and pagination.
243    pub async fn find_all(
244        &self,
245        filters: &FilterSet,
246        _search: Option<&SearchParam>,
247        sort: &SortParam,
248        page: &PageRequest,
249    ) -> Result<(Vec<ResourceRow>, serde_json::Value), ShaperailError> {
250        let mut sql = format!("SELECT {} FROM {}", self.select_columns(), self.table());
251        let mut values_vec = Vec::new();
252        let mut param = 1usize;
253
254        if self.has_soft_delete() {
255            sql.push_str(&format!(" WHERE {} IS NULL", self.qi("deleted_at")));
256        }
257        for f in &filters.filters {
258            if let Some(field) = self.resource.schema.get(&f.field) {
259                if param == 1 && !self.has_soft_delete() {
260                    sql.push_str(" WHERE ");
261                } else {
262                    sql.push_str(" AND ");
263                }
264                let p = self.param(param);
265                sql.push_str(&format!("{} = {p}", self.qi(&f.field)));
266                values_vec.push(coerce_filter_to_sea_value(&f.value, field));
267                param += 1;
268            }
269        }
270
271        if !sort.fields.is_empty() {
272            sql.push_str(" ORDER BY ");
273            for (i, s) in sort.fields.iter().enumerate() {
274                if i > 0 {
275                    sql.push_str(", ");
276                }
277                let dir = match s.direction {
278                    super::sort::SortDirection::Asc => "ASC",
279                    super::sort::SortDirection::Desc => "DESC",
280                };
281                sql.push_str(&format!("{} {dir}", self.qi(&s.field)));
282            }
283        }
284
285        match page {
286            PageRequest::Cursor { after, limit } => {
287                if let Some(cursor) = after {
288                    let id_str = decode_cursor(cursor)?;
289                    if param == 1 && !self.has_soft_delete() && filters.is_empty() {
290                        sql.push_str(" WHERE ");
291                    } else {
292                        sql.push_str(" AND ");
293                    }
294                    let p = self.param(param);
295                    sql.push_str(&format!("{} > {p}", self.qi("id")));
296                    values_vec.push(sea_query::Value::String(Some(Box::new(id_str))));
297                }
298                if sort.fields.is_empty() {
299                    sql.push_str(&format!(" ORDER BY {} ASC", self.qi("id")));
300                }
301                sql.push_str(&format!(" LIMIT {}", limit + 1));
302                let values = sea_query::Values(values_vec);
303                let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
304                let span = telemetry::db_span("orm_find_all", &self.resource.resource, "SELECT");
305                let _enter = span.enter();
306                let rows =
307                    self.connection.inner.query_all(stmt).await.map_err(|e| {
308                        ShaperailError::Internal(format!("ORM find_all failed: {e}"))
309                    })?;
310                let limit_i64 = *limit;
311                let has_more = rows.len() as i64 > limit_i64;
312                let result_rows: Vec<ResourceRow> = rows
313                    .into_iter()
314                    .take(limit_i64 as usize)
315                    .map(|r| query_result_to_json(&r, self.resource).map(ResourceRow))
316                    .collect::<Result<Vec<_>, _>>()?;
317                let cursor = result_rows
318                    .last()
319                    .and_then(|r| r.0.get("id"))
320                    .and_then(|v| v.as_str())
321                    .map(encode_cursor);
322                let meta = serde_json::json!({ "cursor": cursor, "has_more": has_more });
323                Ok((result_rows, meta))
324            }
325            PageRequest::Offset { offset, limit } => {
326                sql.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
327                let values = sea_query::Values(values_vec);
328                let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
329                let span = telemetry::db_span("orm_find_all", &self.resource.resource, "SELECT");
330                let _enter = span.enter();
331                let rows =
332                    self.connection.inner.query_all(stmt).await.map_err(|e| {
333                        ShaperailError::Internal(format!("ORM find_all failed: {e}"))
334                    })?;
335                let result_rows: Vec<ResourceRow> = rows
336                    .into_iter()
337                    .map(|r| query_result_to_json(&r, self.resource).map(ResourceRow))
338                    .collect::<Result<Vec<_>, _>>()?;
339                let total = result_rows.len() as i64;
340                let meta = serde_json::json!({
341                    "offset": offset,
342                    "limit": limit,
343                    "total": total
344                });
345                Ok((result_rows, meta))
346            }
347        }
348    }
349
350    /// Insert a record. Uses RETURNING for Postgres/SQLite; for MySQL does INSERT + SELECT.
351    pub async fn insert(
352        &self,
353        data: &serde_json::Map<String, serde_json::Value>,
354    ) -> Result<ResourceRow, ShaperailError> {
355        let mut columns = Vec::new();
356        let mut placeholders = Vec::new();
357        let mut values_vec = Vec::new();
358        let mut param = 1usize;
359        let mut generated_id: Option<String> = None;
360
361        for (name, field) in &self.resource.schema {
362            if field.generated {
363                match field.field_type {
364                    FieldType::Uuid => {
365                        let id = uuid::Uuid::new_v4();
366                        columns.push(self.qi(name));
367                        placeholders.push(self.param(param));
368                        values_vec.push(sea_query::Value::String(Some(Box::new(id.to_string()))));
369                        if field.primary {
370                            generated_id = Some(id.to_string());
371                        }
372                        param += 1;
373                    }
374                    FieldType::Timestamp => {
375                        let now = chrono::Utc::now().to_rfc3339();
376                        columns.push(self.qi(name));
377                        placeholders.push(self.param(param));
378                        values_vec.push(sea_query::Value::String(Some(Box::new(now))));
379                        param += 1;
380                    }
381                    _ => {}
382                }
383                continue;
384            }
385            if let Some(value) = data.get(name) {
386                columns.push(self.qi(name));
387                placeholders.push(self.param(param));
388                values_vec.push(json_to_sea_value(value, field));
389                param += 1;
390            } else if let Some(default) = &field.default {
391                columns.push(self.qi(name));
392                placeholders.push(self.param(param));
393                values_vec.push(json_to_sea_value(default, field));
394                param += 1;
395            }
396        }
397
398        if self.supports_returning() {
399            let sql = format!(
400                "INSERT INTO {} ({}) VALUES ({}) RETURNING {}",
401                self.table(),
402                columns.join(", "),
403                placeholders.join(", "),
404                self.select_columns(),
405            );
406            let values = sea_query::Values(values_vec);
407            let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
408            let span = telemetry::db_span("orm_insert", &self.resource.resource, "INSERT");
409            let _enter = span.enter();
410            let rows = self
411                .connection
412                .inner
413                .query_all(stmt)
414                .await
415                .map_err(|e| ShaperailError::Internal(format!("ORM insert failed: {e}")))?;
416            let row = rows
417                .into_iter()
418                .next()
419                .ok_or_else(|| ShaperailError::Internal("Insert returned no rows".to_string()))?;
420            let json = query_result_to_json(&row, self.resource)?;
421            Ok(ResourceRow(json))
422        } else {
423            // MySQL: INSERT then SELECT back.
424            let sql = format!(
425                "INSERT INTO {} ({}) VALUES ({})",
426                self.table(),
427                columns.join(", "),
428                placeholders.join(", "),
429            );
430            let values = sea_query::Values(values_vec);
431            let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
432            let span = telemetry::db_span("orm_insert", &self.resource.resource, "INSERT");
433            let _enter = span.enter();
434            self.connection
435                .inner
436                .execute(stmt)
437                .await
438                .map_err(|e| ShaperailError::Internal(format!("ORM insert failed: {e}")))?;
439            // Fetch back via generated ID.
440            if let Some(id_str) = generated_id {
441                let id = uuid::Uuid::parse_str(&id_str).map_err(|e| {
442                    ShaperailError::Internal(format!("Generated UUID parse error: {e}"))
443                })?;
444                self.find_by_id(&id).await
445            } else {
446                // Fallback: use LAST_INSERT_ID for MySQL auto-increment (rare in Shaperail).
447                Err(ShaperailError::Internal(
448                    "MySQL insert without generated UUID not supported".to_string(),
449                ))
450            }
451        }
452    }
453
454    /// Update by primary key.
455    pub async fn update_by_id(
456        &self,
457        id: &uuid::Uuid,
458        data: &serde_json::Map<String, serde_json::Value>,
459    ) -> Result<ResourceRow, ShaperailError> {
460        let mut set_parts = Vec::new();
461        let mut values_vec = Vec::new();
462        let mut param = 1usize;
463
464        for (name, value) in data {
465            if let Some(field) = self.resource.schema.get(name) {
466                if field.primary || field.generated {
467                    continue;
468                }
469                let p = self.param(param);
470                set_parts.push(format!("{} = {p}", self.qi(name)));
471                values_vec.push(json_to_sea_value(value, field));
472                param += 1;
473            }
474        }
475        if self.resource.schema.contains_key("updated_at") {
476            let p = self.param(param);
477            let now = chrono::Utc::now().to_rfc3339();
478            set_parts.push(format!("{} = {p}", self.qi("updated_at")));
479            values_vec.push(sea_query::Value::String(Some(Box::new(now))));
480            param += 1;
481        }
482        if set_parts.is_empty() {
483            return Err(ShaperailError::Validation(vec![
484                shaperail_core::FieldError {
485                    field: "body".to_string(),
486                    message: "No valid fields to update".to_string(),
487                    code: "empty_update".to_string(),
488                },
489            ]));
490        }
491        let pk = self.primary_key();
492        let soft = if self.has_soft_delete() {
493            format!(" AND {} IS NULL", self.qi("deleted_at"))
494        } else {
495            String::new()
496        };
497        let p = self.param(param);
498        values_vec.push(sea_query::Value::String(Some(Box::new(id.to_string()))));
499
500        if self.supports_returning() {
501            let sql = format!(
502                "UPDATE {} SET {} WHERE {} = {p}{soft} RETURNING {}",
503                self.table(),
504                set_parts.join(", "),
505                self.qi(pk),
506                self.select_columns(),
507            );
508            let values = sea_query::Values(values_vec);
509            let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
510            let span = telemetry::db_span("orm_update", &self.resource.resource, "UPDATE");
511            let _enter = span.enter();
512            let rows = self
513                .connection
514                .inner
515                .query_all(stmt)
516                .await
517                .map_err(|e| ShaperailError::Internal(format!("ORM update failed: {e}")))?;
518            rows.into_iter()
519                .next()
520                .ok_or(ShaperailError::NotFound)
521                .and_then(|row| query_result_to_json(&row, self.resource).map(ResourceRow))
522        } else {
523            // MySQL: UPDATE then SELECT back.
524            let sql = format!(
525                "UPDATE {} SET {} WHERE {} = {p}{soft}",
526                self.table(),
527                set_parts.join(", "),
528                self.qi(pk),
529            );
530            let values = sea_query::Values(values_vec);
531            let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
532            let span = telemetry::db_span("orm_update", &self.resource.resource, "UPDATE");
533            let _enter = span.enter();
534            let result = self
535                .connection
536                .inner
537                .execute(stmt)
538                .await
539                .map_err(|e| ShaperailError::Internal(format!("ORM update failed: {e}")))?;
540            if result.rows_affected() == 0 {
541                return Err(ShaperailError::NotFound);
542            }
543            self.find_by_id(id).await
544        }
545    }
546
547    /// Soft-delete by setting deleted_at.
548    pub async fn soft_delete_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
549        let pk = self.primary_key();
550        let p1 = self.param(1);
551        let p2 = self.param(2);
552        let now = chrono::Utc::now().to_rfc3339();
553
554        if self.supports_returning() {
555            let sql = format!(
556                "UPDATE {} SET {} = {p1} WHERE {} = {p2} AND {} IS NULL RETURNING {}",
557                self.table(),
558                self.qi("deleted_at"),
559                self.qi(pk),
560                self.qi("deleted_at"),
561                self.select_columns(),
562            );
563            let values = sea_query::Values(vec![
564                sea_query::Value::String(Some(Box::new(now))),
565                sea_query::Value::String(Some(Box::new(id.to_string()))),
566            ]);
567            let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
568            let span = telemetry::db_span("orm_soft_delete", &self.resource.resource, "UPDATE");
569            let _enter = span.enter();
570            let rows =
571                self.connection.inner.query_all(stmt).await.map_err(|e| {
572                    ShaperailError::Internal(format!("ORM soft_delete failed: {e}"))
573                })?;
574            rows.into_iter()
575                .next()
576                .ok_or(ShaperailError::NotFound)
577                .and_then(|row| query_result_to_json(&row, self.resource).map(ResourceRow))
578        } else {
579            // MySQL: UPDATE then SELECT.
580            let sql = format!(
581                "UPDATE {} SET {} = {p1} WHERE {} = {p2} AND {} IS NULL",
582                self.table(),
583                self.qi("deleted_at"),
584                self.qi(pk),
585                self.qi("deleted_at"),
586            );
587            let values = sea_query::Values(vec![
588                sea_query::Value::String(Some(Box::new(now))),
589                sea_query::Value::String(Some(Box::new(id.to_string()))),
590            ]);
591            let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
592            let span = telemetry::db_span("orm_soft_delete", &self.resource.resource, "UPDATE");
593            let _enter = span.enter();
594            let result =
595                self.connection.inner.execute(stmt).await.map_err(|e| {
596                    ShaperailError::Internal(format!("ORM soft_delete failed: {e}"))
597                })?;
598            if result.rows_affected() == 0 {
599                return Err(ShaperailError::NotFound);
600            }
601            self.find_by_id(id).await
602        }
603    }
604
605    /// Hard-delete by primary key.
606    pub async fn hard_delete_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
607        let row = self.find_by_id(id).await?;
608        let pk = self.primary_key();
609        let p1 = self.param(1);
610
611        if self.supports_returning() {
612            let sql = format!(
613                "DELETE FROM {} WHERE {} = {p1} RETURNING {}",
614                self.table(),
615                self.qi(pk),
616                self.select_columns(),
617            );
618            let values = sea_query::Values(vec![sea_query::Value::String(Some(Box::new(
619                id.to_string(),
620            )))]);
621            let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
622            let span = telemetry::db_span("orm_hard_delete", &self.resource.resource, "DELETE");
623            let _enter = span.enter();
624            self.connection
625                .inner
626                .execute(stmt)
627                .await
628                .map_err(|e| ShaperailError::Internal(format!("ORM hard_delete failed: {e}")))?;
629        } else {
630            // MySQL: just DELETE.
631            let sql = format!("DELETE FROM {} WHERE {} = {p1}", self.table(), self.qi(pk),);
632            let values = sea_query::Values(vec![sea_query::Value::String(Some(Box::new(
633                id.to_string(),
634            )))]);
635            let stmt = sea_orm::Statement::from_sql_and_values(self.backend(), sql, values);
636            let span = telemetry::db_span("orm_hard_delete", &self.resource.resource, "DELETE");
637            let _enter = span.enter();
638            self.connection
639                .inner
640                .execute(stmt)
641                .await
642                .map_err(|e| ShaperailError::Internal(format!("ORM hard_delete failed: {e}")))?;
643        }
644        Ok(row)
645    }
646}