1use crate::config::ResolvedEntity;
4use crate::db::pool::{Connection, DbRow, Pool};
5use crate::db::Dialect;
6use crate::error::AppError;
7use crate::sql::{
8 archive, coerce_json_value_for_pg_array, delete, insert, select_by_column_in, select_by_id,
9 select_list, select_list_with_includes, unarchive, update, BindValue, FilterNode,
10 IncludeSelect, QueryBuf, SortSpec,
11};
12use serde_json::Value;
13use std::collections::HashMap;
14
15pub enum TenantExecutorInner<'a> {
17 Pool(&'a Pool),
18 Conn(&'a mut Connection),
19}
20
21pub struct TenantExecutor<'a> {
22 pub executor: TenantExecutorInner<'a>,
23 pub dialect: &'a dyn crate::db::Dialect,
24}
25
26impl<'a> TenantExecutor<'a> {
27 pub fn pool(pool: &'a Pool, dialect: &'a dyn crate::db::Dialect) -> Self {
28 TenantExecutor {
29 executor: TenantExecutorInner::Pool(pool),
30 dialect,
31 }
32 }
33 pub fn conn(conn: &'a mut Connection, dialect: &'a dyn crate::db::Dialect) -> Self {
34 TenantExecutor {
35 executor: TenantExecutorInner::Conn(conn),
36 dialect,
37 }
38 }
39}
40
/// Stateless namespace type: every CRUD operation is an associated function that
/// receives its executor, entity and dialect as arguments.
#[derive(Debug, Clone, Copy)]
pub struct CrudService;
42
43impl CrudService {
44 #[allow(clippy::too_many_arguments)]
47 pub async fn list<'a>(
48 executor: &mut TenantExecutor<'a>,
49 entity: &ResolvedEntity,
50 filter: Option<&FilterNode>,
51 sort: &[SortSpec],
52 limit: Option<u32>,
53 offset: Option<u32>,
54 filter_includes: &[IncludeSelect<'_>],
55 schema_override: Option<&str>,
56 dialect: &dyn Dialect,
57 ) -> Result<Vec<Value>, AppError> {
58 const DEFAULT_LIMIT: u32 = 100;
59 let limit = limit.unwrap_or(DEFAULT_LIMIT).min(1000);
60 let offset = offset.unwrap_or(0);
61 let q = select_list(
62 entity,
63 filter,
64 sort,
65 Some(limit),
66 Some(offset),
67 filter_includes,
68 schema_override,
69 dialect,
70 )?;
71 Self::query_many_exec(executor, &q.sql, &q.params).await
72 }
73
74 #[allow(clippy::too_many_arguments)]
77 pub async fn list_with_includes<'a>(
78 executor: &mut TenantExecutor<'a>,
79 entity: &ResolvedEntity,
80 filter: Option<&FilterNode>,
81 sort: &[SortSpec],
82 limit: Option<u32>,
83 offset: Option<u32>,
84 includes: &[IncludeSelect<'_>],
85 filter_includes: &[IncludeSelect<'_>],
86 schema_override: Option<&str>,
87 dialect: &dyn Dialect,
88 ) -> Result<Vec<Value>, AppError> {
89 const DEFAULT_LIMIT: u32 = 100;
90 let limit = limit.unwrap_or(DEFAULT_LIMIT).min(1000);
91 let offset = offset.unwrap_or(0);
92 let q = select_list_with_includes(
93 entity,
94 filter,
95 sort,
96 Some(limit),
97 Some(offset),
98 includes,
99 filter_includes,
100 schema_override,
101 dialect,
102 )?;
103 Self::query_many_exec(executor, &q.sql, &q.params).await
104 }
105
106 pub async fn read<'a>(
108 executor: &mut TenantExecutor<'a>,
109 entity: &ResolvedEntity,
110 id: &Value,
111 schema_override: Option<&str>,
112 dialect: &dyn Dialect,
113 ) -> Result<Option<Value>, AppError> {
114 let q = select_by_id(entity, schema_override, dialect);
115 Self::query_one_exec(executor, &q.sql, std::slice::from_ref(id)).await
116 }
117
118 pub async fn fetch_where_column_in<'a>(
120 executor: &mut TenantExecutor<'a>,
121 entity: &ResolvedEntity,
122 column_name: &str,
123 values: &[Value],
124 schema_override: Option<&str>,
125 dialect: &dyn Dialect,
126 ) -> Result<Vec<Value>, AppError> {
127 if values.is_empty() {
128 return Ok(Vec::new());
129 }
130 let q = select_by_column_in(entity, column_name, values, schema_override, dialect);
131 Self::query_many_exec(executor, &q.sql, &q.params).await
132 }
133
134 pub async fn create<'a>(
138 executor: &mut TenantExecutor<'a>,
139 entity: &ResolvedEntity,
140 body: &HashMap<String, Value>,
141 schema_override: Option<&str>,
142 rls_tenant_id: Option<&str>,
143 caller_user_id: Option<&str>,
144 dialect: &dyn Dialect,
145 ) -> Result<Value, AppError> {
146 let include_pk = body.contains_key(&entity.pk_columns[0]);
147 let q = insert(
148 entity,
149 body,
150 include_pk,
151 schema_override,
152 rls_tenant_id,
153 caller_user_id,
154 dialect,
155 );
156 let row = Self::execute_returning_one_exec(executor, &q)
157 .await?
158 .ok_or_else(|| AppError::Db(sqlx::Error::RowNotFound))?;
159 if entity.audit_log {
160 Self::insert_audit(
161 executor,
162 entity,
163 "create",
164 &row,
165 None,
166 caller_user_id,
167 schema_override,
168 )
169 .await?;
170 }
171 Ok(row)
172 }
173
174 pub async fn update<'a>(
177 executor: &mut TenantExecutor<'a>,
178 entity: &ResolvedEntity,
179 id: &Value,
180 body: &HashMap<String, Value>,
181 schema_override: Option<&str>,
182 caller_user_id: Option<&str>,
183 dialect: &dyn Dialect,
184 ) -> Result<Option<Value>, AppError> {
185 let pre_row = if entity.audit_log {
186 let q = select_by_id(entity, schema_override, dialect);
187 Self::query_one_exec(executor, &q.sql, std::slice::from_ref(id)).await?
188 } else {
189 None
190 };
191 let q = update(entity, id, body, schema_override, caller_user_id, dialect);
192 let result = Self::execute_returning_one_exec(executor, &q).await?;
193 if entity.audit_log {
194 if let Some(ref post_row) = result {
195 Self::insert_audit(
196 executor,
197 entity,
198 "update",
199 post_row,
200 pre_row.as_ref(),
201 caller_user_id,
202 schema_override,
203 )
204 .await?;
205 }
206 }
207 Ok(result)
208 }
209
210 pub async fn delete<'a>(
213 executor: &mut TenantExecutor<'a>,
214 entity: &ResolvedEntity,
215 id: &Value,
216 schema_override: Option<&str>,
217 caller_user_id: Option<&str>,
218 dialect: &dyn Dialect,
219 ) -> Result<Option<Value>, AppError> {
220 let q = delete(entity, schema_override, dialect);
221 let result = Self::execute_returning_one_with_params_exec(
222 executor,
223 &q.sql,
224 std::slice::from_ref(id),
225 )
226 .await?;
227 if entity.audit_log {
228 if let Some(ref deleted_row) = result {
229 Self::insert_audit(
230 executor,
231 entity,
232 "delete",
233 deleted_row,
234 None,
235 caller_user_id,
236 schema_override,
237 )
238 .await?;
239 }
240 }
241 Ok(result)
242 }
243
244 pub async fn archive<'a>(
247 executor: &mut TenantExecutor<'a>,
248 entity: &ResolvedEntity,
249 archive_field: &str,
250 id: &Value,
251 schema_override: Option<&str>,
252 dialect: &dyn Dialect,
253 ) -> Result<Option<Value>, AppError> {
254 let q = archive(entity, archive_field, schema_override, dialect);
255 Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
256 .await
257 }
258
259 pub async fn unarchive<'a>(
262 executor: &mut TenantExecutor<'a>,
263 entity: &ResolvedEntity,
264 archive_field: &str,
265 id: &Value,
266 schema_override: Option<&str>,
267 dialect: &dyn Dialect,
268 ) -> Result<Option<Value>, AppError> {
269 let q = unarchive(entity, archive_field, schema_override, dialect);
270 Self::execute_returning_one_with_params_exec(executor, &q.sql, std::slice::from_ref(id))
271 .await
272 }
273
274 pub async fn bulk_create<'a>(
278 executor: &mut TenantExecutor<'a>,
279 entity: &ResolvedEntity,
280 items: &[HashMap<String, Value>],
281 schema_override: Option<&str>,
282 rls_tenant_id: Option<&str>,
283 caller_user_id: Option<&str>,
284 dialect: &dyn Dialect,
285 ) -> Result<Vec<Value>, AppError> {
286 const BULK_LIMIT: usize = 100;
287 if items.len() > BULK_LIMIT {
288 return Err(AppError::BadRequest(format!(
289 "bulk create limited to {} items",
290 BULK_LIMIT
291 )));
292 }
293 let mut out = Vec::with_capacity(items.len());
294 match executor.executor {
295 TenantExecutorInner::Pool(pool) => {
296 let mut tx = pool.begin().await?;
297 for body in items {
298 let include_pk = body.contains_key(&entity.pk_columns[0]);
299 let q = insert(
300 entity,
301 body,
302 include_pk,
303 schema_override,
304 rls_tenant_id,
305 caller_user_id,
306 dialect,
307 );
308 let row = Self::execute_returning_one_tx(&mut tx, &q)
309 .await?
310 .unwrap_or(Value::Null);
311 out.push(row);
312 }
313 tx.commit().await?;
314 }
315 TenantExecutorInner::Conn(ref mut conn) => {
316 for body in items {
317 let include_pk = body.contains_key(&entity.pk_columns[0]);
318 let q = insert(
319 entity,
320 body,
321 include_pk,
322 schema_override,
323 rls_tenant_id,
324 caller_user_id,
325 dialect,
326 );
327 let row = Self::execute_returning_one_conn(conn, &q)
328 .await?
329 .unwrap_or(Value::Null);
330 out.push(row);
331 }
332 }
333 }
334 Ok(out)
335 }
336
337 pub async fn bulk_create_collecting<'a>(
341 executor: &mut TenantExecutor<'a>,
342 entity: &ResolvedEntity,
343 items: &[HashMap<String, Value>],
344 schema_override: Option<&str>,
345 rls_tenant_id: Option<&str>,
346 caller_user_id: Option<&str>,
347 dialect: &dyn Dialect,
348 ) -> Result<(Vec<Value>, Vec<(usize, AppError)>), AppError> {
349 const BULK_LIMIT: usize = 100;
350 if items.len() > BULK_LIMIT {
351 return Err(AppError::BadRequest(format!(
352 "bulk create limited to {} items",
353 BULK_LIMIT
354 )));
355 }
356 let mut out = Vec::with_capacity(items.len());
357 let mut row_errors: Vec<(usize, AppError)> = Vec::new();
358 match executor.executor {
359 TenantExecutorInner::Pool(pool) => {
360 let mut tx = pool.begin().await?;
361 for (idx, body) in items.iter().enumerate() {
362 let sp = format!("sp_{}", idx);
363 sqlx::query(&format!("SAVEPOINT {}", sp))
364 .execute(&mut *tx)
365 .await?;
366 let include_pk = body.contains_key(&entity.pk_columns[0]);
367 let q = insert(
368 entity,
369 body,
370 include_pk,
371 schema_override,
372 rls_tenant_id,
373 caller_user_id,
374 dialect,
375 );
376 match Self::execute_returning_one_tx(&mut tx, &q).await {
377 Ok(row) => {
378 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
379 .execute(&mut *tx)
380 .await?;
381 out.push(row.unwrap_or(Value::Null));
382 }
383 Err(e) => {
384 sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
385 .execute(&mut *tx)
386 .await?;
387 row_errors.push((idx, e));
388 }
389 }
390 }
391 if row_errors.is_empty() {
392 tx.commit().await?;
393 } else {
394 tx.rollback().await?;
395 out.clear();
396 }
397 }
398 TenantExecutorInner::Conn(ref mut conn) => {
399 for (idx, body) in items.iter().enumerate() {
400 let sp = format!("sp_{}", idx);
401 sqlx::query(&format!("SAVEPOINT {}", sp))
402 .execute(&mut **conn)
403 .await?;
404 let include_pk = body.contains_key(&entity.pk_columns[0]);
405 let q = insert(
406 entity,
407 body,
408 include_pk,
409 schema_override,
410 rls_tenant_id,
411 caller_user_id,
412 dialect,
413 );
414 match Self::execute_returning_one_conn(conn, &q).await {
415 Ok(row) => {
416 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
417 .execute(&mut **conn)
418 .await?;
419 out.push(row.unwrap_or(Value::Null));
420 }
421 Err(e) => {
422 sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
423 .execute(&mut **conn)
424 .await?;
425 row_errors.push((idx, e));
426 }
427 }
428 }
429 if !row_errors.is_empty() {
430 out.clear();
431 }
432 }
433 }
434 Ok((out, row_errors))
435 }
436
437 pub async fn bulk_update<'a>(
440 executor: &mut TenantExecutor<'a>,
441 entity: &ResolvedEntity,
442 items: &[HashMap<String, Value>],
443 schema_override: Option<&str>,
444 caller_user_id: Option<&str>,
445 dialect: &dyn Dialect,
446 ) -> Result<Vec<Value>, AppError> {
447 const BULK_LIMIT: usize = 100;
448 if items.len() > BULK_LIMIT {
449 return Err(AppError::BadRequest(format!(
450 "bulk update limited to {} items",
451 BULK_LIMIT
452 )));
453 }
454 let pk = &entity.pk_columns[0];
455 let mut out = Vec::with_capacity(items.len());
456 match executor.executor {
457 TenantExecutorInner::Pool(pool) => {
458 let mut tx = pool.begin().await?;
459 for body in items {
460 let id = body.get(pk).ok_or_else(|| {
461 AppError::Validation(format!("each item must have '{}'", pk))
462 })?;
463 let mut body_without_pk = body.clone();
464 body_without_pk.remove(pk);
465 let q = update(
466 entity,
467 id,
468 &body_without_pk,
469 schema_override,
470 caller_user_id,
471 dialect,
472 );
473 if let Some(row) = Self::execute_returning_one_tx(&mut tx, &q).await? {
474 out.push(row);
475 }
476 }
477 tx.commit().await?;
478 }
479 TenantExecutorInner::Conn(ref mut conn) => {
480 for body in items {
481 let id = body.get(pk).ok_or_else(|| {
482 AppError::Validation(format!("each item must have '{}'", pk))
483 })?;
484 let mut body_without_pk = body.clone();
485 body_without_pk.remove(pk);
486 let q = update(
487 entity,
488 id,
489 &body_without_pk,
490 schema_override,
491 caller_user_id,
492 dialect,
493 );
494 if let Some(row) = Self::execute_returning_one_conn(conn, &q).await? {
495 out.push(row);
496 }
497 }
498 }
499 }
500 Ok(out)
501 }
502
503 pub async fn bulk_update_collecting<'a>(
508 executor: &mut TenantExecutor<'a>,
509 entity: &ResolvedEntity,
510 items: &[HashMap<String, Value>],
511 schema_override: Option<&str>,
512 caller_user_id: Option<&str>,
513 dialect: &dyn Dialect,
514 ) -> Result<(Vec<Value>, Vec<(usize, AppError)>), AppError> {
515 const BULK_LIMIT: usize = 100;
516 if items.len() > BULK_LIMIT {
517 return Err(AppError::BadRequest(format!(
518 "bulk update limited to {} items",
519 BULK_LIMIT
520 )));
521 }
522 let pk = entity.pk_columns[0].clone();
523 let mut out = Vec::with_capacity(items.len());
524 let mut row_errors: Vec<(usize, AppError)> = Vec::new();
525 match executor.executor {
526 TenantExecutorInner::Pool(pool) => {
527 let mut tx = pool.begin().await?;
528 for (idx, body) in items.iter().enumerate() {
529 let id = match body.get(&pk) {
530 Some(id) => id.clone(),
531 None => {
532 row_errors.push((
533 idx,
534 AppError::Validation(format!("each item must have '{}'", pk)),
535 ));
536 continue;
537 }
538 };
539 let sp = format!("sp_{}", idx);
540 sqlx::query(&format!("SAVEPOINT {}", sp))
541 .execute(&mut *tx)
542 .await?;
543 let mut body_without_pk = body.clone();
544 body_without_pk.remove(&pk);
545 let q = update(
546 entity,
547 &id,
548 &body_without_pk,
549 schema_override,
550 caller_user_id,
551 dialect,
552 );
553 match Self::execute_returning_one_tx(&mut tx, &q).await {
554 Ok(Some(row)) => {
555 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
556 .execute(&mut *tx)
557 .await?;
558 out.push(row);
559 }
560 Ok(None) => {
561 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
562 .execute(&mut *tx)
563 .await?;
564 }
565 Err(e) => {
566 sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
567 .execute(&mut *tx)
568 .await?;
569 row_errors.push((idx, e));
570 }
571 }
572 }
573 if row_errors.is_empty() {
574 tx.commit().await?;
575 } else {
576 tx.rollback().await?;
577 out.clear();
578 }
579 }
580 TenantExecutorInner::Conn(ref mut conn) => {
581 for (idx, body) in items.iter().enumerate() {
582 let id = match body.get(&pk) {
583 Some(id) => id.clone(),
584 None => {
585 row_errors.push((
586 idx,
587 AppError::Validation(format!("each item must have '{}'", pk)),
588 ));
589 continue;
590 }
591 };
592 let sp = format!("sp_{}", idx);
593 sqlx::query(&format!("SAVEPOINT {}", sp))
594 .execute(&mut **conn)
595 .await?;
596 let mut body_without_pk = body.clone();
597 body_without_pk.remove(&pk);
598 let q = update(
599 entity,
600 &id,
601 &body_without_pk,
602 schema_override,
603 caller_user_id,
604 dialect,
605 );
606 match Self::execute_returning_one_conn(conn, &q).await {
607 Ok(Some(row)) => {
608 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
609 .execute(&mut **conn)
610 .await?;
611 out.push(row);
612 }
613 Ok(None) => {
614 sqlx::query(&format!("RELEASE SAVEPOINT {}", sp))
615 .execute(&mut **conn)
616 .await?;
617 }
618 Err(e) => {
619 sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", sp))
620 .execute(&mut **conn)
621 .await?;
622 row_errors.push((idx, e));
623 }
624 }
625 }
626 if !row_errors.is_empty() {
627 out.clear();
628 }
629 }
630 }
631 Ok((out, row_errors))
632 }
633
634 async fn query_one_exec<'a>(
635 executor: &mut TenantExecutor<'a>,
636 sql: &str,
637 params: &[Value],
638 ) -> Result<Option<Value>, AppError> {
639 tracing::debug!(sql = %sql, params = ?params, "query");
640 let bind = Self::to_sqlx_param(¶ms[0]);
641 let row = match executor.executor {
642 TenantExecutorInner::Pool(pool) => {
643 sqlx::query(sql).bind(bind).fetch_optional(pool).await?
644 }
645 TenantExecutorInner::Conn(ref mut conn) => {
646 sqlx::query(sql)
647 .bind(bind)
648 .fetch_optional(&mut **conn)
649 .await?
650 }
651 };
652 Ok(row.map(|r| row_to_json(&r)))
653 }
654
655 async fn query_many_exec<'a>(
656 executor: &mut TenantExecutor<'a>,
657 sql: &str,
658 params: &[Value],
659 ) -> Result<Vec<Value>, AppError> {
660 tracing::debug!(sql = %sql, params = ?params, "query");
661 let mut query = sqlx::query(sql);
662 for p in params {
663 query = query.bind(Self::to_sqlx_param(p));
664 }
665 let rows = match executor.executor {
666 TenantExecutorInner::Pool(pool) => query.fetch_all(pool).await?,
667 TenantExecutorInner::Conn(ref mut conn) => query.fetch_all(&mut **conn).await?,
668 };
669 Ok(rows.iter().map(row_to_json).collect())
670 }
671
672 async fn execute_returning_one_exec<'a>(
673 executor: &mut TenantExecutor<'a>,
674 q: &QueryBuf,
675 ) -> Result<Option<Value>, AppError> {
676 tracing::debug!(sql = %q.sql, params = ?q.params, "query");
677 let mut query = sqlx::query(&q.sql);
678 for p in &q.params {
679 query = query.bind(Self::to_sqlx_param(p));
680 }
681 let row = match executor.executor {
682 TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
683 TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
684 };
685 Ok(row.map(|r| row_to_json(&r)))
686 }
687
688 async fn execute_returning_one_with_params_exec<'a>(
689 executor: &mut TenantExecutor<'a>,
690 sql: &str,
691 params: &[Value],
692 ) -> Result<Option<Value>, AppError> {
693 tracing::debug!(sql = %sql, params = ?params, "query");
694 let mut query = sqlx::query(sql);
695 for p in params {
696 query = query.bind(Self::to_sqlx_param(p));
697 }
698 let row = match executor.executor {
699 TenantExecutorInner::Pool(pool) => query.fetch_optional(pool).await?,
700 TenantExecutorInner::Conn(ref mut conn) => query.fetch_optional(&mut **conn).await?,
701 };
702 Ok(row.map(|r| row_to_json(&r)))
703 }
704
705 async fn execute_returning_one_conn(
706 conn: &mut Connection,
707 q: &QueryBuf,
708 ) -> Result<Option<Value>, AppError> {
709 tracing::debug!(sql = %q.sql, params = ?q.params, "query (conn)");
710 let mut query = sqlx::query(&q.sql);
711 for p in &q.params {
712 query = query.bind(Self::to_sqlx_param(p));
713 }
714 let row = query.fetch_optional(conn).await?;
715 Ok(row.map(|r| row_to_json(&r)))
716 }
717
718 async fn execute_returning_one_tx(
719 tx: &mut Connection,
720 q: &QueryBuf,
721 ) -> Result<Option<Value>, AppError> {
722 tracing::debug!(sql = %q.sql, params = ?q.params, "query (tx)");
723 let mut query = sqlx::query(&q.sql);
724 for p in &q.params {
725 query = query.bind(Self::to_sqlx_param(p));
726 }
727 let row = query.fetch_optional(&mut *tx).await?;
728 Ok(row.map(|r| row_to_json(&r)))
729 }
730
731 fn to_sqlx_param(v: &Value) -> BindValue {
732 BindValue::from_json(v).unwrap_or(BindValue::Null)
733 }
734
735 async fn insert_audit<'a>(
736 executor: &mut TenantExecutor<'a>,
737 entity: &ResolvedEntity,
738 action: &str,
739 row: &Value,
740 pre_row: Option<&Value>,
741 audit_by: Option<&str>,
742 schema_override: Option<&str>,
743 ) -> Result<(), AppError> {
744 let schema = schema_override.unwrap_or(&entity.schema_name);
745 let audit_table = format!(
746 "\"{}\".\"{}\"",
747 schema.replace('"', "\"\""),
748 format!("{}_audit", entity.table_name).replace('"', "\"\"")
749 );
750
751 let changed = if action == "update" {
752 pre_row.map(|pre| compute_changed_fields(pre, row, entity))
753 } else {
754 None
755 };
756
757 let mut col_names: Vec<String> = vec![
758 "\"audit_action\"".to_string(),
759 "\"audit_by\"".to_string(),
760 "\"changed_fields\"".to_string(),
761 ];
762 let mut placeholders: Vec<String> = Vec::new();
763 let mut params: Vec<Value> = Vec::new();
764
765 params.push(Value::String(action.to_string()));
766 placeholders.push(format!("${}", params.len()));
767
768 params.push(
769 audit_by
770 .map(|s| Value::String(s.to_string()))
771 .unwrap_or(Value::Null),
772 );
773 placeholders.push(format!("${}", params.len()));
774
775 params.push(changed.unwrap_or(Value::Null));
776 placeholders.push(format!("${}::jsonb", params.len()));
777
778 let row_obj = row.as_object();
779 for col in &entity.columns {
780 let raw = row_obj
781 .and_then(|o| o.get(&col.name))
782 .cloned()
783 .unwrap_or(Value::Null);
784 let val = coerce_json_value_for_pg_array(raw, col.pg_type.as_deref());
785 let param_num = params.len() + 1;
786 let ph = col
787 .pg_type
788 .as_deref()
789 .map(|t| format!("${}::{}", param_num, t))
790 .unwrap_or_else(|| format!("${}", param_num));
791 col_names.push(format!("\"{}\"", col.name));
792 placeholders.push(ph);
793 params.push(val);
794 }
795
796 let sql = format!(
797 "INSERT INTO {} ({}) VALUES ({})",
798 audit_table,
799 col_names.join(", "),
800 placeholders.join(", ")
801 );
802 tracing::debug!(sql = %sql, "audit insert");
803
804 let mut query = sqlx::query(&sql);
805 for p in ¶ms {
806 query = query.bind(Self::to_sqlx_param(p));
807 }
808 match executor.executor {
809 TenantExecutorInner::Pool(pool) => {
810 query.execute(pool).await?;
811 }
812 TenantExecutorInner::Conn(ref mut conn) => {
813 query.execute(&mut **conn).await?;
814 }
815 }
816 Ok(())
817 }
818}
819
820fn compute_changed_fields(pre: &Value, post: &Value, entity: &ResolvedEntity) -> Value {
821 let pre_obj = match pre.as_object() {
822 Some(o) => o,
823 None => return Value::Null,
824 };
825 let post_obj = match post.as_object() {
826 Some(o) => o,
827 None => return Value::Null,
828 };
829 let mut changes = serde_json::Map::new();
830 for col in &entity.columns {
831 let pre_val = pre_obj.get(&col.name).unwrap_or(&Value::Null);
832 let post_val = post_obj.get(&col.name).unwrap_or(&Value::Null);
833 if pre_val != post_val {
834 let mut diff = serde_json::Map::new();
835 diff.insert("old".to_string(), pre_val.clone());
836 diff.insert("new".to_string(), post_val.clone());
837 changes.insert(col.name.clone(), Value::Object(diff));
838 }
839 }
840 Value::Object(changes)
841}
842
843fn row_to_json(row: &DbRow) -> Value {
844 use sqlx::Column;
845 use sqlx::Row;
846 let mut map = serde_json::Map::new();
847 for col in row.columns() {
848 let name = col.name();
849 let v = cell_to_value(row, name);
850 map.insert(name.to_string(), v);
851 }
852 Value::Object(map)
853}
854
855fn cell_to_value(row: &DbRow, name: &str) -> Value {
856 use sqlx::Row;
857 if let Ok(Some(n)) = row.try_get::<Option<i16>, _>(name) {
858 return Value::Number(n.into());
859 }
860 if let Ok(Some(n)) = row.try_get::<Option<i32>, _>(name) {
861 return Value::Number(n.into());
862 }
863 if let Ok(Some(n)) = row.try_get::<Option<i64>, _>(name) {
864 return Value::Number(n.into());
865 }
866 if let Ok(Some(n)) = row.try_get::<Option<f32>, _>(name) {
867 if let Some(n) = serde_json::Number::from_f64(n as f64) {
868 return Value::Number(n);
869 }
870 }
871 if let Ok(Some(n)) = row.try_get::<Option<f64>, _>(name) {
872 if let Some(n) = serde_json::Number::from_f64(n) {
873 return Value::Number(n);
874 }
875 }
876 if let Ok(Some(b)) = row.try_get::<Option<bool>, _>(name) {
877 return Value::Bool(b);
878 }
879 #[cfg(feature = "postgres")]
880 if let Ok(Some(vec)) = row.try_get::<Option<Vec<String>>, _>(name) {
881 return Value::Array(vec.into_iter().map(Value::String).collect());
882 }
883 #[cfg(feature = "postgres")]
884 if let Ok(Some(vec)) = row.try_get::<Option<Vec<uuid::Uuid>>, _>(name) {
885 return Value::Array(
886 vec.into_iter()
887 .map(|u| Value::String(u.to_string()))
888 .collect(),
889 );
890 }
891 #[cfg(feature = "postgres")]
892 if let Ok(Some(vec)) = row.try_get::<Option<Vec<i64>>, _>(name) {
893 return Value::Array(vec.into_iter().map(|n| Value::Number(n.into())).collect());
894 }
895 if let Ok(Some(u)) = row.try_get::<Option<uuid::Uuid>, _>(name) {
896 return Value::String(u.to_string());
897 }
898 if let Ok(Some(d)) = row.try_get::<Option<chrono::DateTime<chrono::Utc>>, _>(name) {
899 return Value::String(d.to_rfc3339());
900 }
901 if let Ok(Some(d)) = row.try_get::<Option<chrono::NaiveDateTime>, _>(name) {
902 return Value::String(d.format("%Y-%m-%dT%H:%M:%S%.f").to_string());
903 }
904 if let Ok(Some(d)) = row.try_get::<Option<chrono::NaiveDate>, _>(name) {
905 return Value::String(d.format("%Y-%m-%d").to_string());
906 }
907 if let Ok(Some(s)) = row.try_get::<Option<String>, _>(name) {
908 if let Ok(n) = s.trim().parse::<f64>() {
910 if let Some(num) = serde_json::Number::from_f64(n) {
911 return Value::Number(num);
912 }
913 }
914 return Value::String(s);
915 }
916 if let Ok(Some(j)) = row.try_get::<Option<serde_json::Value>, _>(name) {
917 return j;
918 }
919 Value::Null
920}