1use crate::config::ResolvedEntity;
4use crate::db::pool::{Connection, DbRow, Pool};
5use crate::db::Dialect;
6use crate::error::AppError;
7use crate::extensible_fields::ExtensibleRegistry;
8use crate::sql::{
9 archive, coerce_json_value_for_pg_array, delete, insert, insert_history_snapshot,
10 prune_history, select_by_column_in, select_by_id, select_list, select_list_with_includes,
11 unarchive, update, BindValue, FilterNode, IncludeSelect, QueryBuf, SortSpec,
12};
13use serde_json::Value;
14use std::collections::HashMap;
15
/// The database handle a [`TenantExecutor`] runs its statements on.
pub enum TenantExecutorInner<'a> {
    /// A shared reference to the connection pool. Multi-statement operations in this file
    /// (`bulk_*`, `run_versioned_*`) open their own transaction via `pool.begin()` in this mode.
    Pool(&'a Pool),
    /// An exclusive borrow of a single connection. Multi-statement operations in this file
    /// either run directly on it or wrap their work in `SAVEPOINT`s; they never commit it.
    Conn(&'a mut Connection),
}
21
/// Bundles a database handle with the SQL dialect associated with it.
pub struct TenantExecutor<'a> {
    /// Pool or single connection that statements are executed on.
    pub executor: TenantExecutorInner<'a>,
    /// SQL dialect stored alongside the handle.
    /// NOTE(review): the `CrudService` functions in this file take their own `dialect`
    /// argument and do not read this field.
    pub dialect: &'a dyn crate::db::Dialect,
}
26
27impl<'a> TenantExecutor<'a> {
28 pub fn pool(pool: &'a Pool, dialect: &'a dyn crate::db::Dialect) -> Self {
29 TenantExecutor {
30 executor: TenantExecutorInner::Pool(pool),
31 dialect,
32 }
33 }
34 pub fn conn(conn: &'a mut Connection, dialect: &'a dyn crate::db::Dialect) -> Self {
35 TenantExecutor {
36 executor: TenantExecutorInner::Conn(conn),
37 dialect,
38 }
39 }
40}
41
/// Stateless namespace type; the CRUD operations are associated functions in its `impl` block.
pub struct CrudService;
43
44impl CrudService {
45 #[allow(clippy::too_many_arguments)]
48 pub async fn list<'a>(
49 executor: &mut TenantExecutor<'a>,
50 entity: &ResolvedEntity,
51 filter: Option<&FilterNode>,
52 sort: &[SortSpec],
53 limit: Option<u32>,
54 offset: Option<u32>,
55 filter_includes: &[IncludeSelect<'_>],
56 schema_override: Option<&str>,
57 dialect: &dyn Dialect,
58 registry: Option<&ExtensibleRegistry>,
59 ) -> Result<Vec<Value>, AppError> {
60 const DEFAULT_LIMIT: u32 = 100;
61 let limit = limit.unwrap_or(DEFAULT_LIMIT).min(1000);
62 let offset = offset.unwrap_or(0);
63 let q = select_list(
64 entity,
65 filter,
66 sort,
67 Some(limit),
68 Some(offset),
69 filter_includes,
70 schema_override,
71 dialect,
72 registry,
73 )?;
74 Self::query_many_exec(executor, &q.sql, &q.params).await
75 }
76
77 #[allow(clippy::too_many_arguments)]
80 pub async fn list_with_includes<'a>(
81 executor: &mut TenantExecutor<'a>,
82 entity: &ResolvedEntity,
83 filter: Option<&FilterNode>,
84 sort: &[SortSpec],
85 limit: Option<u32>,
86 offset: Option<u32>,
87 includes: &[IncludeSelect<'_>],
88 filter_includes: &[IncludeSelect<'_>],
89 schema_override: Option<&str>,
90 dialect: &dyn Dialect,
91 registry: Option<&ExtensibleRegistry>,
92 ) -> Result<Vec<Value>, AppError> {
93 const DEFAULT_LIMIT: u32 = 100;
94 let limit = limit.unwrap_or(DEFAULT_LIMIT).min(1000);
95 let offset = offset.unwrap_or(0);
96 let q = select_list_with_includes(
97 entity,
98 filter,
99 sort,
100 Some(limit),
101 Some(offset),
102 includes,
103 filter_includes,
104 schema_override,
105 dialect,
106 registry,
107 )?;
108 Self::query_many_exec(executor, &q.sql, &q.params).await
109 }
110
111 pub async fn read<'a>(
113 executor: &mut TenantExecutor<'a>,
114 entity: &ResolvedEntity,
115 id: &Value,
116 schema_override: Option<&str>,
117 dialect: &dyn Dialect,
118 ) -> Result<Option<Value>, AppError> {
119 let q = select_by_id(entity, schema_override, dialect);
120 Self::query_one_exec(executor, &q.sql, std::slice::from_ref(id)).await
121 }
122
123 pub async fn fetch_where_column_in<'a>(
125 executor: &mut TenantExecutor<'a>,
126 entity: &ResolvedEntity,
127 column_name: &str,
128 values: &[Value],
129 schema_override: Option<&str>,
130 dialect: &dyn Dialect,
131 ) -> Result<Vec<Value>, AppError> {
132 if values.is_empty() {
133 return Ok(Vec::new());
134 }
135 let q = select_by_column_in(entity, column_name, values, schema_override, dialect);
136 Self::query_many_exec(executor, &q.sql, &q.params).await
137 }
138
139 pub async fn create<'a>(
143 executor: &mut TenantExecutor<'a>,
144 entity: &ResolvedEntity,
145 body: &HashMap<String, Value>,
146 schema_override: Option<&str>,
147 rls_tenant_id: Option<&str>,
148 caller_user_id: Option<&str>,
149 dialect: &dyn Dialect,
150 ) -> Result<Value, AppError> {
151 let include_pk = body.contains_key(&entity.pk_columns[0]);
152 let q = insert(
153 entity,
154 body,
155 include_pk,
156 schema_override,
157 rls_tenant_id,
158 caller_user_id,
159 dialect,
160 );
161 let row = Self::execute_returning_one_exec(executor, &q)
162 .await?
163 .ok_or_else(|| AppError::Db(sqlx::Error::RowNotFound))?;
164 if entity.audit_log {
165 Self::insert_audit(
166 executor,
167 entity,
168 "create",
169 &row,
170 None,
171 caller_user_id,
172 schema_override,
173 )
174 .await?;
175 }
176 Ok(row)
177 }
178
179 pub async fn update<'a>(
183 executor: &mut TenantExecutor<'a>,
184 entity: &ResolvedEntity,
185 id: &Value,
186 body: &HashMap<String, Value>,
187 schema_override: Option<&str>,
188 caller_user_id: Option<&str>,
189 dialect: &dyn Dialect,
190 ) -> Result<Option<Value>, AppError> {
191 let versioning_enabled = entity.versioning.as_ref().is_some_and(|v| v.enabled);
192
193 let pre_row = if entity.audit_log || versioning_enabled {
194 let q = select_by_id(entity, schema_override, dialect);
195 Self::query_one_exec(executor, &q.sql, std::slice::from_ref(id)).await?
196 } else {
197 None
198 };
199
200 let result = if versioning_enabled {
201 let snap_q = insert_history_snapshot(entity, "update", schema_override, dialect);
203 let upd_q = update(entity, id, body, schema_override, caller_user_id, dialect);
204 let keep = entity.versioning.as_ref().and_then(|v| v.keep_versions);
205 let prune_q = keep.map(|_| prune_history(entity, schema_override, dialect));
206 Self::run_versioned_update(executor, id, snap_q, upd_q, prune_q, keep).await?
207 } else {
208 let q = update(entity, id, body, schema_override, caller_user_id, dialect);
209 Self::execute_returning_one_exec(executor, &q).await?
210 };
211
212 if entity.audit_log {
213 if let Some(ref post_row) = result {
214 Self::insert_audit(
215 executor,
216 entity,
217 "update",
218 post_row,
219 pre_row.as_ref(),
220 caller_user_id,
221 schema_override,
222 )
223 .await?;
224 }
225 }
226 Ok(result)
227 }
228
229 pub async fn delete<'a>(
233 executor: &mut TenantExecutor<'a>,
234 entity: &ResolvedEntity,
235 id: &Value,
236 schema_override: Option<&str>,
237 caller_user_id: Option<&str>,
238 dialect: &dyn Dialect,
239 ) -> Result<Option<Value>, AppError> {
240 let versioning_enabled = entity.versioning.as_ref().is_some_and(|v| v.enabled);
241
242 let result = if versioning_enabled {
243 let snap_q = insert_history_snapshot(entity, "delete", schema_override, dialect);
244 let del_q = delete(entity, schema_override, dialect);
245 Self::run_versioned_delete(executor, id, snap_q, del_q).await?
246 } else {
247 let q = delete(entity, schema_override, dialect);
248 Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
249 .await?
250 };
251
252 if entity.audit_log {
253 if let Some(ref deleted_row) = result {
254 Self::insert_audit(
255 executor,
256 entity,
257 "delete",
258 deleted_row,
259 None,
260 caller_user_id,
261 schema_override,
262 )
263 .await?;
264 }
265 }
266 Ok(result)
267 }
268
269 pub async fn archive<'a>(
272 executor: &mut TenantExecutor<'a>,
273 entity: &ResolvedEntity,
274 archive_field: &str,
275 id: &Value,
276 schema_override: Option<&str>,
277 dialect: &dyn Dialect,
278 ) -> Result<Option<Value>, AppError> {
279 let q = archive(entity, archive_field, schema_override, dialect);
280 Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
281 .await
282 }
283
284 pub async fn unarchive<'a>(
287 executor: &mut TenantExecutor<'a>,
288 entity: &ResolvedEntity,
289 archive_field: &str,
290 id: &Value,
291 schema_override: Option<&str>,
292 dialect: &dyn Dialect,
293 ) -> Result<Option<Value>, AppError> {
294 let q = unarchive(entity, archive_field, schema_override, dialect);
295 Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
296 .await
297 }
298
299 pub async fn bulk_create<'a>(
303 executor: &mut TenantExecutor<'a>,
304 entity: &ResolvedEntity,
305 items: &[HashMap<String, Value>],
306 schema_override: Option<&str>,
307 rls_tenant_id: Option<&str>,
308 caller_user_id: Option<&str>,
309 dialect: &dyn Dialect,
310 ) -> Result<Vec<Value>, AppError> {
311 const BULK_LIMIT: usize = 100;
312 if items.len() > BULK_LIMIT {
313 return Err(AppError::BadRequest(format!(
314 "bulk create limited to {} items",
315 BULK_LIMIT
316 )));
317 }
318 let mut out = Vec::with_capacity(items.len());
319 match executor.executor {
320 TenantExecutorInner::Pool(pool) => {
321 let mut tx = pool.begin().await?;
322 for body in items {
323 let include_pk = body.contains_key(&entity.pk_columns[0]);
324 let q = insert(
325 entity,
326 body,
327 include_pk,
328 schema_override,
329 rls_tenant_id,
330 caller_user_id,
331 dialect,
332 );
333 let row = Self::execute_returning_one_tx(&mut tx, &q)
334 .await?
335 .unwrap_or(Value::Null);
336 out.push(row);
337 }
338 tx.commit().await?;
339 }
340 TenantExecutorInner::Conn(ref mut conn) => {
341 for body in items {
342 let include_pk = body.contains_key(&entity.pk_columns[0]);
343 let q = insert(
344 entity,
345 body,
346 include_pk,
347 schema_override,
348 rls_tenant_id,
349 caller_user_id,
350 dialect,
351 );
352 let row = Self::execute_returning_one_conn(conn, &q)
353 .await?
354 .unwrap_or(Value::Null);
355 out.push(row);
356 }
357 }
358 }
359 Ok(out)
360 }
361
362 pub async fn bulk_create_collecting<'a>(
366 executor: &mut TenantExecutor<'a>,
367 entity: &ResolvedEntity,
368 items: &[HashMap<String, Value>],
369 schema_override: Option<&str>,
370 rls_tenant_id: Option<&str>,
371 caller_user_id: Option<&str>,
372 dialect: &dyn Dialect,
373 ) -> Result<(Vec<Value>, Vec<(usize, AppError)>), AppError> {
374 const BULK_LIMIT: usize = 100;
375 if items.len() > BULK_LIMIT {
376 return Err(AppError::BadRequest(format!(
377 "bulk create limited to {} items",
378 BULK_LIMIT
379 )));
380 }
381 let mut out = Vec::with_capacity(items.len());
382 let mut row_errors: Vec<(usize, AppError)> = Vec::new();
383 match executor.executor {
384 TenantExecutorInner::Pool(pool) => {
385 let mut tx = pool.begin().await?;
386 for (idx, body) in items.iter().enumerate() {
387 let sp = format!("sp_{}", idx);
388 sqlx::query(&format!("SAVEPOINT {}", sp))
389 .execute(&mut *tx)
390 .await?;
391 let include_pk = body.contains_key(&entity.pk_columns[0]);
392 let q = insert(
393 entity,
394 body,
395 include_pk,
396 schema_override,
397 rls_tenant_id,
398 caller_user_id,
399 dialect,
400 );
401 match Self::execute_returning_one_tx(&mut tx, &q).await {
402 Ok(row) => {
403 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
404 .execute(&mut *tx)
405 .await?;
406 out.push(row.unwrap_or(Value::Null));
407 }
408 Err(e) => {
409 sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
410 .execute(&mut *tx)
411 .await?;
412 row_errors.push((idx, e));
413 }
414 }
415 }
416 if row_errors.is_empty() {
417 tx.commit().await?;
418 } else {
419 tx.rollback().await?;
420 out.clear();
421 }
422 }
423 TenantExecutorInner::Conn(ref mut conn) => {
424 for (idx, body) in items.iter().enumerate() {
425 let sp = format!("sp_{}", idx);
426 sqlx::query(&format!("SAVEPOINT {}", sp))
427 .execute(&mut **conn)
428 .await?;
429 let include_pk = body.contains_key(&entity.pk_columns[0]);
430 let q = insert(
431 entity,
432 body,
433 include_pk,
434 schema_override,
435 rls_tenant_id,
436 caller_user_id,
437 dialect,
438 );
439 match Self::execute_returning_one_conn(conn, &q).await {
440 Ok(row) => {
441 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
442 .execute(&mut **conn)
443 .await?;
444 out.push(row.unwrap_or(Value::Null));
445 }
446 Err(e) => {
447 sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
448 .execute(&mut **conn)
449 .await?;
450 row_errors.push((idx, e));
451 }
452 }
453 }
454 if !row_errors.is_empty() {
455 out.clear();
456 }
457 }
458 }
459 Ok((out, row_errors))
460 }
461
462 pub async fn bulk_update<'a>(
465 executor: &mut TenantExecutor<'a>,
466 entity: &ResolvedEntity,
467 items: &[HashMap<String, Value>],
468 schema_override: Option<&str>,
469 caller_user_id: Option<&str>,
470 dialect: &dyn Dialect,
471 ) -> Result<Vec<Value>, AppError> {
472 const BULK_LIMIT: usize = 100;
473 if items.len() > BULK_LIMIT {
474 return Err(AppError::BadRequest(format!(
475 "bulk update limited to {} items",
476 BULK_LIMIT
477 )));
478 }
479 let pk = &entity.pk_columns[0];
480 let mut out = Vec::with_capacity(items.len());
481 match executor.executor {
482 TenantExecutorInner::Pool(pool) => {
483 let mut tx = pool.begin().await?;
484 for body in items {
485 let id = body.get(pk).ok_or_else(|| {
486 AppError::Validation(format!("each item must have '{}'", pk))
487 })?;
488 let mut body_without_pk = body.clone();
489 body_without_pk.remove(pk);
490 let q = update(
491 entity,
492 id,
493 &body_without_pk,
494 schema_override,
495 caller_user_id,
496 dialect,
497 );
498 if let Some(row) = Self::execute_returning_one_tx(&mut tx, &q).await? {
499 out.push(row);
500 }
501 }
502 tx.commit().await?;
503 }
504 TenantExecutorInner::Conn(ref mut conn) => {
505 for body in items {
506 let id = body.get(pk).ok_or_else(|| {
507 AppError::Validation(format!("each item must have '{}'", pk))
508 })?;
509 let mut body_without_pk = body.clone();
510 body_without_pk.remove(pk);
511 let q = update(
512 entity,
513 id,
514 &body_without_pk,
515 schema_override,
516 caller_user_id,
517 dialect,
518 );
519 if let Some(row) = Self::execute_returning_one_conn(conn, &q).await? {
520 out.push(row);
521 }
522 }
523 }
524 }
525 Ok(out)
526 }
527
528 pub async fn bulk_update_collecting<'a>(
533 executor: &mut TenantExecutor<'a>,
534 entity: &ResolvedEntity,
535 items: &[HashMap<String, Value>],
536 schema_override: Option<&str>,
537 caller_user_id: Option<&str>,
538 dialect: &dyn Dialect,
539 ) -> Result<(Vec<Value>, Vec<(usize, AppError)>), AppError> {
540 const BULK_LIMIT: usize = 100;
541 if items.len() > BULK_LIMIT {
542 return Err(AppError::BadRequest(format!(
543 "bulk update limited to {} items",
544 BULK_LIMIT
545 )));
546 }
547 let pk = entity.pk_columns[0].clone();
548 let mut out = Vec::with_capacity(items.len());
549 let mut row_errors: Vec<(usize, AppError)> = Vec::new();
550 match executor.executor {
551 TenantExecutorInner::Pool(pool) => {
552 let mut tx = pool.begin().await?;
553 for (idx, body) in items.iter().enumerate() {
554 let id = match body.get(&pk) {
555 Some(id) => id.clone(),
556 None => {
557 row_errors.push((
558 idx,
559 AppError::Validation(format!("each item must have '{}'", pk)),
560 ));
561 continue;
562 }
563 };
564 let sp = format!("sp_{}", idx);
565 sqlx::query(&format!("SAVEPOINT {}", sp))
566 .execute(&mut *tx)
567 .await?;
568 let mut body_without_pk = body.clone();
569 body_without_pk.remove(&pk);
570 let q = update(
571 entity,
572 &id,
573 &body_without_pk,
574 schema_override,
575 caller_user_id,
576 dialect,
577 );
578 match Self::execute_returning_one_tx(&mut tx, &q).await {
579 Ok(Some(row)) => {
580 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
581 .execute(&mut *tx)
582 .await?;
583 out.push(row);
584 }
585 Ok(None) => {
586 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
587 .execute(&mut *tx)
588 .await?;
589 }
590 Err(e) => {
591 sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
592 .execute(&mut *tx)
593 .await?;
594 row_errors.push((idx, e));
595 }
596 }
597 }
598 if row_errors.is_empty() {
599 tx.commit().await?;
600 } else {
601 tx.rollback().await?;
602 out.clear();
603 }
604 }
605 TenantExecutorInner::Conn(ref mut conn) => {
606 for (idx, body) in items.iter().enumerate() {
607 let id = match body.get(&pk) {
608 Some(id) => id.clone(),
609 None => {
610 row_errors.push((
611 idx,
612 AppError::Validation(format!("each item must have '{}'", pk)),
613 ));
614 continue;
615 }
616 };
617 let sp = format!("sp_{}", idx);
618 sqlx::query(&format!("SAVEPOINT {}", sp))
619 .execute(&mut **conn)
620 .await?;
621 let mut body_without_pk = body.clone();
622 body_without_pk.remove(&pk);
623 let q = update(
624 entity,
625 &id,
626 &body_without_pk,
627 schema_override,
628 caller_user_id,
629 dialect,
630 );
631 match Self::execute_returning_one_conn(conn, &q).await {
632 Ok(Some(row)) => {
633 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
634 .execute(&mut **conn)
635 .await?;
636 out.push(row);
637 }
638 Ok(None) => {
639 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
640 .execute(&mut **conn)
641 .await?;
642 }
643 Err(e) => {
644 sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
645 .execute(&mut **conn)
646 .await?;
647 row_errors.push((idx, e));
648 }
649 }
650 }
651 if !row_errors.is_empty() {
652 out.clear();
653 }
654 }
655 }
656 Ok((out, row_errors))
657 }
658
659 pub async fn query_history_many<'a>(
662 executor: &mut TenantExecutor<'a>,
663 sql: &str,
664 params: &[Value],
665 ) -> Result<Vec<Value>, AppError> {
666 Self::query_many_exec(executor, sql, params).await
667 }
668
669 pub async fn query_history_one<'a>(
672 executor: &mut TenantExecutor<'a>,
673 sql: &str,
674 id: &Value,
675 version: i64,
676 ) -> Result<Option<Value>, AppError> {
677 tracing::debug!(sql = %sql, "history query");
678 let mut query = sqlx::query(sql);
679 query = query.bind(Self::to_sqlx_param(id));
680 query = query.bind(version);
681 let row = match executor.executor {
682 TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
683 TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
684 };
685 Ok(row.map(|r| row_to_json(&r)))
686 }
687
688 async fn query_one_exec<'a>(
689 executor: &mut TenantExecutor<'a>,
690 sql: &str,
691 params: &[Value],
692 ) -> Result<Option<Value>, AppError> {
693 tracing::debug!(sql = %sql, params = ?params, "query");
694 let bind = Self::to_sqlx_param(¶ms[0]);
695 let row = match executor.executor {
696 TenantExecutorInner::Pool(pool) => {
697 sqlx::query(sql).bind(bind).fetch_optional(pool).await?
698 }
699 TenantExecutorInner::Conn(ref mut conn) => {
700 sqlx::query(sql)
701 .bind(bind)
702 .fetch_optional(&mut **conn)
703 .await?
704 }
705 };
706 Ok(row.map(|r| row_to_json(&r)))
707 }
708
709 async fn query_many_exec<'a>(
710 executor: &mut TenantExecutor<'a>,
711 sql: &str,
712 params: &[Value],
713 ) -> Result<Vec<Value>, AppError> {
714 tracing::debug!(sql = %sql, params = ?params, "query");
715 let mut query = sqlx::query(sql);
716 for p in params {
717 query = query.bind(Self::to_sqlx_param(p));
718 }
719 let rows = match executor.executor {
720 TenantExecutorInner::Pool(pool) => query.fetch_all(pool).await?,
721 TenantExecutorInner::Conn(ref mut conn) => query.fetch_all(&mut **conn).await?,
722 };
723 Ok(rows.iter().map(row_to_json).collect())
724 }
725
726 async fn execute_returning_one_exec<'a>(
727 executor: &mut TenantExecutor<'a>,
728 q: &QueryBuf,
729 ) -> Result<Option<Value>, AppError> {
730 tracing::debug!(sql = %q.sql, params = ?q.params, "query");
731 let mut query = sqlx::query(&q.sql);
732 for p in &q.params {
733 query = query.bind(Self::to_sqlx_param(p));
734 }
735 let row = match executor.executor {
736 TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
737 TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
738 };
739 Ok(row.map(|r| row_to_json(&r)))
740 }
741
742 async fn execute_returning_one_with_params_exec<'a>(
743 executor: &mut TenantExecutor<'a>,
744 sql: &str,
745 params: &[Value],
746 ) -> Result<Option<Value>, AppError> {
747 tracing::debug!(sql = %sql, params = ?params, "query");
748 let mut query = sqlx::query(sql);
749 for p in params {
750 query = query.bind(Self::to_sqlx_param(p));
751 }
752 let row = match executor.executor {
753 TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
754 TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
755 };
756 Ok(row.map(|r| row_to_json(&r)))
757 }
758
759 async fn execute_returning_one_conn(
760 conn: &mut Connection,
761 q: &QueryBuf,
762 ) -> Result<Option<Value>, AppError> {
763 tracing::debug!(sql = %q.sql, params = ?q.params, "query (conn)");
764 let mut query = sqlx::query(&q.sql);
765 for p in &q.params {
766 query = query.bind(Self::to_sqlx_param(p));
767 }
768 let row = query.fetch_optional(conn).await?;
769 Ok(row.map(|r| row_to_json(&r)))
770 }
771
772 async fn execute_returning_one_tx(
773 tx: &mut Connection,
774 q: &QueryBuf,
775 ) -> Result<Option<Value>, AppError> {
776 tracing::debug!(sql = %q.sql, params = ?q.params, "query (tx)");
777 let mut query = sqlx::query(&q.sql);
778 for p in &q.params {
779 query = query.bind(Self::to_sqlx_param(p));
780 }
781 let row = query.fetch_optional(&mut *tx).await?;
782 Ok(row.map(|r| row_to_json(&r)))
783 }
784
/// Converts a JSON value into a bindable parameter via `BindValue::from_json`.
///
/// When the conversion does not produce a value, `BindValue::Null` is bound instead.
/// NOTE(review): this means an unconvertible value is silently written as NULL rather
/// than reported — confirm that is intended.
fn to_sqlx_param(v: &Value) -> BindValue {
    BindValue::from_json(v).unwrap_or(BindValue::Null)
}
788
789 async fn run_versioned_update<'a>(
791 executor: &mut TenantExecutor<'a>,
792 id: &Value,
793 snap_q: QueryBuf,
794 upd_q: QueryBuf,
795 prune_q: Option<QueryBuf>,
796 keep_versions: Option<i64>,
797 ) -> Result<Option<Value>, AppError> {
798 match executor.executor {
799 TenantExecutorInner::Pool(pool) => {
800 let mut tx = pool.begin().await?;
801 let mut snap = sqlx::query(&snap_q.sql);
803 snap = snap.bind(Self::to_sqlx_param(&snap_q.params[0])); snap = snap.bind(Self::to_sqlx_param(id)); snap.execute(&mut *tx).await?;
806 let mut upd = sqlx::query(&upd_q.sql);
808 for p in &upd_q.params {
809 upd = upd.bind(Self::to_sqlx_param(p));
810 }
811 let row = upd.fetch_optional(&mut *tx).await?.map(|r| row_to_json(&r));
812 if let (Some(pq), Some(kv)) = (prune_q, keep_versions) {
814 let mut pr = sqlx::query(&pq.sql);
815 pr = pr.bind(Self::to_sqlx_param(id));
816 pr = pr.bind(kv);
817 pr.execute(&mut *tx).await?;
818 }
819 tx.commit().await?;
820 Ok(row)
821 }
822 TenantExecutorInner::Conn(ref mut conn) => {
823 sqlx::query("SAVEPOINT sp_versioned_update")
825 .execute(&mut **conn)
826 .await?;
827 let snap_res = async {
828 let mut snap = sqlx::query(&snap_q.sql);
829 snap = snap.bind(Self::to_sqlx_param(&snap_q.params[0]));
830 snap = snap.bind(Self::to_sqlx_param(id));
831 snap.execute(&mut **conn).await?;
832 let mut upd = sqlx::query(&upd_q.sql);
833 for p in &upd_q.params {
834 upd = upd.bind(Self::to_sqlx_param(p));
835 }
836 let row = upd
837 .fetch_optional(&mut **conn)
838 .await?
839 .map(|r| row_to_json(&r));
840 if let (Some(pq), Some(kv)) = (prune_q, keep_versions) {
841 let mut pr = sqlx::query(&pq.sql);
842 pr = pr.bind(Self::to_sqlx_param(id));
843 pr = pr.bind(kv);
844 pr.execute(&mut **conn).await?;
845 }
846 Ok::<_, sqlx::Error>(row)
847 }
848 .await;
849 match snap_res {
850 Ok(row) => {
851 sqlx::query("RELEASE SAVEPOINT sp_versioned_update")
852 .execute(&mut **conn)
853 .await?;
854 Ok(row)
855 }
856 Err(e) => {
857 sqlx::query("ROLLBACK TO SAVEPOINT sp_versioned_update")
858 .execute(&mut **conn)
859 .await?;
860 Err(AppError::Db(e))
861 }
862 }
863 }
864 }
865 }
866
867 async fn run_versioned_delete<'a>(
869 executor: &mut TenantExecutor<'a>,
870 id: &Value,
871 snap_q: QueryBuf,
872 del_q: QueryBuf,
873 ) -> Result<Option<Value>, AppError> {
874 match executor.executor {
875 TenantExecutorInner::Pool(pool) => {
876 let mut tx = pool.begin().await?;
877 let mut snap = sqlx::query(&snap_q.sql);
878 snap = snap.bind(Self::to_sqlx_param(&snap_q.params[0])); snap = snap.bind(Self::to_sqlx_param(id)); snap.execute(&mut *tx).await?;
881 let mut del = sqlx::query(&del_q.sql);
882 del = del.bind(Self::to_sqlx_param(id));
883 let row = del.fetch_optional(&mut *tx).await?.map(|r| row_to_json(&r));
884 tx.commit().await?;
885 Ok(row)
886 }
887 TenantExecutorInner::Conn(ref mut conn) => {
888 sqlx::query("SAVEPOINT sp_versioned_delete")
889 .execute(&mut **conn)
890 .await?;
891 let snap_res = async {
892 let mut snap = sqlx::query(&snap_q.sql);
893 snap = snap.bind(Self::to_sqlx_param(&snap_q.params[0]));
894 snap = snap.bind(Self::to_sqlx_param(id));
895 snap.execute(&mut **conn).await?;
896 let mut del = sqlx::query(&del_q.sql);
897 del = del.bind(Self::to_sqlx_param(id));
898 let row = del
899 .fetch_optional(&mut **conn)
900 .await?
901 .map(|r| row_to_json(&r));
902 Ok::<_, sqlx::Error>(row)
903 }
904 .await;
905 match snap_res {
906 Ok(row) => {
907 sqlx::query("RELEASE SAVEPOINT sp_versioned_delete")
908 .execute(&mut **conn)
909 .await?;
910 Ok(row)
911 }
912 Err(e) => {
913 sqlx::query("ROLLBACK TO SAVEPOINT sp_versioned_delete")
914 .execute(&mut **conn)
915 .await?;
916 Err(AppError::Db(e))
917 }
918 }
919 }
920 }
921 }
922
923 async fn insert_audit<'a>(
924 executor: &mut TenantExecutor<'a>,
925 entity: &ResolvedEntity,
926 action: &str,
927 row: &Value,
928 pre_row: Option<&Value>,
929 audit_by: Option<&str>,
930 schema_override: Option<&str>,
931 ) -> Result<(), AppError> {
932 let schema = schema_override.unwrap_or(&entity.schema_name);
933 let audit_table = format!(
934 "\"{}\".\"{}\"",
935 schema.replace('"', "\"\""),
936 format!("{}_audit", entity.table_name).replace('"', "\"\"")
937 );
938
939 let changed = if action == "update" {
940 pre_row.map(|pre| compute_changed_fields(pre, row, entity))
941 } else {
942 None
943 };
944
945 let mut col_names: Vec<String> = vec![
946 "\"audit_action\"".to_string(),
947 "\"audit_by\"".to_string(),
948 "\"changed_fields\"".to_string(),
949 ];
950 let mut placeholders: Vec<String> = Vec::new();
951 let mut params: Vec<Value> = Vec::new();
952
953 params.push(Value::String(action.to_string()));
954 placeholders.push(format!("${}", params.len()));
955
956 params.push(
957 audit_by
958 .map(|s| Value::String(s.to_string()))
959 .unwrap_or(Value::Null),
960 );
961 placeholders.push(format!("${}", params.len()));
962
963 params.push(changed.unwrap_or(Value::Null));
964 placeholders.push(format!("${}::jsonb", params.len()));
965
966 let row_obj = row.as_object();
967 for col in &entity.columns {
968 let raw = row_obj
969 .and_then(|o| o.get(&col.name))
970 .cloned()
971 .unwrap_or(Value::Null);
972 let val = coerce_json_value_for_pg_array(raw, col.pg_type.as_deref());
973 let param_num = params.len() + 1;
974 let ph = col
975 .pg_type
976 .as_deref()
977 .map(|t| format!("${}::{}", param_num, t))
978 .unwrap_or_else(|| format!("${}", param_num));
979 col_names.push(format!("\"{}\"", col.name));
980 placeholders.push(ph);
981 params.push(val);
982 }
983
984 let sql = format!(
985 "INSERT INTO {} ({}) VALUES ({})",
986 audit_table,
987 col_names.join(", "),
988 placeholders.join(", ")
989 );
990 tracing::debug!(sql = %sql, "audit insert");
991
992 let mut query = sqlx::query(&sql);
993 for p in ¶ms {
994 query = query.bind(Self::to_sqlx_param(p));
995 }
996 match executor.executor {
997 TenantExecutorInner::Pool(pool) => {
998 query.execute(pool).await?;
999 }
1000 TenantExecutorInner::Conn(ref mut conn) => {
1001 query.execute(&mut **conn).await?;
1002 }
1003 }
1004 Ok(())
1005 }
1006}
1007
1008fn compute_changed_fields(pre: &Value, post: &Value, entity: &ResolvedEntity) -> Value {
1009 let pre_obj = match pre.as_object() {
1010 Some(o) => o,
1011 None => return Value::Null,
1012 };
1013 let post_obj = match post.as_object() {
1014 Some(o) => o,
1015 None => return Value::Null,
1016 };
1017 let mut changes = serde_json::Map::new();
1018 for col in &entity.columns {
1019 let pre_val = pre_obj.get(&col.name).unwrap_or(&Value::Null);
1020 let post_val = post_obj.get(&col.name).unwrap_or(&Value::Null);
1021 if pre_val != post_val {
1022 let mut diff = serde_json::Map::new();
1023 diff.insert("old".to_string(), pre_val.clone());
1024 diff.insert("new".to_string(), post_val.clone());
1025 changes.insert(col.name.clone(), Value::Object(diff));
1026 }
1027 }
1028 Value::Object(changes)
1029}
1030
1031fn row_to_json(row: &DbRow) -> Value {
1032 use sqlx::Column;
1033 use sqlx::Row;
1034 let mut map = serde_json::Map::new();
1035 for col in row.columns() {
1036 let name = col.name();
1037 let v = cell_to_value(row, name);
1038 map.insert(name.to_string(), v);
1039 }
1040 Value::Object(map)
1041}
1042
/// Decodes the cell named `name` of `row` into a JSON value by probing candidate Rust
/// types in a fixed order and returning the first that decodes to a non-NULL value.
///
/// Probe order: `i16`, `i32`, `i64`, `f32`, `f64`, `bool`, then (only with the
/// `postgres` feature) arrays of `String`, `Uuid` and `i64`, then `Uuid`,
/// `DateTime<Utc>`, `NaiveDateTime`, `NaiveDate`, `String`, and finally a JSON value.
/// Returns `Value::Null` when no probe succeeds, which includes SQL NULL cells.
///
/// The order matters: each probe runs only if all earlier ones failed to decode.
fn cell_to_value(row: &DbRow, name: &str) -> Value {
    use sqlx::Row;
    // Integers, narrowest first.
    if let Ok(Some(n)) = row.try_get::<Option<i16>, _>(name) {
        return Value::Number(n.into());
    }
    if let Ok(Some(n)) = row.try_get::<Option<i32>, _>(name) {
        return Value::Number(n.into());
    }
    if let Ok(Some(n)) = row.try_get::<Option<i64>, _>(name) {
        return Value::Number(n.into());
    }
    // Floats: values `from_f64` rejects (it returns `None`) fall through to the
    // remaining probes instead of being returned here.
    if let Ok(Some(n)) = row.try_get::<Option<f32>, _>(name) {
        if let Some(n) = serde_json::Number::from_f64(n as f64) {
            return Value::Number(n);
        }
    }
    if let Ok(Some(n)) = row.try_get::<Option<f64>, _>(name) {
        if let Some(n) = serde_json::Number::from_f64(n) {
            return Value::Number(n);
        }
    }
    if let Ok(Some(b)) = row.try_get::<Option<bool>, _>(name) {
        return Value::Bool(b);
    }
    // Array probes are compiled only when the `postgres` feature is enabled.
    #[cfg(feature = "postgres")]
    if let Ok(Some(vec)) = row.try_get::<Option<Vec<String>>, _>(name) {
        return Value::Array(vec.into_iter().map(Value::String).collect());
    }
    #[cfg(feature = "postgres")]
    if let Ok(Some(vec)) = row.try_get::<Option<Vec<uuid::Uuid>>, _>(name) {
        return Value::Array(
            vec.into_iter()
                .map(|u| Value::String(u.to_string()))
                .collect(),
        );
    }
    #[cfg(feature = "postgres")]
    if let Ok(Some(vec)) = row.try_get::<Option<Vec<i64>>, _>(name) {
        return Value::Array(vec.into_iter().map(|n| Value::Number(n.into())).collect());
    }
    // UUIDs and temporal types are rendered as strings.
    if let Ok(Some(u)) = row.try_get::<Option<uuid::Uuid>, _>(name) {
        return Value::String(u.to_string());
    }
    if let Ok(Some(d)) = row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(name) {
        return Value::String(d.to_rfc3339());
    }
    if let Ok(Some(d)) = row.try_get::<Option<chrono::NaiveDateTime>, _>(name) {
        return Value::String(d.format("%Y-%m-%dT%H:%M:%S%.f").to_string());
    }
    if let Ok(Some(d)) = row.try_get::<Option<chrono::NaiveDate>, _>(name) {
        return Value::String(d.format("%Y-%m-%d").to_string());
    }
    if let Ok(Some(s)) = row.try_get::<Option<String>, _>(name) {
        // A string whose trimmed text parses as an f64 is returned as a JSON number.
        // NOTE(review): presumably meant for numeric/decimal columns that decode as
        // text, but it also turns genuine text such as "007" into the number 7.0 —
        // confirm this is acceptable.
        if let Ok(n) = s.trim().parse::<f64>() {
            if let Some(num) = serde_json::Number::from_f64(n) {
                return Value::Number(num);
            }
        }
        return Value::String(s);
    }
    // Last resort: a cell that decodes directly as a JSON value.
    if let Ok(Some(j)) = row.try_get::<Option<serde_json::Value>, _>(name) {
        return j;
    }
    Value::Null
}