Skip to main content

architect_sdk/service/
crud.rs

1//! Generic CRUD execution against PostgreSQL.
2
3use crate::config::ResolvedEntity;
4use crate::db::pool::{Connection, DbRow, Pool};
5use crate::db::Dialect;
6use crate::error::AppError;
7use crate::extensible_fields::ExtensibleRegistry;
8use crate::sql::{
9    archive, coerce_json_value_for_pg_array, delete, insert, insert_history_snapshot,
10    prune_history, select_by_column_in, select_by_id, select_list, select_list_with_includes,
11    unarchive, update, BindValue, FilterNode, IncludeSelect, QueryBuf, SortSpec,
12};
13use serde_json::Value;
14use std::collections::HashMap;
15
16/// Execution target: either a pool (for database/schema strategy) or a single connection (for RLS, with SET LOCAL already applied).
17pub enum TenantExecutorInner<'a> {
18    Pool(&'a Pool),
19    Conn(&'a mut Connection),
20}
21
22pub struct TenantExecutor<'a> {
23    pub executor: TenantExecutorInner<'a>,
24    pub dialect: &'a dyn crate::db::Dialect,
25}
26
27impl<'a> TenantExecutor<'a> {
28    pub fn pool(pool: &'a Pool, dialect: &'a dyn crate::db::Dialect) -> Self {
29        TenantExecutor {
30            executor: TenantExecutorInner::Pool(pool),
31            dialect,
32        }
33    }
34    pub fn conn(conn: &'a mut Connection, dialect: &'a dyn crate::db::Dialect) -> Self {
35        TenantExecutor {
36            executor: TenantExecutorInner::Conn(conn),
37            dialect,
38        }
39    }
40}
41
/// Field-less type that groups the CRUD operations; the operations visible in this file are
/// associated functions that receive their executor as an explicit argument.
pub struct CrudService;
43
44impl CrudService {
45    /// List rows with optional RSQL filter and sort, limit (default 100, max 1000), offset (default 0).
46    /// `filter_includes` supplies related-entity metadata for dotted-field EXISTS filters; pass `&[]` when unused.
47    #[allow(clippy::too_many_arguments)]
48    pub async fn list<'a>(
49        executor: &mut TenantExecutor<'a>,
50        entity: &ResolvedEntity,
51        filter: Option<&FilterNode>,
52        sort: &[SortSpec],
53        limit: Option<u32>,
54        offset: Option<u32>,
55        filter_includes: &[IncludeSelect<'_>],
56        schema_override: Option<&str>,
57        dialect: &dyn Dialect,
58        registry: Option<&ExtensibleRegistry>,
59    ) -> Result<Vec<Value>, AppError> {
60        const DEFAULT_LIMIT: u32 = 100;
61        let limit = limit.unwrap_or(DEFAULT_LIMIT).min(1000);
62        let offset = offset.unwrap_or(0);
63        let q = select_list(
64            entity,
65            filter,
66            sort,
67            Some(limit),
68            Some(offset),
69            filter_includes,
70            schema_override,
71            dialect,
72            registry,
73        )?;
74        Self::query_many_exec(executor, &q.sql, &q.params).await
75    }
76
77    /// List rows with includes in a single query (scalar subqueries with json_agg/row_to_json). Returns rows with include keys already set (JSON).
78    /// `includes` drives scalar subqueries for response data; `filter_includes` is the superset used for EXISTS generation.
79    #[allow(clippy::too_many_arguments)]
80    pub async fn list_with_includes<'a>(
81        executor: &mut TenantExecutor<'a>,
82        entity: &ResolvedEntity,
83        filter: Option<&FilterNode>,
84        sort: &[SortSpec],
85        limit: Option<u32>,
86        offset: Option<u32>,
87        includes: &[IncludeSelect<'_>],
88        filter_includes: &[IncludeSelect<'_>],
89        schema_override: Option<&str>,
90        dialect: &dyn Dialect,
91        registry: Option<&ExtensibleRegistry>,
92    ) -> Result<Vec<Value>, AppError> {
93        const DEFAULT_LIMIT: u32 = 100;
94        let limit = limit.unwrap_or(DEFAULT_LIMIT).min(1000);
95        let offset = offset.unwrap_or(0);
96        let q = select_list_with_includes(
97            entity,
98            filter,
99            sort,
100            Some(limit),
101            Some(offset),
102            includes,
103            filter_includes,
104            schema_override,
105            dialect,
106            registry,
107        )?;
108        Self::query_many_exec(executor, &q.sql, &q.params).await
109    }
110
111    /// Fetch one row by primary key. Returns JSON object or None.
112    pub async fn read<'a>(
113        executor: &mut TenantExecutor<'a>,
114        entity: &ResolvedEntity,
115        id: &Value,
116        schema_override: Option<&str>,
117        dialect: &dyn Dialect,
118    ) -> Result<Option<Value>, AppError> {
119        let q = select_by_id(entity, schema_override, dialect);
120        Self::query_one_exec(executor, &q.sql, std::slice::from_ref(id)).await
121    }
122
123    /// Fetch rows from entity where column IN (values). Used for batch-loading related rows.
124    pub async fn fetch_where_column_in<'a>(
125        executor: &mut TenantExecutor<'a>,
126        entity: &ResolvedEntity,
127        column_name: &str,
128        values: &[Value],
129        schema_override: Option<&str>,
130        dialect: &dyn Dialect,
131    ) -> Result<Vec<Value>, AppError> {
132        if values.is_empty() {
133            return Ok(Vec::new());
134        }
135        let q = select_by_column_in(entity, column_name, values, schema_override, dialect);
136        Self::query_many_exec(executor, &q.sql, &q.params).await
137    }
138
139    /// Insert one row; body may include or omit PK (if has default). Returns created row.
140    /// When rls_tenant_id is Some (RLS strategy), tenant_id column is set automatically.
141    /// When caller_user_id is Some, created_by is set to that value.
142    pub async fn create<'a>(
143        executor: &mut TenantExecutor<'a>,
144        entity: &ResolvedEntity,
145        body: &HashMap<String, Value>,
146        schema_override: Option<&str>,
147        rls_tenant_id: Option<&str>,
148        caller_user_id: Option<&str>,
149        dialect: &dyn Dialect,
150    ) -> Result<Value, AppError> {
151        let include_pk = body.contains_key(&entity.pk_columns[0]);
152        let q = insert(
153            entity,
154            body,
155            include_pk,
156            schema_override,
157            rls_tenant_id,
158            caller_user_id,
159            dialect,
160        );
161        let row = Self::execute_returning_one_exec(executor, &q)
162            .await?
163            .ok_or_else(|| AppError::Db(sqlx::Error::RowNotFound))?;
164        if entity.audit_log {
165            Self::insert_audit(
166                executor,
167                entity,
168                "create",
169                &row,
170                None,
171                caller_user_id,
172                schema_override,
173            )
174            .await?;
175        }
176        Ok(row)
177    }
178
179    /// Update one row by id. Returns updated row.
180    /// When caller_user_id is Some, updated_by is set to that value.
181    /// When entity has versioning enabled, a history snapshot is written atomically before the update.
182    pub async fn update<'a>(
183        executor: &mut TenantExecutor<'a>,
184        entity: &ResolvedEntity,
185        id: &Value,
186        body: &HashMap<String, Value>,
187        schema_override: Option<&str>,
188        caller_user_id: Option<&str>,
189        dialect: &dyn Dialect,
190    ) -> Result<Option<Value>, AppError> {
191        let versioning_enabled = entity.versioning.as_ref().is_some_and(|v| v.enabled);
192
193        let pre_row = if entity.audit_log || versioning_enabled {
194            let q = select_by_id(entity, schema_override, dialect);
195            Self::query_one_exec(executor, &q.sql, std::slice::from_ref(id)).await?
196        } else {
197            None
198        };
199
200        let result = if versioning_enabled {
201            // Write snapshot + update in a single transaction.
202            let snap_q = insert_history_snapshot(entity, "update", schema_override, dialect);
203            let upd_q = update(entity, id, body, schema_override, caller_user_id, dialect);
204            let keep = entity.versioning.as_ref().and_then(|v| v.keep_versions);
205            let prune_q = keep.map(|_| prune_history(entity, schema_override, dialect));
206            Self::run_versioned_update(executor, id, snap_q, upd_q, prune_q, keep).await?
207        } else {
208            let q = update(entity, id, body, schema_override, caller_user_id, dialect);
209            Self::execute_returning_one_exec(executor, &q).await?
210        };
211
212        if entity.audit_log {
213            if let Some(ref post_row) = result {
214                Self::insert_audit(
215                    executor,
216                    entity,
217                    "update",
218                    post_row,
219                    pre_row.as_ref(),
220                    caller_user_id,
221                    schema_override,
222                )
223                .await?;
224            }
225        }
226        Ok(result)
227    }
228
229    /// Delete one row by id. Returns deleted row or None.
230    /// When caller_user_id is Some, audit_by is set on the audit record.
231    /// When entity has versioning enabled, a history snapshot is written atomically before the delete.
232    pub async fn delete<'a>(
233        executor: &mut TenantExecutor<'a>,
234        entity: &ResolvedEntity,
235        id: &Value,
236        schema_override: Option<&str>,
237        caller_user_id: Option<&str>,
238        dialect: &dyn Dialect,
239    ) -> Result<Option<Value>, AppError> {
240        let versioning_enabled = entity.versioning.as_ref().is_some_and(|v| v.enabled);
241
242        let result = if versioning_enabled {
243            let snap_q = insert_history_snapshot(entity, "delete", schema_override, dialect);
244            let del_q = delete(entity, schema_override, dialect);
245            Self::run_versioned_delete(executor, id, snap_q, del_q).await?
246        } else {
247            let q = delete(entity, schema_override, dialect);
248            Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
249                .await?
250        };
251
252        if entity.audit_log {
253            if let Some(ref deleted_row) = result {
254                Self::insert_audit(
255                    executor,
256                    entity,
257                    "delete",
258                    deleted_row,
259                    None,
260                    caller_user_id,
261                    schema_override,
262                )
263                .await?;
264            }
265        }
266        Ok(result)
267    }
268
269    /// Archive one row by id: stamps archive_field with NOW() if it is currently NULL.
270    /// Returns the updated row, or None if the record was not found or already archived.
271    pub async fn archive<'a>(
272        executor: &mut TenantExecutor<'a>,
273        entity: &ResolvedEntity,
274        archive_field: &str,
275        id: &Value,
276        schema_override: Option<&str>,
277        dialect: &dyn Dialect,
278    ) -> Result<Option<Value>, AppError> {
279        let q = archive(entity, archive_field, schema_override, dialect);
280        Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
281            .await
282    }
283
284    /// Unarchive one row by id: clears archive_field (sets to NULL) if it is currently NOT NULL.
285    /// Returns the updated row, or None if the record was not found or not archived.
286    pub async fn unarchive<'a>(
287        executor: &mut TenantExecutor<'a>,
288        entity: &ResolvedEntity,
289        archive_field: &str,
290        id: &Value,
291        schema_override: Option<&str>,
292        dialect: &dyn Dialect,
293    ) -> Result<Option<Value>, AppError> {
294        let q = unarchive(entity, archive_field, schema_override, dialect);
295        Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
296            .await
297    }
298
299    /// Bulk create in a transaction (when using pool) or on the same connection (when using conn). Returns vec of created rows.
300    /// When rls_tenant_id is Some (RLS strategy), tenant_id column is set automatically on each row.
301    /// When caller_user_id is Some, created_by is set on each row.
302    pub async fn bulk_create<'a>(
303        executor: &mut TenantExecutor<'a>,
304        entity: &ResolvedEntity,
305        items: &[HashMap<String, Value>],
306        schema_override: Option<&str>,
307        rls_tenant_id: Option<&str>,
308        caller_user_id: Option<&str>,
309        dialect: &dyn Dialect,
310    ) -> Result<Vec<Value>, AppError> {
311        const BULK_LIMIT: usize = 100;
312        if items.len() > BULK_LIMIT {
313            return Err(AppError::BadRequest(format!(
314                "bulk create limited to {} items",
315                BULK_LIMIT
316            )));
317        }
318        let mut out = Vec::with_capacity(items.len());
319        match executor.executor {
320            TenantExecutorInner::Pool(pool) => {
321                let mut tx = pool.begin().await?;
322                for body in items {
323                    let include_pk = body.contains_key(&entity.pk_columns[0]);
324                    let q = insert(
325                        entity,
326                        body,
327                        include_pk,
328                        schema_override,
329                        rls_tenant_id,
330                        caller_user_id,
331                        dialect,
332                    );
333                    let row = Self::execute_returning_one_tx(&mut tx, &q)
334                        .await?
335                        .unwrap_or(Value::Null);
336                    out.push(row);
337                }
338                tx.commit().await?;
339            }
340            TenantExecutorInner::Conn(ref mut conn) => {
341                for body in items {
342                    let include_pk = body.contains_key(&entity.pk_columns[0]);
343                    let q = insert(
344                        entity,
345                        body,
346                        include_pk,
347                        schema_override,
348                        rls_tenant_id,
349                        caller_user_id,
350                        dialect,
351                    );
352                    let row = Self::execute_returning_one_conn(conn, &q)
353                        .await?
354                        .unwrap_or(Value::Null);
355                    out.push(row);
356                }
357            }
358        }
359        Ok(out)
360    }
361
362    /// Like `bulk_create` but uses savepoints to isolate per-row DB errors.
363    /// Returns `(successful_rows, row_errors)`. If any errors occur the transaction is
364    /// rolled back and successful_rows will be empty — call site decides how to surface errors.
365    pub async fn bulk_create_collecting<'a>(
366        executor: &mut TenantExecutor<'a>,
367        entity: &ResolvedEntity,
368        items: &[HashMap<String, Value>],
369        schema_override: Option<&str>,
370        rls_tenant_id: Option<&str>,
371        caller_user_id: Option<&str>,
372        dialect: &dyn Dialect,
373    ) -> Result<(Vec<Value>, Vec<(usize, AppError)>), AppError> {
374        const BULK_LIMIT: usize = 100;
375        if items.len() > BULK_LIMIT {
376            return Err(AppError::BadRequest(format!(
377                "bulk create limited to {} items",
378                BULK_LIMIT
379            )));
380        }
381        let mut out = Vec::with_capacity(items.len());
382        let mut row_errors: Vec<(usize, AppError)> = Vec::new();
383        match executor.executor {
384            TenantExecutorInner::Pool(pool) => {
385                let mut tx = pool.begin().await?;
386                for (idx, body) in items.iter().enumerate() {
387                    let sp = format!("sp_{}", idx);
388                    sqlx::query(&format!("SAVEPOINT {}", sp))
389                        .execute(&mut *tx)
390                        .await?;
391                    let include_pk = body.contains_key(&entity.pk_columns[0]);
392                    let q = insert(
393                        entity,
394                        body,
395                        include_pk,
396                        schema_override,
397                        rls_tenant_id,
398                        caller_user_id,
399                        dialect,
400                    );
401                    match Self::execute_returning_one_tx(&mut tx, &q).await {
402                        Ok(row) => {
403                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
404                                .execute(&mut *tx)
405                                .await?;
406                            out.push(row.unwrap_or(Value::Null));
407                        }
408                        Err(e) => {
409                            sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
410                                .execute(&mut *tx)
411                                .await?;
412                            row_errors.push((idx, e));
413                        }
414                    }
415                }
416                if row_errors.is_empty() {
417                    tx.commit().await?;
418                } else {
419                    tx.rollback().await?;
420                    out.clear();
421                }
422            }
423            TenantExecutorInner::Conn(ref mut conn) => {
424                for (idx, body) in items.iter().enumerate() {
425                    let sp = format!("sp_{}", idx);
426                    sqlx::query(&format!("SAVEPOINT {}", sp))
427                        .execute(&mut **conn)
428                        .await?;
429                    let include_pk = body.contains_key(&entity.pk_columns[0]);
430                    let q = insert(
431                        entity,
432                        body,
433                        include_pk,
434                        schema_override,
435                        rls_tenant_id,
436                        caller_user_id,
437                        dialect,
438                    );
439                    match Self::execute_returning_one_conn(conn, &q).await {
440                        Ok(row) => {
441                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
442                                .execute(&mut **conn)
443                                .await?;
444                            out.push(row.unwrap_or(Value::Null));
445                        }
446                        Err(e) => {
447                            sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
448                                .execute(&mut **conn)
449                                .await?;
450                            row_errors.push((idx, e));
451                        }
452                    }
453                }
454                if !row_errors.is_empty() {
455                    out.clear();
456                }
457            }
458        }
459        Ok((out, row_errors))
460    }
461
462    /// Bulk update in a transaction (when using pool) or on the same connection (when using conn). Each item must have id. Returns vec of updated rows.
463    /// When caller_user_id is Some, updated_by is set on each row.
464    pub async fn bulk_update<'a>(
465        executor: &mut TenantExecutor<'a>,
466        entity: &ResolvedEntity,
467        items: &[HashMap<String, Value>],
468        schema_override: Option<&str>,
469        caller_user_id: Option<&str>,
470        dialect: &dyn Dialect,
471    ) -> Result<Vec<Value>, AppError> {
472        const BULK_LIMIT: usize = 100;
473        if items.len() > BULK_LIMIT {
474            return Err(AppError::BadRequest(format!(
475                "bulk update limited to {} items",
476                BULK_LIMIT
477            )));
478        }
479        let pk = &entity.pk_columns[0];
480        let mut out = Vec::with_capacity(items.len());
481        match executor.executor {
482            TenantExecutorInner::Pool(pool) => {
483                let mut tx = pool.begin().await?;
484                for body in items {
485                    let id = body.get(pk).ok_or_else(|| {
486                        AppError::Validation(format!("each item must have '{}'", pk))
487                    })?;
488                    let mut body_without_pk = body.clone();
489                    body_without_pk.remove(pk);
490                    let q = update(
491                        entity,
492                        id,
493                        &body_without_pk,
494                        schema_override,
495                        caller_user_id,
496                        dialect,
497                    );
498                    if let Some(row) = Self::execute_returning_one_tx(&mut tx, &q).await? {
499                        out.push(row);
500                    }
501                }
502                tx.commit().await?;
503            }
504            TenantExecutorInner::Conn(ref mut conn) => {
505                for body in items {
506                    let id = body.get(pk).ok_or_else(|| {
507                        AppError::Validation(format!("each item must have '{}'", pk))
508                    })?;
509                    let mut body_without_pk = body.clone();
510                    body_without_pk.remove(pk);
511                    let q = update(
512                        entity,
513                        id,
514                        &body_without_pk,
515                        schema_override,
516                        caller_user_id,
517                        dialect,
518                    );
519                    if let Some(row) = Self::execute_returning_one_conn(conn, &q).await? {
520                        out.push(row);
521                    }
522                }
523            }
524        }
525        Ok(out)
526    }
527
528    /// Like `bulk_update` but uses savepoints to isolate per-row DB errors.
529    /// Missing pk on an item is recorded as a row error rather than aborting early.
530    /// Returns `(successful_rows, row_errors)`. If any errors occur the transaction is
531    /// rolled back and successful_rows will be empty.
532    pub async fn bulk_update_collecting<'a>(
533        executor: &mut TenantExecutor<'a>,
534        entity: &ResolvedEntity,
535        items: &[HashMap<String, Value>],
536        schema_override: Option<&str>,
537        caller_user_id: Option<&str>,
538        dialect: &dyn Dialect,
539    ) -> Result<(Vec<Value>, Vec<(usize, AppError)>), AppError> {
540        const BULK_LIMIT: usize = 100;
541        if items.len() > BULK_LIMIT {
542            return Err(AppError::BadRequest(format!(
543                "bulk update limited to {} items",
544                BULK_LIMIT
545            )));
546        }
547        let pk = entity.pk_columns[0].clone();
548        let mut out = Vec::with_capacity(items.len());
549        let mut row_errors: Vec<(usize, AppError)> = Vec::new();
550        match executor.executor {
551            TenantExecutorInner::Pool(pool) => {
552                let mut tx = pool.begin().await?;
553                for (idx, body) in items.iter().enumerate() {
554                    let id = match body.get(&pk) {
555                        Some(id) => id.clone(),
556                        None => {
557                            row_errors.push((
558                                idx,
559                                AppError::Validation(format!("each item must have '{}'", pk)),
560                            ));
561                            continue;
562                        }
563                    };
564                    let sp = format!("sp_{}", idx);
565                    sqlx::query(&format!("SAVEPOINT {}", sp))
566                        .execute(&mut *tx)
567                        .await?;
568                    let mut body_without_pk = body.clone();
569                    body_without_pk.remove(&pk);
570                    let q = update(
571                        entity,
572                        &id,
573                        &body_without_pk,
574                        schema_override,
575                        caller_user_id,
576                        dialect,
577                    );
578                    match Self::execute_returning_one_tx(&mut tx, &q).await {
579                        Ok(Some(row)) => {
580                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
581                                .execute(&mut *tx)
582                                .await?;
583                            out.push(row);
584                        }
585                        Ok(None) => {
586                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
587                                .execute(&mut *tx)
588                                .await?;
589                        }
590                        Err(e) => {
591                            sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
592                                .execute(&mut *tx)
593                                .await?;
594                            row_errors.push((idx, e));
595                        }
596                    }
597                }
598                if row_errors.is_empty() {
599                    tx.commit().await?;
600                } else {
601                    tx.rollback().await?;
602                    out.clear();
603                }
604            }
605            TenantExecutorInner::Conn(ref mut conn) => {
606                for (idx, body) in items.iter().enumerate() {
607                    let id = match body.get(&pk) {
608                        Some(id) => id.clone(),
609                        None => {
610                            row_errors.push((
611                                idx,
612                                AppError::Validation(format!("each item must have '{}'", pk)),
613                            ));
614                            continue;
615                        }
616                    };
617                    let sp = format!("sp_{}", idx);
618                    sqlx::query(&format!("SAVEPOINT {}", sp))
619                        .execute(&mut **conn)
620                        .await?;
621                    let mut body_without_pk = body.clone();
622                    body_without_pk.remove(&pk);
623                    let q = update(
624                        entity,
625                        &id,
626                        &body_without_pk,
627                        schema_override,
628                        caller_user_id,
629                        dialect,
630                    );
631                    match Self::execute_returning_one_conn(conn, &q).await {
632                        Ok(Some(row)) => {
633                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
634                                .execute(&mut **conn)
635                                .await?;
636                            out.push(row);
637                        }
638                        Ok(None) => {
639                            sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
640                                .execute(&mut **conn)
641                                .await?;
642                        }
643                        Err(e) => {
644                            sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
645                                .execute(&mut **conn)
646                                .await?;
647                            row_errors.push((idx, e));
648                        }
649                    }
650                }
651                if !row_errors.is_empty() {
652                    out.clear();
653                }
654            }
655        }
656        Ok((out, row_errors))
657    }
658
659    /// Execute a history SELECT that returns multiple rows (used by list_history handler).
660    /// Binds: params[0] = pk value.
661    pub async fn query_history_many<'a>(
662        executor: &mut TenantExecutor<'a>,
663        sql: &str,
664        params: &[Value],
665    ) -> Result<Vec<Value>, AppError> {
666        Self::query_many_exec(executor, sql, params).await
667    }
668
669    /// Execute a history SELECT that returns one row (used by read_history_version handler).
670    /// Binds: $1 = pk value, $2 = version (i64).
671    pub async fn query_history_one<'a>(
672        executor: &mut TenantExecutor<'a>,
673        sql: &str,
674        id: &Value,
675        version: i64,
676    ) -> Result<Option<Value>, AppError> {
677        tracing::debug!(sql = %sql, "history query");
678        let mut query = sqlx::query(sql);
679        query = query.bind(Self::to_sqlx_param(id));
680        query = query.bind(version);
681        let row = match executor.executor {
682            TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
683            TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
684        };
685        Ok(row.map(|r| row_to_json(&r)))
686    }
687
688    async fn query_one_exec<'a>(
689        executor: &mut TenantExecutor<'a>,
690        sql: &str,
691        params: &[Value],
692    ) -> Result<Option<Value>, AppError> {
693        tracing::debug!(sql = %sql, params = ?params, "query");
694        let bind = Self::to_sqlx_param(&params[0]);
695        let row = match executor.executor {
696            TenantExecutorInner::Pool(pool) => {
697                sqlx::query(sql).bind(bind).fetch_optional(pool).await?
698            }
699            TenantExecutorInner::Conn(ref mut conn) => {
700                sqlx::query(sql)
701                    .bind(bind)
702                    .fetch_optional(&mut **conn)
703                    .await?
704            }
705        };
706        Ok(row.map(|r| row_to_json(&r)))
707    }
708
709    async fn query_many_exec<'a>(
710        executor: &mut TenantExecutor<'a>,
711        sql: &str,
712        params: &[Value],
713    ) -> Result<Vec<Value>, AppError> {
714        tracing::debug!(sql = %sql, params = ?params, "query");
715        let mut query = sqlx::query(sql);
716        for p in params {
717            query = query.bind(Self::to_sqlx_param(p));
718        }
719        let rows = match executor.executor {
720            TenantExecutorInner::Pool(pool) => query.fetch_all(pool).await?,
721            TenantExecutorInner::Conn(ref mut conn) => query.fetch_all(&mut **conn).await?,
722        };
723        Ok(rows.iter().map(row_to_json).collect())
724    }
725
726    async fn execute_returning_one_exec<'a>(
727        executor: &mut TenantExecutor<'a>,
728        q: &QueryBuf,
729    ) -> Result<Option<Value>, AppError> {
730        tracing::debug!(sql = %q.sql, params = ?q.params, "query");
731        let mut query = sqlx::query(&q.sql);
732        for p in &q.params {
733            query = query.bind(Self::to_sqlx_param(p));
734        }
735        let row = match executor.executor {
736            TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
737            TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
738        };
739        Ok(row.map(|r| row_to_json(&r)))
740    }
741
742    async fn execute_returning_one_with_params_exec<'a>(
743        executor: &mut TenantExecutor<'a>,
744        sql: &str,
745        params: &[Value],
746    ) -> Result<Option<Value>, AppError> {
747        tracing::debug!(sql = %sql, params = ?params, "query");
748        let mut query = sqlx::query(sql);
749        for p in params {
750            query = query.bind(Self::to_sqlx_param(p));
751        }
752        let row = match executor.executor {
753            TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
754            TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
755        };
756        Ok(row.map(|r| row_to_json(&r)))
757    }
758
759    async fn execute_returning_one_conn(
760        conn: &mut Connection,
761        q: &QueryBuf,
762    ) -> Result<Option<Value>, AppError> {
763        tracing::debug!(sql = %q.sql, params = ?q.params, "query (conn)");
764        let mut query = sqlx::query(&q.sql);
765        for p in &q.params {
766            query = query.bind(Self::to_sqlx_param(p));
767        }
768        let row = query.fetch_optional(conn).await?;
769        Ok(row.map(|r| row_to_json(&r)))
770    }
771
772    async fn execute_returning_one_tx(
773        tx: &mut Connection,
774        q: &QueryBuf,
775    ) -> Result<Option<Value>, AppError> {
776        tracing::debug!(sql = %q.sql, params = ?q.params, "query (tx)");
777        let mut query = sqlx::query(&q.sql);
778        for p in &q.params {
779            query = query.bind(Self::to_sqlx_param(p));
780        }
781        let row = query.fetch_optional(&mut *tx).await?;
782        Ok(row.map(|r| row_to_json(&r)))
783    }
784
785    fn to_sqlx_param(v: &Value) -> BindValue {
786        BindValue::from_json(v).unwrap_or(BindValue::Null)
787    }
788
789    /// Snapshot + UPDATE in one transaction (versioning path for update).
790    async fn run_versioned_update<'a>(
791        executor: &mut TenantExecutor<'a>,
792        id: &Value,
793        snap_q: QueryBuf,
794        upd_q: QueryBuf,
795        prune_q: Option<QueryBuf>,
796        keep_versions: Option<i64>,
797    ) -> Result<Option<Value>, AppError> {
798        match executor.executor {
799            TenantExecutorInner::Pool(pool) => {
800                let mut tx = pool.begin().await?;
801                // Snapshot (INSERT INTO _history SELECT ...)
802                let mut snap = sqlx::query(&snap_q.sql);
803                snap = snap.bind(Self::to_sqlx_param(&snap_q.params[0])); // operation
804                snap = snap.bind(Self::to_sqlx_param(id)); // pk
805                snap.execute(&mut *tx).await?;
806                // Update
807                let mut upd = sqlx::query(&upd_q.sql);
808                for p in &upd_q.params {
809                    upd = upd.bind(Self::to_sqlx_param(p));
810                }
811                let row = upd.fetch_optional(&mut *tx).await?.map(|r| row_to_json(&r));
812                // Prune
813                if let (Some(pq), Some(kv)) = (prune_q, keep_versions) {
814                    let mut pr = sqlx::query(&pq.sql);
815                    pr = pr.bind(Self::to_sqlx_param(id));
816                    pr = pr.bind(kv);
817                    pr.execute(&mut *tx).await?;
818                }
819                tx.commit().await?;
820                Ok(row)
821            }
822            TenantExecutorInner::Conn(ref mut conn) => {
823                // On an RLS connection we can't open a nested transaction; use SAVEPOINT.
824                sqlx::query("SAVEPOINT sp_versioned_update")
825                    .execute(&mut **conn)
826                    .await?;
827                let snap_res = async {
828                    let mut snap = sqlx::query(&snap_q.sql);
829                    snap = snap.bind(Self::to_sqlx_param(&snap_q.params[0]));
830                    snap = snap.bind(Self::to_sqlx_param(id));
831                    snap.execute(&mut **conn).await?;
832                    let mut upd = sqlx::query(&upd_q.sql);
833                    for p in &upd_q.params {
834                        upd = upd.bind(Self::to_sqlx_param(p));
835                    }
836                    let row = upd
837                        .fetch_optional(&mut **conn)
838                        .await?
839                        .map(|r| row_to_json(&r));
840                    if let (Some(pq), Some(kv)) = (prune_q, keep_versions) {
841                        let mut pr = sqlx::query(&pq.sql);
842                        pr = pr.bind(Self::to_sqlx_param(id));
843                        pr = pr.bind(kv);
844                        pr.execute(&mut **conn).await?;
845                    }
846                    Ok::<_, sqlx::Error>(row)
847                }
848                .await;
849                match snap_res {
850                    Ok(row) => {
851                        sqlx::query("RELEASE SAVEPOINT sp_versioned_update")
852                            .execute(&mut **conn)
853                            .await?;
854                        Ok(row)
855                    }
856                    Err(e) => {
857                        sqlx::query("ROLLBACK TO SAVEPOINT sp_versioned_update")
858                            .execute(&mut **conn)
859                            .await?;
860                        Err(AppError::Db(e))
861                    }
862                }
863            }
864        }
865    }
866
867    /// Snapshot + DELETE in one transaction (versioning path for delete).
868    async fn run_versioned_delete<'a>(
869        executor: &mut TenantExecutor<'a>,
870        id: &Value,
871        snap_q: QueryBuf,
872        del_q: QueryBuf,
873    ) -> Result<Option<Value>, AppError> {
874        match executor.executor {
875            TenantExecutorInner::Pool(pool) => {
876                let mut tx = pool.begin().await?;
877                let mut snap = sqlx::query(&snap_q.sql);
878                snap = snap.bind(Self::to_sqlx_param(&snap_q.params[0])); // operation
879                snap = snap.bind(Self::to_sqlx_param(id)); // pk
880                snap.execute(&mut *tx).await?;
881                let mut del = sqlx::query(&del_q.sql);
882                del = del.bind(Self::to_sqlx_param(id));
883                let row = del.fetch_optional(&mut *tx).await?.map(|r| row_to_json(&r));
884                tx.commit().await?;
885                Ok(row)
886            }
887            TenantExecutorInner::Conn(ref mut conn) => {
888                sqlx::query("SAVEPOINT sp_versioned_delete")
889                    .execute(&mut **conn)
890                    .await?;
891                let snap_res = async {
892                    let mut snap = sqlx::query(&snap_q.sql);
893                    snap = snap.bind(Self::to_sqlx_param(&snap_q.params[0]));
894                    snap = snap.bind(Self::to_sqlx_param(id));
895                    snap.execute(&mut **conn).await?;
896                    let mut del = sqlx::query(&del_q.sql);
897                    del = del.bind(Self::to_sqlx_param(id));
898                    let row = del
899                        .fetch_optional(&mut **conn)
900                        .await?
901                        .map(|r| row_to_json(&r));
902                    Ok::<_, sqlx::Error>(row)
903                }
904                .await;
905                match snap_res {
906                    Ok(row) => {
907                        sqlx::query("RELEASE SAVEPOINT sp_versioned_delete")
908                            .execute(&mut **conn)
909                            .await?;
910                        Ok(row)
911                    }
912                    Err(e) => {
913                        sqlx::query("ROLLBACK TO SAVEPOINT sp_versioned_delete")
914                            .execute(&mut **conn)
915                            .await?;
916                        Err(AppError::Db(e))
917                    }
918                }
919            }
920        }
921    }
922
923    async fn insert_audit<'a>(
924        executor: &mut TenantExecutor<'a>,
925        entity: &ResolvedEntity,
926        action: &str,
927        row: &Value,
928        pre_row: Option<&Value>,
929        audit_by: Option<&str>,
930        schema_override: Option<&str>,
931    ) -> Result<(), AppError> {
932        let schema = schema_override.unwrap_or(&entity.schema_name);
933        let audit_table = format!(
934            "\"{}\".\"{}\"",
935            schema.replace('"', "\"\""),
936            format!("{}_audit", entity.table_name).replace('"', "\"\"")
937        );
938
939        let changed = if action == "update" {
940            pre_row.map(|pre| compute_changed_fields(pre, row, entity))
941        } else {
942            None
943        };
944
945        let mut col_names: Vec<String> = vec![
946            "\"audit_action\"".to_string(),
947            "\"audit_by\"".to_string(),
948            "\"changed_fields\"".to_string(),
949        ];
950        let mut placeholders: Vec<String> = Vec::new();
951        let mut params: Vec<Value> = Vec::new();
952
953        params.push(Value::String(action.to_string()));
954        placeholders.push(format!("${}", params.len()));
955
956        params.push(
957            audit_by
958                .map(|s| Value::String(s.to_string()))
959                .unwrap_or(Value::Null),
960        );
961        placeholders.push(format!("${}", params.len()));
962
963        params.push(changed.unwrap_or(Value::Null));
964        placeholders.push(format!("${}::jsonb", params.len()));
965
966        let row_obj = row.as_object();
967        for col in &entity.columns {
968            let raw = row_obj
969                .and_then(|o| o.get(&col.name))
970                .cloned()
971                .unwrap_or(Value::Null);
972            let val = coerce_json_value_for_pg_array(raw, col.pg_type.as_deref());
973            let param_num = params.len() + 1;
974            let ph = col
975                .pg_type
976                .as_deref()
977                .map(|t| format!("${}::{}", param_num, t))
978                .unwrap_or_else(|| format!("${}", param_num));
979            col_names.push(format!("\"{}\"", col.name));
980            placeholders.push(ph);
981            params.push(val);
982        }
983
984        let sql = format!(
985            "INSERT INTO {} ({}) VALUES ({})",
986            audit_table,
987            col_names.join(", "),
988            placeholders.join(", ")
989        );
990        tracing::debug!(sql = %sql, "audit insert");
991
992        let mut query = sqlx::query(&sql);
993        for p in &params {
994            query = query.bind(Self::to_sqlx_param(p));
995        }
996        match executor.executor {
997            TenantExecutorInner::Pool(pool) => {
998                query.execute(pool).await?;
999            }
1000            TenantExecutorInner::Conn(ref mut conn) => {
1001                query.execute(&mut **conn).await?;
1002            }
1003        }
1004        Ok(())
1005    }
1006}
1007
1008fn compute_changed_fields(pre: &Value, post: &Value, entity: &ResolvedEntity) -> Value {
1009    let pre_obj = match pre.as_object() {
1010        Some(o) => o,
1011        None => return Value::Null,
1012    };
1013    let post_obj = match post.as_object() {
1014        Some(o) => o,
1015        None => return Value::Null,
1016    };
1017    let mut changes = serde_json::Map::new();
1018    for col in &entity.columns {
1019        let pre_val = pre_obj.get(&col.name).unwrap_or(&Value::Null);
1020        let post_val = post_obj.get(&col.name).unwrap_or(&Value::Null);
1021        if pre_val != post_val {
1022            let mut diff = serde_json::Map::new();
1023            diff.insert("old".to_string(), pre_val.clone());
1024            diff.insert("new".to_string(), post_val.clone());
1025            changes.insert(col.name.clone(), Value::Object(diff));
1026        }
1027    }
1028    Value::Object(changes)
1029}
1030
1031fn row_to_json(row: &DbRow) -> Value {
1032    use sqlx::Column;
1033    use sqlx::Row;
1034    let mut map = serde_json::Map::new();
1035    for col in row.columns() {
1036        let name = col.name();
1037        let v = cell_to_value(row, name);
1038        map.insert(name.to_string(), v);
1039    }
1040    Value::Object(map)
1041}
1042
1043fn cell_to_value(row: &DbRow, name: &str) -> Value {
1044    use sqlx::Row;
1045    if let Ok(Some(n)) = row.try_get::<Option<i16>, _>(name) {
1046        return Value::Number(n.into());
1047    }
1048    if let Ok(Some(n)) = row.try_get::<Option<i32>, _>(name) {
1049        return Value::Number(n.into());
1050    }
1051    if let Ok(Some(n)) = row.try_get::<Option<i64>, _>(name) {
1052        return Value::Number(n.into());
1053    }
1054    if let Ok(Some(n)) = row.try_get::<Option<f32>, _>(name) {
1055        if let Some(n) = serde_json::Number::from_f64(n as f64) {
1056            return Value::Number(n);
1057        }
1058    }
1059    if let Ok(Some(n)) = row.try_get::<Option<f64>, _>(name) {
1060        if let Some(n) = serde_json::Number::from_f64(n) {
1061            return Value::Number(n);
1062        }
1063    }
1064    if let Ok(Some(b)) = row.try_get::<Option<bool>, _>(name) {
1065        return Value::Bool(b);
1066    }
1067    #[cfg(feature = "postgres")]
1068    if let Ok(Some(vec)) = row.try_get::<Option<Vec<String>>, _>(name) {
1069        return Value::Array(vec.into_iter().map(Value::String).collect());
1070    }
1071    #[cfg(feature = "postgres")]
1072    if let Ok(Some(vec)) = row.try_get::<Option<Vec<uuid::Uuid>>, _>(name) {
1073        return Value::Array(
1074            vec.into_iter()
1075                .map(|u| Value::String(u.to_string()))
1076                .collect(),
1077        );
1078    }
1079    #[cfg(feature = "postgres")]
1080    if let Ok(Some(vec)) = row.try_get::<Option<Vec<i64>>, _>(name) {
1081        return Value::Array(vec.into_iter().map(|n| Value::Number(n.into())).collect());
1082    }
1083    if let Ok(Some(u)) = row.try_get::<Option<uuid::Uuid>, _>(name) {
1084        return Value::String(u.to_string());
1085    }
1086    if let Ok(Some(d)) = row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(name) {
1087        return Value::String(d.to_rfc3339());
1088    }
1089    if let Ok(Some(d)) = row.try_get::<Option<chrono::NaiveDateTime>, _>(name) {
1090        return Value::String(d.format("%Y-%m-%dT%H:%M:%S%.f").to_string());
1091    }
1092    if let Ok(Some(d)) = row.try_get::<Option<chrono::NaiveDate>, _>(name) {
1093        return Value::String(d.format("%Y-%m-%d").to_string());
1094    }
1095    if let Ok(Some(s)) = row.try_get::<Option<String>, _>(name) {
1096        // Numeric columns are selected as ::text; parse so we return a JSON number not string
1097        if let Ok(n) = s.trim().parse::<f64>() {
1098            if let Some(num) = serde_json::Number::from_f64(n) {
1099                return Value::Number(num);
1100            }
1101        }
1102        return Value::String(s);
1103    }
1104    if let Ok(Some(j)) = row.try_get::<Option<serde_json::Value>, _>(name) {
1105        return j;
1106    }
1107    Value::Null
1108}