Skip to main content

architect_sdk/service/
crud.rs

1//! Generic CRUD execution against PostgreSQL.
2
3use crate::config::ResolvedEntity;
4use crate::db::pool::{Connection, DbRow, Pool};
5use crate::db::Dialect;
6use crate::error::AppError;
7use crate::sql::{
8    archive, coerce_json_value_for_pg_array, delete, insert, select_by_column_in, select_by_id,
9    select_list, select_list_with_includes, unarchive, update, BindValue, FilterNode,
10    IncludeSelect, QueryBuf, SortSpec,
11};
12use serde_json::Value;
13use std::collections::HashMap;
14
15/// Execution target: either a pool (for database/schema strategy) or a single connection (for RLS, with SET LOCAL already applied).
16pub enum TenantExecutorInner<'a> {
17    Pool(&'a Pool),
18    Conn(&'a mut Connection),
19}
20
21pub struct TenantExecutor<'a> {
22    pub executor: TenantExecutorInner<'a>,
23    pub dialect: &'a dyn crate::db::Dialect,
24}
25
26impl<'a> TenantExecutor<'a> {
27    pub fn pool(pool: &'a Pool, dialect: &'a dyn crate::db::Dialect) -> Self {
28        TenantExecutor {
29            executor: TenantExecutorInner::Pool(pool),
30            dialect,
31        }
32    }
33    pub fn conn(conn: &'a mut Connection, dialect: &'a dyn crate::db::Dialect) -> Self {
34        TenantExecutor {
35            executor: TenantExecutorInner::Conn(conn),
36            dialect,
37        }
38    }
39}
40
41pub struct CrudService;
42
43impl CrudService {
44    /// List rows with optional RSQL filter and sort, limit (default 100, max 1000), offset (default 0).
45    /// `filter_includes` supplies related-entity metadata for dotted-field EXISTS filters; pass `&[]` when unused.
46    #[allow(clippy::too_many_arguments)]
47    pub async fn list<'a>(
48        executor: &mut TenantExecutor<'a>,
49        entity: &ResolvedEntity,
50        filter: Option<&FilterNode>,
51        sort: &[SortSpec],
52        limit: Option<u32>,
53        offset: Option<u32>,
54        filter_includes: &[IncludeSelect<'_>],
55        schema_override: Option<&str>,
56        dialect: &dyn Dialect,
57    ) -> Result<Vec<Value>, AppError> {
58        const DEFAULT_LIMIT: u32 = 100;
59        let limit = limit.unwrap_or(DEFAULT_LIMIT).min(1000);
60        let offset = offset.unwrap_or(0);
61        let q = select_list(
62            entity,
63            filter,
64            sort,
65            Some(limit),
66            Some(offset),
67            filter_includes,
68            schema_override,
69            dialect,
70        )?;
71        Self::query_many_exec(executor, &q.sql, &q.params).await
72    }
73
74    /// List rows with includes in a single query (scalar subqueries with json_agg/row_to_json). Returns rows with include keys already set (JSON).
75    /// `includes` drives scalar subqueries for response data; `filter_includes` is the superset used for EXISTS generation.
76    #[allow(clippy::too_many_arguments)]
77    pub async fn list_with_includes<'a>(
78        executor: &mut TenantExecutor<'a>,
79        entity: &ResolvedEntity,
80        filter: Option<&FilterNode>,
81        sort: &[SortSpec],
82        limit: Option<u32>,
83        offset: Option<u32>,
84        includes: &[IncludeSelect<'_>],
85        filter_includes: &[IncludeSelect<'_>],
86        schema_override: Option<&str>,
87        dialect: &dyn Dialect,
88    ) -> Result<Vec<Value>, AppError> {
89        const DEFAULT_LIMIT: u32 = 100;
90        let limit = limit.unwrap_or(DEFAULT_LIMIT).min(1000);
91        let offset = offset.unwrap_or(0);
92        let q = select_list_with_includes(
93            entity,
94            filter,
95            sort,
96            Some(limit),
97            Some(offset),
98            includes,
99            filter_includes,
100            schema_override,
101            dialect,
102        )?;
103        Self::query_many_exec(executor, &q.sql, &q.params).await
104    }
105
106    /// Fetch one row by primary key. Returns JSON object or None.
107    pub async fn read<'a>(
108        executor: &mut TenantExecutor<'a>,
109        entity: &ResolvedEntity,
110        id: &Value,
111        schema_override: Option<&str>,
112        dialect: &dyn Dialect,
113    ) -> Result<Option<Value>, AppError> {
114        let q = select_by_id(entity, schema_override, dialect);
115        Self::query_one_exec(executor, &q.sql, std::slice::from_ref(id)).await
116    }
117
118    /// Fetch rows from entity where column IN (values). Used for batch-loading related rows.
119    pub async fn fetch_where_column_in<'a>(
120        executor: &mut TenantExecutor<'a>,
121        entity: &ResolvedEntity,
122        column_name: &str,
123        values: &[Value],
124        schema_override: Option<&str>,
125        dialect: &dyn Dialect,
126    ) -> Result<Vec<Value>, AppError> {
127        if values.is_empty() {
128            return Ok(Vec::new());
129        }
130        let q = select_by_column_in(entity, column_name, values, schema_override, dialect);
131        Self::query_many_exec(executor, &q.sql, &q.params).await
132    }
133
134    /// Insert one row; body may include or omit PK (if has default). Returns created row.
135    /// When rls_tenant_id is Some (RLS strategy), tenant_id column is set automatically.
136    /// When caller_user_id is Some, created_by is set to that value.
137    pub async fn create<'a>(
138        executor: &mut TenantExecutor<'a>,
139        entity: &ResolvedEntity,
140        body: &HashMap<String, Value>,
141        schema_override: Option<&str>,
142        rls_tenant_id: Option<&str>,
143        caller_user_id: Option<&str>,
144        dialect: &dyn Dialect,
145    ) -> Result<Value, AppError> {
146        let include_pk = body.contains_key(&entity.pk_columns[0]);
147        let q = insert(
148            entity,
149            body,
150            include_pk,
151            schema_override,
152            rls_tenant_id,
153            caller_user_id,
154            dialect,
155        );
156        let row = Self::execute_returning_one_exec(executor, &q)
157            .await?
158            .ok_or_else(|| AppError::Db(sqlx::Error::RowNotFound))?;
159        if entity.audit_log {
160            Self::insert_audit(
161                executor,
162                entity,
163                "create",
164                &row,
165                None,
166                caller_user_id,
167                schema_override,
168            )
169            .await?;
170        }
171        Ok(row)
172    }
173
174    /// Update one row by id. Returns updated row.
175    /// When caller_user_id is Some, updated_by is set to that value.
176    pub async fn update<'a>(
177        executor: &mut TenantExecutor<'a>,
178        entity: &ResolvedEntity,
179        id: &Value,
180        body: &HashMap<String, Value>,
181        schema_override: Option<&str>,
182        caller_user_id: Option<&str>,
183        dialect: &dyn Dialect,
184    ) -> Result<Option<Value>, AppError> {
185        let pre_row = if entity.audit_log {
186            let q = select_by_id(entity, schema_override, dialect);
187            Self::query_one_exec(executor, &q.sql, std::slice::from_ref(id)).await?
188        } else {
189            None
190        };
191        let q = update(entity, id, body, schema_override, caller_user_id, dialect);
192        let result = Self::execute_returning_one_exec(executor, &q).await?;
193        if entity.audit_log {
194            if let Some(ref post_row) = result {
195                Self::insert_audit(
196                    executor,
197                    entity,
198                    "update",
199                    post_row,
200                    pre_row.as_ref(),
201                    caller_user_id,
202                    schema_override,
203                )
204                .await?;
205            }
206        }
207        Ok(result)
208    }
209
210    /// Delete one row by id. Returns deleted row or None.
211    /// When caller_user_id is Some, audit_by is set on the audit record.
212    pub async fn delete<'a>(
213        executor: &mut TenantExecutor<'a>,
214        entity: &ResolvedEntity,
215        id: &Value,
216        schema_override: Option<&str>,
217        caller_user_id: Option<&str>,
218        dialect: &dyn Dialect,
219    ) -> Result<Option<Value>, AppError> {
220        let q = delete(entity, schema_override, dialect);
221        let result = Self::execute_returning_one_with_params_exec(
222            executor,
223            &q.sql,
224            std::slice::from_ref(id),
225        )
226        .await?;
227        if entity.audit_log {
228            if let Some(ref deleted_row) = result {
229                Self::insert_audit(
230                    executor,
231                    entity,
232                    "delete",
233                    deleted_row,
234                    None,
235                    caller_user_id,
236                    schema_override,
237                )
238                .await?;
239            }
240        }
241        Ok(result)
242    }
243
244    /// Archive one row by id: stamps archive_field with NOW() if it is currently NULL.
245    /// Returns the updated row, or None if the record was not found or already archived.
246    pub async fn archive<'a>(
247        executor: &mut TenantExecutor<'a>,
248        entity: &ResolvedEntity,
249        archive_field: &str,
250        id: &Value,
251        schema_override: Option<&str>,
252        dialect: &dyn Dialect,
253    ) -> Result<Option<Value>, AppError> {
254        let q = archive(entity, archive_field, schema_override, dialect);
255        Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
256            .await
257    }
258
259    /// Unarchive one row by id: clears archive_field (sets to NULL) if it is currently NOT NULL.
260    /// Returns the updated row, or None if the record was not found or not archived.
261    pub async fn unarchive<'a>(
262        executor: &mut TenantExecutor<'a>,
263        entity: &ResolvedEntity,
264        archive_field: &str,
265        id: &Value,
266        schema_override: Option<&str>,
267        dialect: &dyn Dialect,
268    ) -> Result<Option<Value>, AppError> {
269        let q = unarchive(entity, archive_field, schema_override, dialect);
270        Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
271            .await
272    }
273
274    /// Bulk create in a transaction (when using pool) or on the same connection (when using conn). Returns vec of created rows.
275    /// When rls_tenant_id is Some (RLS strategy), tenant_id column is set automatically on each row.
276    /// When caller_user_id is Some, created_by is set on each row.
277    pub async fn bulk_create<'a>(
278        executor: &mut TenantExecutor<'a>,
279        entity: &ResolvedEntity,
280        items: &[HashMap<String, Value>],
281        schema_override: Option<&str>,
282        rls_tenant_id: Option<&str>,
283        caller_user_id: Option<&str>,
284        dialect: &dyn Dialect,
285    ) -> Result<Vec<Value>, AppError> {
286        const BULK_LIMIT: usize = 100;
287        if items.len() > BULK_LIMIT {
288            return Err(AppError::BadRequest(format!(
289                "bulk create limited to {} items",
290                BULK_LIMIT
291            )));
292        }
293        let mut out = Vec::with_capacity(items.len());
294        match executor.executor {
295            TenantExecutorInner::Pool(pool) => {
296                let mut tx = pool.begin().await?;
297                for body in items {
298                    let include_pk = body.contains_key(&entity.pk_columns[0]);
299                    let q = insert(
300                        entity,
301                        body,
302                        include_pk,
303                        schema_override,
304                        rls_tenant_id,
305                        caller_user_id,
306                        dialect,
307                    );
308                    let row = Self::execute_returning_one_tx(&mut tx, &q)
309                        .await?
310                        .unwrap_or(Value::Null);
311                    out.push(row);
312                }
313                tx.commit().await?;
314            }
315            TenantExecutorInner::Conn(ref mut conn) => {
316                for body in items {
317                    let include_pk = body.contains_key(&entity.pk_columns[0]);
318                    let q = insert(
319                        entity,
320                        body,
321                        include_pk,
322                        schema_override,
323                        rls_tenant_id,
324                        caller_user_id,
325                        dialect,
326                    );
327                    let row = Self::execute_returning_one_conn(conn, &q)
328                        .await?
329                        .unwrap_or(Value::Null);
330                    out.push(row);
331                }
332            }
333        }
334        Ok(out)
335    }
336
337    /// Like `bulk_create` but uses savepoints to isolate per-row DB errors.
338    /// Returns `(successful_rows, row_errors)`. If any errors occur the transaction is
339    /// rolled back and successful_rows will be empty — call site decides how to surface errors.
340    pub async fn bulk_create_collecting<'a>(
341        executor: &mut TenantExecutor<'a>,
342        entity: &ResolvedEntity,
343        items: &[HashMap<String, Value>],
344        schema_override: Option<&str>,
345        rls_tenant_id: Option<&str>,
346        caller_user_id: Option<&str>,
347        dialect: &dyn Dialect,
348    ) -> Result<(Vec<Value>, Vec<(usize, AppError)>), AppError> {
349        const BULK_LIMIT: usize = 100;
350        if items.len() > BULK_LIMIT {
351            return Err(AppError::BadRequest(format!(
352                "bulk create limited to {} items",
353                BULK_LIMIT
354            )));
355        }
356        let mut out = Vec::with_capacity(items.len());
357        let mut row_errors: Vec<(usize, AppError)> = Vec::new();
358        match executor.executor {
359            TenantExecutorInner::Pool(pool) => {
360                let mut tx = pool.begin().await?;
361                for (idx, body) in items.iter().enumerate() {
362                    let sp = format!("sp_{}", idx);
363                    sqlx::query(&format!("SAVEPOINT {}", sp))
364                        .execute(&mut *tx)
365                        .await?;
366                    let include_pk = body.contains_key(&entity.pk_columns[0]);
367                    let q = insert(
368                        entity,
369                        body,
370                        include_pk,
371                        schema_override,
372                        rls_tenant_id,
373                        caller_user_id,
374                        dialect,
375                    );
376                    match Self::execute_returning_one_tx(&mut tx, &q).await {
377                        Ok(row) => {
378                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
379                                .execute(&mut *tx)
380                                .await?;
381                            out.push(row.unwrap_or(Value::Null));
382                        }
383                        Err(e) => {
384                            sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
385                                .execute(&mut *tx)
386                                .await?;
387                            row_errors.push((idx, e));
388                        }
389                    }
390                }
391                if row_errors.is_empty() {
392                    tx.commit().await?;
393                } else {
394                    tx.rollback().await?;
395                    out.clear();
396                }
397            }
398            TenantExecutorInner::Conn(ref mut conn) => {
399                for (idx, body) in items.iter().enumerate() {
400                    let sp = format!("sp_{}", idx);
401                    sqlx::query(&format!("SAVEPOINT {}", sp))
402                        .execute(&mut **conn)
403                        .await?;
404                    let include_pk = body.contains_key(&entity.pk_columns[0]);
405                    let q = insert(
406                        entity,
407                        body,
408                        include_pk,
409                        schema_override,
410                        rls_tenant_id,
411                        caller_user_id,
412                        dialect,
413                    );
414                    match Self::execute_returning_one_conn(conn, &q).await {
415                        Ok(row) => {
416                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
417                                .execute(&mut **conn)
418                                .await?;
419                            out.push(row.unwrap_or(Value::Null));
420                        }
421                        Err(e) => {
422                            sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
423                                .execute(&mut **conn)
424                                .await?;
425                            row_errors.push((idx, e));
426                        }
427                    }
428                }
429                if !row_errors.is_empty() {
430                    out.clear();
431                }
432            }
433        }
434        Ok((out, row_errors))
435    }
436
437    /// Bulk update in a transaction (when using pool) or on the same connection (when using conn). Each item must have id. Returns vec of updated rows.
438    /// When caller_user_id is Some, updated_by is set on each row.
439    pub async fn bulk_update<'a>(
440        executor: &mut TenantExecutor<'a>,
441        entity: &ResolvedEntity,
442        items: &[HashMap<String, Value>],
443        schema_override: Option<&str>,
444        caller_user_id: Option<&str>,
445        dialect: &dyn Dialect,
446    ) -> Result<Vec<Value>, AppError> {
447        const BULK_LIMIT: usize = 100;
448        if items.len() > BULK_LIMIT {
449            return Err(AppError::BadRequest(format!(
450                "bulk update limited to {} items",
451                BULK_LIMIT
452            )));
453        }
454        let pk = &entity.pk_columns[0];
455        let mut out = Vec::with_capacity(items.len());
456        match executor.executor {
457            TenantExecutorInner::Pool(pool) => {
458                let mut tx = pool.begin().await?;
459                for body in items {
460                    let id = body.get(pk).ok_or_else(|| {
461                        AppError::Validation(format!("each item must have '{}'", pk))
462                    })?;
463                    let mut body_without_pk = body.clone();
464                    body_without_pk.remove(pk);
465                    let q = update(
466                        entity,
467                        id,
468                        &body_without_pk,
469                        schema_override,
470                        caller_user_id,
471                        dialect,
472                    );
473                    if let Some(row) = Self::execute_returning_one_tx(&mut tx, &q).await? {
474                        out.push(row);
475                    }
476                }
477                tx.commit().await?;
478            }
479            TenantExecutorInner::Conn(ref mut conn) => {
480                for body in items {
481                    let id = body.get(pk).ok_or_else(|| {
482                        AppError::Validation(format!("each item must have '{}'", pk))
483                    })?;
484                    let mut body_without_pk = body.clone();
485                    body_without_pk.remove(pk);
486                    let q = update(
487                        entity,
488                        id,
489                        &body_without_pk,
490                        schema_override,
491                        caller_user_id,
492                        dialect,
493                    );
494                    if let Some(row) = Self::execute_returning_one_conn(conn, &q).await? {
495                        out.push(row);
496                    }
497                }
498            }
499        }
500        Ok(out)
501    }
502
503    /// Like `bulk_update` but uses savepoints to isolate per-row DB errors.
504    /// Missing pk on an item is recorded as a row error rather than aborting early.
505    /// Returns `(successful_rows, row_errors)`. If any errors occur the transaction is
506    /// rolled back and successful_rows will be empty.
507    pub async fn bulk_update_collecting<'a>(
508        executor: &mut TenantExecutor<'a>,
509        entity: &ResolvedEntity,
510        items: &[HashMap<String, Value>],
511        schema_override: Option<&str>,
512        caller_user_id: Option<&str>,
513        dialect: &dyn Dialect,
514    ) -> Result<(Vec<Value>, Vec<(usize, AppError)>), AppError> {
515        const BULK_LIMIT: usize = 100;
516        if items.len() > BULK_LIMIT {
517            return Err(AppError::BadRequest(format!(
518                "bulk update limited to {} items",
519                BULK_LIMIT
520            )));
521        }
522        let pk = entity.pk_columns[0].clone();
523        let mut out = Vec::with_capacity(items.len());
524        let mut row_errors: Vec<(usize, AppError)> = Vec::new();
525        match executor.executor {
526            TenantExecutorInner::Pool(pool) => {
527                let mut tx = pool.begin().await?;
528                for (idx, body) in items.iter().enumerate() {
529                    let id = match body.get(&pk) {
530                        Some(id) => id.clone(),
531                        None => {
532                            row_errors.push((
533                                idx,
534                                AppError::Validation(format!("each item must have '{}'", pk)),
535                            ));
536                            continue;
537                        }
538                    };
539                    let sp = format!("sp_{}", idx);
540                    sqlx::query(&format!("SAVEPOINT {}", sp))
541                        .execute(&mut *tx)
542                        .await?;
543                    let mut body_without_pk = body.clone();
544                    body_without_pk.remove(&pk);
545                    let q = update(
546                        entity,
547                        &id,
548                        &body_without_pk,
549                        schema_override,
550                        caller_user_id,
551                        dialect,
552                    );
553                    match Self::execute_returning_one_tx(&mut tx, &q).await {
554                        Ok(Some(row)) => {
555                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
556                                .execute(&mut *tx)
557                                .await?;
558                            out.push(row);
559                        }
560                        Ok(None) => {
561                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
562                                .execute(&mut *tx)
563                                .await?;
564                        }
565                        Err(e) => {
566                            sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
567                                .execute(&mut *tx)
568                                .await?;
569                            row_errors.push((idx, e));
570                        }
571                    }
572                }
573                if row_errors.is_empty() {
574                    tx.commit().await?;
575                } else {
576                    tx.rollback().await?;
577                    out.clear();
578                }
579            }
580            TenantExecutorInner::Conn(ref mut conn) => {
581                for (idx, body) in items.iter().enumerate() {
582                    let id = match body.get(&pk) {
583                        Some(id) => id.clone(),
584                        None => {
585                            row_errors.push((
586                                idx,
587                                AppError::Validation(format!("each item must have '{}'", pk)),
588                            ));
589                            continue;
590                        }
591                    };
592                    let sp = format!("sp_{}", idx);
593                    sqlx::query(&format!("SAVEPOINT {}", sp))
594                        .execute(&mut **conn)
595                        .await?;
596                    let mut body_without_pk = body.clone();
597                    body_without_pk.remove(&pk);
598                    let q = update(
599                        entity,
600                        &id,
601                        &body_without_pk,
602                        schema_override,
603                        caller_user_id,
604                        dialect,
605                    );
606                    match Self::execute_returning_one_conn(conn, &q).await {
607                        Ok(Some(row)) => {
608                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
609                                .execute(&mut **conn)
610                                .await?;
611                            out.push(row);
612                        }
613                        Ok(None) => {
614                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
615                                .execute(&mut **conn)
616                                .await?;
617                        }
618                        Err(e) => {
619                            sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
620                                .execute(&mut **conn)
621                                .await?;
622                            row_errors.push((idx, e));
623                        }
624                    }
625                }
626                if !row_errors.is_empty() {
627                    out.clear();
628                }
629            }
630        }
631        Ok((out, row_errors))
632    }
633
634    async fn query_one_exec<'a>(
635        executor: &mut TenantExecutor<'a>,
636        sql: &str,
637        params: &[Value],
638    ) -> Result<Option<Value>, AppError> {
639        tracing::debug!(sql = %sql, params = ?params, "query");
640        let bind = Self::to_sqlx_param(&params[0]);
641        let row = match executor.executor {
642            TenantExecutorInner::Pool(pool) => {
643                sqlx::query(sql).bind(bind).fetch_optional(pool).await?
644            }
645            TenantExecutorInner::Conn(ref mut conn) => {
646                sqlx::query(sql)
647                    .bind(bind)
648                    .fetch_optional(&mut **conn)
649                    .await?
650            }
651        };
652        Ok(row.map(|r| row_to_json(&r)))
653    }
654
655    async fn query_many_exec<'a>(
656        executor: &mut TenantExecutor<'a>,
657        sql: &str,
658        params: &[Value],
659    ) -> Result<Vec<Value>, AppError> {
660        tracing::debug!(sql = %sql, params = ?params, "query");
661        let mut query = sqlx::query(sql);
662        for p in params {
663            query = query.bind(Self::to_sqlx_param(p));
664        }
665        let rows = match executor.executor {
666            TenantExecutorInner::Pool(pool) => query.fetch_all(pool).await?,
667            TenantExecutorInner::Conn(ref mut conn) => query.fetch_all(&mut **conn).await?,
668        };
669        Ok(rows.iter().map(row_to_json).collect())
670    }
671
672    async fn execute_returning_one_exec<'a>(
673        executor: &mut TenantExecutor<'a>,
674        q: &QueryBuf,
675    ) -> Result<Option<Value>, AppError> {
676        tracing::debug!(sql = %q.sql, params = ?q.params, "query");
677        let mut query = sqlx::query(&q.sql);
678        for p in &q.params {
679            query = query.bind(Self::to_sqlx_param(p));
680        }
681        let row = match executor.executor {
682            TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
683            TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
684        };
685        Ok(row.map(|r| row_to_json(&r)))
686    }
687
688    async fn execute_returning_one_with_params_exec<'a>(
689        executor: &mut TenantExecutor<'a>,
690        sql: &str,
691        params: &[Value],
692    ) -> Result<Option<Value>, AppError> {
693        tracing::debug!(sql = %sql, params = ?params, "query");
694        let mut query = sqlx::query(sql);
695        for p in params {
696            query = query.bind(Self::to_sqlx_param(p));
697        }
698        let row = match executor.executor {
699            TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
700            TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
701        };
702        Ok(row.map(|r| row_to_json(&r)))
703    }
704
705    async fn execute_returning_one_conn(
706        conn: &mut Connection,
707        q: &QueryBuf,
708    ) -> Result<Option<Value>, AppError> {
709        tracing::debug!(sql = %q.sql, params = ?q.params, "query (conn)");
710        let mut query = sqlx::query(&q.sql);
711        for p in &q.params {
712            query = query.bind(Self::to_sqlx_param(p));
713        }
714        let row = query.fetch_optional(conn).await?;
715        Ok(row.map(|r| row_to_json(&r)))
716    }
717
718    async fn execute_returning_one_tx(
719        tx: &mut Connection,
720        q: &QueryBuf,
721    ) -> Result<Option<Value>, AppError> {
722        tracing::debug!(sql = %q.sql, params = ?q.params, "query (tx)");
723        let mut query = sqlx::query(&q.sql);
724        for p in &q.params {
725            query = query.bind(Self::to_sqlx_param(p));
726        }
727        let row = query.fetch_optional(&mut *tx).await?;
728        Ok(row.map(|r| row_to_json(&r)))
729    }
730
731    fn to_sqlx_param(v: &Value) -> BindValue {
732        BindValue::from_json(v).unwrap_or(BindValue::Null)
733    }
734
735    async fn insert_audit<'a>(
736        executor: &mut TenantExecutor<'a>,
737        entity: &ResolvedEntity,
738        action: &str,
739        row: &Value,
740        pre_row: Option<&Value>,
741        audit_by: Option<&str>,
742        schema_override: Option<&str>,
743    ) -> Result<(), AppError> {
744        let schema = schema_override.unwrap_or(&entity.schema_name);
745        let audit_table = format!(
746            "\"{}\".\"{}\"",
747            schema.replace('"', "\"\""),
748            format!("{}_audit", entity.table_name).replace('"', "\"\"")
749        );
750
751        let changed = if action == "update" {
752            pre_row.map(|pre| compute_changed_fields(pre, row, entity))
753        } else {
754            None
755        };
756
757        let mut col_names: Vec<String> = vec![
758            "\"audit_action\"".to_string(),
759            "\"audit_by\"".to_string(),
760            "\"changed_fields\"".to_string(),
761        ];
762        let mut placeholders: Vec<String> = Vec::new();
763        let mut params: Vec<Value> = Vec::new();
764
765        params.push(Value::String(action.to_string()));
766        placeholders.push(format!("${}", params.len()));
767
768        params.push(
769            audit_by
770                .map(|s| Value::String(s.to_string()))
771                .unwrap_or(Value::Null),
772        );
773        placeholders.push(format!("${}", params.len()));
774
775        params.push(changed.unwrap_or(Value::Null));
776        placeholders.push(format!("${}::jsonb", params.len()));
777
778        let row_obj = row.as_object();
779        for col in &entity.columns {
780            let raw = row_obj
781                .and_then(|o| o.get(&col.name))
782                .cloned()
783                .unwrap_or(Value::Null);
784            let val = coerce_json_value_for_pg_array(raw, col.pg_type.as_deref());
785            let param_num = params.len() + 1;
786            let ph = col
787                .pg_type
788                .as_deref()
789                .map(|t| format!("${}::{}", param_num, t))
790                .unwrap_or_else(|| format!("${}", param_num));
791            col_names.push(format!("\"{}\"", col.name));
792            placeholders.push(ph);
793            params.push(val);
794        }
795
796        let sql = format!(
797            "INSERT INTO {} ({}) VALUES ({})",
798            audit_table,
799            col_names.join(", "),
800            placeholders.join(", ")
801        );
802        tracing::debug!(sql = %sql, "audit insert");
803
804        let mut query = sqlx::query(&sql);
805        for p in &params {
806            query = query.bind(Self::to_sqlx_param(p));
807        }
808        match executor.executor {
809            TenantExecutorInner::Pool(pool) => {
810                query.execute(pool).await?;
811            }
812            TenantExecutorInner::Conn(ref mut conn) => {
813                query.execute(&mut **conn).await?;
814            }
815        }
816        Ok(())
817    }
818}
819
820fn compute_changed_fields(pre: &Value, post: &Value, entity: &ResolvedEntity) -> Value {
821    let pre_obj = match pre.as_object() {
822        Some(o) => o,
823        None => return Value::Null,
824    };
825    let post_obj = match post.as_object() {
826        Some(o) => o,
827        None => return Value::Null,
828    };
829    let mut changes = serde_json::Map::new();
830    for col in &entity.columns {
831        let pre_val = pre_obj.get(&col.name).unwrap_or(&Value::Null);
832        let post_val = post_obj.get(&col.name).unwrap_or(&Value::Null);
833        if pre_val != post_val {
834            let mut diff = serde_json::Map::new();
835            diff.insert("old".to_string(), pre_val.clone());
836            diff.insert("new".to_string(), post_val.clone());
837            changes.insert(col.name.clone(), Value::Object(diff));
838        }
839    }
840    Value::Object(changes)
841}
842
843fn row_to_json(row: &DbRow) -> Value {
844    use sqlx::Column;
845    use sqlx::Row;
846    let mut map = serde_json::Map::new();
847    for col in row.columns() {
848        let name = col.name();
849        let v = cell_to_value(row, name);
850        map.insert(name.to_string(), v);
851    }
852    Value::Object(map)
853}
854
855fn cell_to_value(row: &DbRow, name: &str) -> Value {
856    use sqlx::Row;
857    if let Ok(Some(n)) = row.try_get::<Option<i16>, _>(name) {
858        return Value::Number(n.into());
859    }
860    if let Ok(Some(n)) = row.try_get::<Option<i32>, _>(name) {
861        return Value::Number(n.into());
862    }
863    if let Ok(Some(n)) = row.try_get::<Option<i64>, _>(name) {
864        return Value::Number(n.into());
865    }
866    if let Ok(Some(n)) = row.try_get::<Option<f32>, _>(name) {
867        if let Some(n) = serde_json::Number::from_f64(n as f64) {
868            return Value::Number(n);
869        }
870    }
871    if let Ok(Some(n)) = row.try_get::<Option<f64>, _>(name) {
872        if let Some(n) = serde_json::Number::from_f64(n) {
873            return Value::Number(n);
874        }
875    }
876    if let Ok(Some(b)) = row.try_get::<Option<bool>, _>(name) {
877        return Value::Bool(b);
878    }
879    #[cfg(feature = "postgres")]
880    if let Ok(Some(vec)) = row.try_get::<Option<Vec<String>>, _>(name) {
881        return Value::Array(vec.into_iter().map(Value::String).collect());
882    }
883    #[cfg(feature = "postgres")]
884    if let Ok(Some(vec)) = row.try_get::<Option<Vec<uuid::Uuid>>, _>(name) {
885        return Value::Array(
886            vec.into_iter()
887                .map(|u| Value::String(u.to_string()))
888                .collect(),
889        );
890    }
891    #[cfg(feature = "postgres")]
892    if let Ok(Some(vec)) = row.try_get::<Option<Vec<i64>>, _>(name) {
893        return Value::Array(vec.into_iter().map(|n| Value::Number(n.into())).collect());
894    }
895    if let Ok(Some(u)) = row.try_get::<Option<uuid::Uuid>, _>(name) {
896        return Value::String(u.to_string());
897    }
898    if let Ok(Some(d)) = row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(name) {
899        return Value::String(d.to_rfc3339());
900    }
901    if let Ok(Some(d)) = row.try_get::<Option<chrono::NaiveDateTime>, _>(name) {
902        return Value::String(d.format("%Y-%m-%dT%H:%M:%S%.f").to_string());
903    }
904    if let Ok(Some(d)) = row.try_get::<Option<chrono::NaiveDate>, _>(name) {
905        return Value::String(d.format("%Y-%m-%d").to_string());
906    }
907    if let Ok(Some(s)) = row.try_get::<Option<String>, _>(name) {
908        // Numeric columns are selected as ::text; parse so we return a JSON number not string
909        if let Ok(n) = s.trim().parse::<f64>() {
910            if let Some(num) = serde_json::Number::from_f64(n) {
911                return Value::Number(num);
912            }
913        }
914        return Value::String(s);
915    }
916    if let Ok(Some(j)) = row.try_get::<Option<serde_json::Value>, _>(name) {
917        return j;
918    }
919    Value::Null
920}