1use super::{
2 ConnectionPool, DatabaseRow, DecodeRow, EncodeColumn, Entity, Executor, GlobalPool,
3 IntoSqlValue, JoinOn, ModelHelper, PrimaryKey, QueryBuilder, column::ColumnExt,
4 mutation::MutationExt, query::QueryExt,
5};
6use serde::de::DeserializeOwned;
7use std::sync::atomic::Ordering::Relaxed;
8use zino_core::{
9 JsonValue, Map, bail,
10 error::Error,
11 extension::{JsonObjectExt, JsonValueExt},
12 model::{Column, ModelHooks, Mutation, Query, QueryContext},
13 warn,
14};
15
16pub trait Schema: 'static + Send + Sync + ModelHooks {
/// Type of the primary key; it must implement [`PrimaryKey`].
type PrimaryKey: PrimaryKey;

/// Name of the primary key field (defaults to `"id"`).
/// It is the key used to look up the primary key column.
const PRIMARY_KEY_NAME: &'static str = "id";
/// Name used to look up the reader connection pool in `GlobalPool` (defaults to `"main"`).
const READER_NAME: &'static str = "main";
/// Name used to look up the writer connection pool in `GlobalPool` (defaults to `"main"`).
const WRITER_NAME: &'static str = "main";
/// Optional explicit table name. When it is `None`, `table_name()` derives the name
/// from the table prefix and the model name.
const TABLE_NAME: Option<&'static str> = None;
31
/// Returns a reference to the primary key of this model instance.
fn primary_key(&self) -> &Self::PrimaryKey;

/// Returns the Avro schema of the model.
fn schema() -> &'static apache_avro::Schema;

/// Returns the column definitions of the model.
fn columns() -> &'static [Column<'static>];

/// Returns the field names of the model.
/// They form the allow-list of the default query and the default mutation.
fn fields() -> &'static [&'static str];

/// Returns the read-only field names.
/// They are denied in the default mutation and skipped by the `UPDATE` builders.
fn read_only_fields() -> &'static [&'static str];

/// Returns the write-only field names. They are denied in the default query.
fn write_only_fields() -> &'static [&'static str];

/// Acquires the connection pool used for reading (used by `find`).
async fn acquire_reader() -> Result<&'static ConnectionPool, Error>;

/// Acquires the connection pool used for writing (used by the insert, update,
/// upsert and delete methods).
async fn acquire_writer() -> Result<&'static ConnectionPool, Error>;
55
/// Returns the database driver name, i.e. the value of `super::DRIVER_NAME`.
#[inline]
fn driver_name() -> &'static str {
    super::DRIVER_NAME
}
63
/// Returns the prefix for table names, i.e. the value behind `super::TABLE_PREFIX`.
#[inline]
fn table_prefix() -> &'static str {
    *super::TABLE_PREFIX
}
69
/// Returns the prefix for model namespaces, i.e. the value behind `super::NAMESPACE_PREFIX`.
#[inline]
fn namespace_prefix() -> &'static str {
    *super::NAMESPACE_PREFIX
}
75
76 #[inline]
78 fn table_name() -> &'static str {
79 Self::TABLE_NAME.unwrap_or_else(|| [Self::table_prefix(), Self::MODEL_NAME].concat().leak())
80 }
81
82 #[inline]
84 fn model_namespace() -> &'static str {
85 [Self::namespace_prefix(), Self::MODEL_NAME].concat().leak()
86 }
87
88 #[inline]
90 fn primary_key_name() -> &'static str {
91 Self::primary_key_column()
92 .extra()
93 .get_str("column_name")
94 .unwrap_or(Self::PRIMARY_KEY_NAME)
95 }
96
97 #[inline]
99 fn primary_key_value(&self) -> JsonValue {
100 self.primary_key().to_string().into()
101 }
102
103 #[inline]
105 fn primary_key_column() -> &'static Column<'static> {
106 Self::get_column(Self::PRIMARY_KEY_NAME).expect("primary key column should always exist")
107 }
108
109 #[inline]
111 fn get_column(key: &str) -> Option<&Column<'static>> {
112 let key = if let Some((name, field)) = key.split_once('.') {
113 if Self::model_name() == name || Self::table_name() == name {
114 field
115 } else {
116 return None;
117 }
118 } else {
119 key
120 };
121 Self::columns().iter().find(|col| col.name() == key)
122 }
123
124 #[inline]
126 fn get_writable_column(key: &str) -> Option<&Column<'static>> {
127 let key = if let Some((name, field)) = key.split_once('.') {
128 if Self::model_name() == name || Self::table_name() == name {
129 field
130 } else {
131 return None;
132 }
133 } else {
134 key
135 };
136 Self::columns()
137 .iter()
138 .find(|col| col.name() == key && !col.is_read_only())
139 }
140
141 #[inline]
143 fn has_column(key: &str) -> bool {
144 let key = if let Some((name, field)) = key.split_once('.') {
145 if Self::model_name() == name || Self::table_name() == name {
146 field
147 } else {
148 return false;
149 }
150 } else {
151 key
152 };
153 Self::columns().iter().any(|col| col.name() == key)
154 }
155
156 #[inline]
158 fn default_query() -> Query {
159 let mut query = Query::default();
160 query.allow_fields(Self::fields());
161 query.deny_fields(Self::write_only_fields());
162 query
163 }
164
165 #[inline]
167 fn default_mutation() -> Mutation {
168 let mut mutation = Mutation::default();
169 mutation.allow_fields(Self::fields());
170 mutation.deny_fields(Self::read_only_fields());
171 mutation
172 }
173
174 #[inline]
176 fn init_reader() -> Result<&'static ConnectionPool, Error> {
177 GlobalPool::get(Self::READER_NAME)
178 .ok_or_else(|| warn!("connection to the database is unavailable"))
179 }
180
181 #[inline]
183 fn init_writer() -> Result<&'static ConnectionPool, Error> {
184 GlobalPool::get(Self::WRITER_NAME)
185 .ok_or_else(|| warn!("connection to the database is unavailable"))
186 }
187
188 async fn create_table() -> Result<(), Error> {
190 let connection_pool = Self::init_writer()?;
191 if !connection_pool.auto_migration() {
192 return Ok(());
193 }
194 Self::before_create_table().await?;
195
196 let primary_key_name = Self::primary_key_name();
197 let table_name = Self::table_name();
198 let table_name_escaped = Query::escape_table_name(table_name);
199 let columns = Self::columns();
200 let mut definitions = columns
201 .iter()
202 .map(|col| col.field_definition(primary_key_name))
203 .collect::<Vec<_>>();
204 for col in columns {
205 let mut constraints = col.constraints();
206 if !constraints.is_empty() {
207 definitions.append(&mut constraints);
208 }
209 }
210
211 let definitions = definitions.join(",\n ");
212 let sql = format!("CREATE TABLE IF NOT EXISTS {table_name_escaped} (\n {definitions}\n);");
213 let pool = connection_pool.pool();
214 if let Err(err) = pool.execute(&sql).await {
215 tracing::error!(table_name, "fail to execute `{sql}`");
216 return Err(err);
217 }
218 Self::after_create_table().await?;
219 Ok(())
220 }
221
222 async fn synchronize_schema() -> Result<(), Error> {
224 let connection_pool = Self::init_writer()?;
225 if !connection_pool.auto_migration() {
226 return Ok(());
227 }
228
229 let mut table_name = Self::table_name();
230 let table_name_escaped = Query::escape_table_name(table_name);
231 if let Some((_, suffix)) = table_name.rsplit_once('.') {
232 table_name = suffix;
233 }
234
235 let sql = if cfg!(any(
236 feature = "orm-mariadb",
237 feature = "orm-mysql",
238 feature = "orm-tidb"
239 )) {
240 let table_schema = connection_pool.database();
241 format!(
242 "SELECT column_name, data_type, column_default, is_nullable \
243 FROM information_schema.columns \
244 WHERE table_schema = '{table_schema}' AND table_name = '{table_name}';"
245 )
246 } else if cfg!(feature = "orm-postgres") {
247 format!(
248 "SELECT column_name, data_type, column_default, is_nullable \
249 FROM information_schema.columns \
250 WHERE table_schema = 'public' AND table_name = '{table_name}';"
251 )
252 } else {
253 format!(
254 "SELECT p.name AS column_name, p.type AS data_type, \
255 p.dflt_value AS column_default, p.[notnull] AS is_not_null \
256 FROM sqlite_master m LEFT OUTER JOIN pragma_table_info((m.name)) p
257 ON m.name <> p.name WHERE m.name = '{table_name}';"
258 )
259 };
260 let pool = connection_pool.pool();
261 let rows = pool.fetch(&sql).await?;
262 let mut data = Vec::with_capacity(rows.len());
263 for row in rows {
264 data.push(Map::decode_row(&row)?);
265 }
266
267 let model_name = Self::model_name();
268 let primary_key_name = Self::primary_key_name();
269 for col in Self::columns() {
270 let column_type = col.column_type();
271 let column_name = col
272 .extra()
273 .get_str("column_name")
274 .unwrap_or_else(|| col.name());
275 let column_opt = data.iter().find(|d| {
276 d.get_str("column_name")
277 .or_else(|| d.get_str("COLUMN_NAME"))
278 == Some(column_name)
279 });
280 if let Some(d) = column_opt {
281 let data_type = d.get_str("data_type").or_else(|| d.get_str("DATA_TYPE"));
282 let column_default = d
283 .get_str("column_default")
284 .or_else(|| d.get_str("COLUMN_DEFAULT"));
285 let is_not_null = if cfg!(any(feature = "orm-mysql", feature = "orm-postgres")) {
286 d.get_str("is_nullable")
287 .or_else(|| d.get_str("IS_NULLABLE"))
288 .unwrap_or("YES")
289 .eq_ignore_ascii_case("NO")
290 } else {
291 d.get_str("is_not_null") == Some("1")
292 };
293 if !data_type.is_some_and(|t| col.is_compatible(t)) {
294 tracing::warn!(
295 model_name,
296 table_name,
297 column_name,
298 column_type,
299 data_type,
300 column_default,
301 "data type of `{column_name}` should be altered as `{column_type}`",
302 );
303 } else if col.is_not_null() != is_not_null && column_name != primary_key_name {
304 tracing::warn!(
305 model_name,
306 table_name,
307 column_name,
308 column_type,
309 data_type,
310 column_default,
311 is_not_null,
312 "`NOT NULL` constraint of `{column_name}` should be consistent",
313 );
314 }
315 } else {
316 let column_definition = col.field_definition(primary_key_name);
317 let sql =
318 format!("ALTER TABLE {table_name_escaped} ADD COLUMN {column_definition};");
319 pool.execute(&sql).await?;
320 tracing::warn!(
321 model_name,
322 table_name,
323 column_name,
324 column_type,
325 "a new column `{column_name}` has been added",
326 );
327 }
328 }
329 Ok(())
330 }
331
332 async fn create_indexes() -> Result<u64, Error> {
334 let connection_pool = Self::init_writer()?;
335 if !connection_pool.auto_migration() {
336 return Ok(0);
337 }
338
339 let mut table_name = Self::table_name();
340 let table_name_escaped = Query::escape_table_name(table_name);
341 if let Some((_, suffix)) = table_name.rsplit_once('.') {
342 table_name = suffix;
343 }
344
345 let pool = connection_pool.pool();
346 let columns = Self::columns();
347 let mut rows = 0;
348 if cfg!(any(
349 feature = "orm-mariadb",
350 feature = "orm-mysql",
351 feature = "orm-tidb"
352 )) {
353 let sql = format!("SHOW INDEXES FROM {table_name_escaped}");
354 if pool.fetch(&sql).await?.len() > 1 {
355 return Ok(0);
356 }
357
358 let mut text_search_columns = Vec::new();
359 for col in columns {
360 if let Some(index_type) = col.index_type() {
361 let column_name = col.name();
362 if matches!(index_type, "fulltext" | "text") {
363 text_search_columns.push(column_name);
364 } else if matches!(index_type, "unique" | "spatial") {
365 let index_type = index_type.to_uppercase();
366 let sql = format!(
367 "CREATE {index_type} INDEX {table_name}_{column_name}_index \
368 ON {table_name_escaped} ({column_name});"
369 );
370 rows = pool.execute(&sql).await?.rows_affected().max(rows);
371 } else if matches!(index_type, "btree" | "hash") {
372 let index_type = index_type.to_uppercase();
373 let sql = format!(
374 "CREATE INDEX {table_name}_{column_name}_index \
375 ON {table_name_escaped} ({column_name}) USING {index_type};"
376 );
377 rows = pool.execute(&sql).await?.rows_affected().max(rows);
378 }
379 }
380 }
381 if !text_search_columns.is_empty() {
382 let text_search_columns = text_search_columns.join(", ");
383 let sql = format!(
384 "CREATE FULLTEXT INDEX {table_name}_text_search_index \
385 ON {table_name_escaped} ({text_search_columns});"
386 );
387 rows = pool.execute(&sql).await?.rows_affected().max(rows);
388 }
389 } else if cfg!(feature = "orm-postgres") {
390 let mut text_search_columns = Vec::new();
391 let mut text_search_languages = Vec::new();
392 for col in columns {
393 if let Some(index_type) = col.index_type() {
394 let column_name = col.name();
395 if index_type.starts_with("text") {
396 let language = index_type.strip_prefix("text:").unwrap_or("english");
397 let column = format!("coalesce({column_name}, '')");
398 text_search_languages.push(language);
399 text_search_columns.push((language, column));
400 } else if index_type == "unique" {
401 let sql = format!(
402 "CREATE UNIQUE INDEX IF NOT EXISTS {table_name}_{column_name}_index \
403 ON {table_name_escaped} ({column_name});"
404 );
405 rows = pool.execute(&sql).await?.rows_affected().max(rows);
406 } else {
407 let sort_order = if index_type == "btree" { " DESC" } else { "" };
408 let sql = format!(
409 "CREATE INDEX IF NOT EXISTS {table_name}_{column_name}_index \
410 ON {table_name_escaped} \
411 USING {index_type}({column_name}{sort_order});"
412 );
413 rows = pool.execute(&sql).await?.rows_affected().max(rows);
414 }
415 }
416 }
417 for language in text_search_languages {
418 let text = text_search_columns
419 .iter()
420 .filter_map(|col| (col.0 == language).then_some(col.1.as_str()))
421 .collect::<Vec<_>>()
422 .join(" || ' ' || ");
423 let text_search = format!("to_tsvector('{language}', {text})");
424 let sql = format!(
425 "CREATE INDEX IF NOT EXISTS {table_name}_text_search_{language}_index \
426 ON {table_name_escaped} USING gin({text_search});"
427 );
428 rows = pool.execute(&sql).await?.rows_affected().max(rows);
429 }
430 } else {
431 for col in columns {
432 if let Some(index_type) = col.index_type() {
433 let column_name = col.name();
434 let index_type = if index_type == "unique" { "UNIQUE" } else { "" };
435 let sql = format!(
436 "CREATE {index_type} INDEX IF NOT EXISTS {table_name}_{column_name}_index \
437 ON {table_name_escaped} ({column_name});"
438 );
439 rows = pool.execute(&sql).await?.rows_affected().max(rows);
440 }
441 }
442 }
443 Ok(rows)
444 }
445
446 async fn prepare_insert(self) -> Result<QueryContext, Error> {
448 let table_name = if let Some(table) = self.before_prepare().await? {
449 Query::escape_table_name(&table)
450 } else {
451 Query::escape_table_name(Self::table_name())
452 };
453 let map = self.into_map();
454 let columns = Self::columns();
455
456 let mut fields = Vec::with_capacity(columns.len());
457 let values = columns
458 .iter()
459 .filter_map(|col| {
460 if col.auto_increment() {
461 None
462 } else {
463 let name = col.name();
464 let field = Query::format_field(name);
465 fields.push(field);
466 Some(col.encode_value(map.get(name)))
467 }
468 })
469 .collect::<Vec<_>>()
470 .join(", ");
471 let fields = fields.join(", ");
472 let sql = if cfg!(feature = "orm-postgres") {
473 let primary_key_name = Self::primary_key_name();
474 format!(
475 "INSERT INTO {table_name} ({fields}) VALUES ({values}) \
476 RETURNING {primary_key_name};"
477 )
478 } else {
479 format!("INSERT INTO {table_name} ({fields}) VALUES ({values});")
480 };
481 let mut ctx = Self::before_scan(&sql).await?;
482 ctx.set_query(sql);
483 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
484 ctx.cancel();
485 }
486 Ok(ctx)
487 }
488
489 async fn insert(mut self) -> Result<QueryContext, Error> {
491 let model_data = self.before_insert().await?;
492 let mut ctx = self.prepare_insert().await?;
493 if ctx.is_cancelled() {
494 return Ok(ctx);
495 }
496
497 let pool = Self::acquire_writer().await?.pool();
498 let (last_insert_id, rows_affected) =
499 if cfg!(feature = "orm-postgres") && Self::primary_key_column().auto_increment() {
500 let primary_key = sqlx::query_scalar(ctx.query()).fetch_one(pool).await?;
501 (Some(primary_key), 1)
502 } else {
503 let query_result = pool.execute(ctx.query()).await?;
504 Query::parse_query_result(query_result)
505 };
506 let success = rows_affected == 1;
507 if let Some(last_insert_id) = last_insert_id {
508 ctx.set_last_insert_id(last_insert_id);
509 }
510 ctx.set_query_result(rows_affected, success);
511 Self::after_scan(&ctx).await?;
512 Self::after_insert(&ctx, model_data).await?;
513 if success {
514 Ok(ctx)
515 } else {
516 bail!(
517 "{} rows are affected while it is expected to affect 1 row",
518 rows_affected
519 );
520 }
521 }
522
523 async fn prepare_insert_many(models: Vec<Self>) -> Result<QueryContext, Error> {
525 if models.is_empty() {
526 bail!("list of models to be inserted should be nonempty");
527 }
528
529 let columns = Self::columns();
530 let mut values = Vec::with_capacity(models.len());
531 for mut model in models.into_iter() {
532 let _model_data = model.before_insert().await?;
533
534 let map = model.into_map();
535 let entries = columns
536 .iter()
537 .map(|col| col.encode_value(map.get(col.name())))
538 .collect::<Vec<_>>()
539 .join(", ");
540 values.push(format!("({entries})"));
541 }
542
543 let table_name = Query::escape_table_name(Self::table_name());
544 let fields = Self::fields()
545 .iter()
546 .map(|&field| Query::format_field(field))
547 .collect::<Vec<_>>()
548 .join(", ");
549 let values = values.join(", ");
550 let sql = format!("INSERT INTO {table_name} ({fields}) VALUES {values};");
551 let mut ctx = Self::before_scan(&sql).await?;
552 ctx.set_query(sql);
553 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
554 ctx.cancel();
555 }
556 Ok(ctx)
557 }
558
559 async fn insert_many(models: Vec<Self>) -> Result<QueryContext, Error> {
561 let mut ctx = Self::prepare_insert_many(models).await?;
562 if ctx.is_cancelled() {
563 return Ok(ctx);
564 }
565
566 let pool = Self::acquire_writer().await?.pool();
567 let query_result = pool.execute(ctx.query()).await?;
568 ctx.set_query_result(query_result.rows_affected(), true);
569 Self::after_scan(&ctx).await?;
570 Ok(ctx)
571 }
572
573 async fn prepare_insert_from_subquery<C, E>(
575 columns: &[C],
576 subquery: QueryBuilder<E>,
577 ) -> Result<QueryContext, Error>
578 where
579 C: AsRef<str>,
580 E: Entity + Schema,
581 {
582 if columns.is_empty() {
583 bail!("a list of columns should be nonempty");
584 }
585
586 let table_name = Query::escape_table_name(Self::table_name());
587 let fields = columns
588 .iter()
589 .map(|col| col.as_ref())
590 .collect::<Vec<_>>()
591 .join(", ");
592 let subquery = subquery.build_subquery();
593 let sql = format!("INSERT INTO {table_name} ({fields}) {subquery};");
594 let mut ctx = Self::before_scan(&sql).await?;
595 ctx.set_query(sql);
596 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
597 ctx.cancel();
598 }
599 Ok(ctx)
600 }
601
602 async fn insert_from_subquery<C, E>(
604 columns: &[C],
605 subquery: QueryBuilder<E>,
606 ) -> Result<QueryContext, Error>
607 where
608 C: AsRef<str>,
609 E: Entity + Schema,
610 {
611 let mut ctx = Self::prepare_insert_from_subquery(columns, subquery).await?;
612 if ctx.is_cancelled() {
613 return Ok(ctx);
614 }
615
616 let pool = Self::acquire_writer().await?.pool();
617 let query_result = pool.execute(ctx.query()).await?;
618 ctx.set_query_result(query_result.rows_affected(), true);
619 Self::after_scan(&ctx).await?;
620 Ok(ctx)
621 }
622
623 async fn prepare_update(self) -> Result<QueryContext, Error> {
625 let table_name = if let Some(table) = self.before_prepare().await? {
626 Query::escape_table_name(&table)
627 } else {
628 Query::escape_table_name(Self::table_name())
629 };
630 let primary_key = Query::escape_string(self.primary_key());
631 let primary_key_name = Self::primary_key_name();
632 let map = self.into_map();
633 let read_only_fields = Self::read_only_fields();
634 let num_writable_fields = Self::fields().len() - read_only_fields.len();
635 let mut mutations = Vec::with_capacity(num_writable_fields);
636 for col in Self::columns() {
637 let field = col.name();
638 if !read_only_fields.contains(&field) {
639 let value = col.encode_value(map.get(field));
640 let field = Query::format_field(field);
641 mutations.push(format!("{field} = {value}"));
642 }
643 }
644
645 let mutations = mutations.join(", ");
646 let sql = format!(
647 "UPDATE {table_name} SET {mutations} WHERE {primary_key_name} = {primary_key};"
648 );
649 let mut ctx = Self::before_scan(&sql).await?;
650 ctx.set_query(sql);
651 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
652 ctx.cancel();
653 }
654 Ok(ctx)
655 }
656
657 async fn update(mut self) -> Result<QueryContext, Error> {
659 let model_data = self.before_update().await?;
660 let mut ctx = self.prepare_update().await?;
661 if ctx.is_cancelled() {
662 return Ok(ctx);
663 }
664
665 let pool = Self::acquire_writer().await?.pool();
666 let query_result = pool.execute(ctx.query()).await?;
667 let rows_affected = query_result.rows_affected();
668 let success = rows_affected == 1;
669 ctx.set_query_result(rows_affected, success);
670 Self::after_scan(&ctx).await?;
671 Self::after_update(&ctx, model_data).await?;
672 if success {
673 Ok(ctx)
674 } else {
675 bail!(
676 "{} rows are affected while it is expected to affect 1 row",
677 rows_affected
678 );
679 }
680 }
681
682 async fn prepare_update_partial<C: AsRef<str>>(
684 self,
685 columns: &[C],
686 ) -> Result<QueryContext, Error> {
687 let table_name = if let Some(table) = self.before_prepare().await? {
688 Query::escape_table_name(&table)
689 } else {
690 Query::escape_table_name(Self::table_name())
691 };
692 let primary_key = Query::escape_string(self.primary_key());
693 let primary_key_name = Self::primary_key_name();
694 let map = self.into_map();
695 let read_only_fields = Self::read_only_fields();
696 let mut mutations = Vec::with_capacity(columns.len());
697 for col in columns {
698 let field = col.as_ref();
699 if !read_only_fields.contains(&field) {
700 if let Some(col) = Self::columns().iter().find(|col| col.name() == field) {
701 let value = col.encode_value(map.get(field));
702 let field = Query::format_field(field);
703 mutations.push(format!("{field} = {value}"));
704 }
705 }
706 }
707
708 let mutations = mutations.join(", ");
709 let sql = format!(
710 "UPDATE {table_name} SET {mutations} WHERE {primary_key_name} = {primary_key};"
711 );
712 let mut ctx = Self::before_scan(&sql).await?;
713 ctx.set_query(sql);
714 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
715 ctx.cancel();
716 }
717 Ok(ctx)
718 }
719
720 async fn update_partial<C: AsRef<str>>(mut self, columns: &[C]) -> Result<QueryContext, Error> {
722 let model_data = self.before_update().await?;
723 let mut ctx = self.prepare_update_partial(columns).await?;
724 if ctx.is_cancelled() {
725 return Ok(ctx);
726 }
727
728 let pool = Self::acquire_writer().await?.pool();
729 let query_result = pool.execute(ctx.query()).await?;
730 let rows_affected = query_result.rows_affected();
731 let success = rows_affected == 1;
732 ctx.set_query_result(rows_affected, success);
733 Self::after_scan(&ctx).await?;
734 Self::after_update(&ctx, model_data).await?;
735 if success {
736 Ok(ctx)
737 } else {
738 bail!(
739 "{} rows are affected while it is expected to affect 1 row",
740 rows_affected
741 );
742 }
743 }
744
745 async fn prepare_update_one(
747 query: &Query,
748 mutation: &mut Mutation,
749 ) -> Result<QueryContext, Error> {
750 Self::before_mutation(query, mutation).await?;
751
752 let primary_key_name = Self::primary_key_name();
753 let table_name = query.format_table_name::<Self>();
754 let filters = query.format_filters::<Self>();
755 let updates = mutation.format_updates::<Self>();
756 let sql = if cfg!(any(
757 feature = "orm-mariadb",
758 feature = "orm-mysql",
759 feature = "orm-tidb"
760 )) {
761 format!(
764 "UPDATE {table_name} SET {updates} WHERE {primary_key_name} IN \
765 (SELECT * from (SELECT {primary_key_name} FROM {table_name} {filters}) AS t);"
766 )
767 } else {
768 let sort = query.format_sort();
770 format!(
771 "UPDATE {table_name} SET {updates} WHERE {primary_key_name} IN \
772 (SELECT {primary_key_name} FROM {table_name} {filters} {sort} LIMIT 1);"
773 )
774 };
775 let mut ctx = Self::before_scan(&sql).await?;
776 ctx.set_query(sql);
777 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
778 ctx.cancel();
779 }
780 Ok(ctx)
781 }
782
783 async fn update_one(query: &Query, mutation: &mut Mutation) -> Result<QueryContext, Error> {
785 let mut ctx = Self::prepare_update_one(query, mutation).await?;
786 if ctx.is_cancelled() {
787 return Ok(ctx);
788 }
789
790 let pool = Self::acquire_writer().await?.pool();
791 let query_result = pool.execute(ctx.query()).await?;
792 let rows_affected = query_result.rows_affected();
793 let success = rows_affected <= 1;
794 ctx.set_query_result(rows_affected, success);
795 Self::after_scan(&ctx).await?;
796 Self::after_mutation(&ctx).await?;
797 if success {
798 Ok(ctx)
799 } else {
800 bail!(
801 "{} rows are affected while it is expected to affect at most 1 row",
802 rows_affected
803 );
804 }
805 }
806
807 async fn prepare_update_many(
809 query: &Query,
810 mutation: &mut Mutation,
811 ) -> Result<QueryContext, Error> {
812 Self::before_mutation(query, mutation).await?;
813
814 let table_name = query.format_table_name::<Self>();
815 let filters = query.format_filters::<Self>();
816 let updates = mutation.format_updates::<Self>();
817 let sql = format!("UPDATE {table_name} SET {updates} {filters};");
818 let mut ctx = Self::before_scan(&sql).await?;
819 ctx.set_query(sql);
820 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
821 ctx.cancel();
822 }
823 Ok(ctx)
824 }
825
826 async fn update_many(query: &Query, mutation: &mut Mutation) -> Result<QueryContext, Error> {
828 let mut ctx = Self::prepare_update_many(query, mutation).await?;
829 if ctx.is_cancelled() {
830 return Ok(ctx);
831 }
832
833 let pool = Self::acquire_writer().await?.pool();
834 let query_result = pool.execute(ctx.query()).await?;
835 ctx.set_query_result(query_result.rows_affected(), true);
836 Self::after_scan(&ctx).await?;
837 Self::after_mutation(&ctx).await?;
838 Ok(ctx)
839 }
840
841 async fn prepare_upsert(self) -> Result<QueryContext, Error> {
843 let table_name = if let Some(table) = self.before_prepare().await? {
844 Query::escape_table_name(&table)
845 } else {
846 Query::escape_table_name(Self::table_name())
847 };
848 let map = self.into_map();
849 let num_fields = Self::fields().len();
850 let read_only_fields = Self::read_only_fields();
851 let num_writable_fields = num_fields - read_only_fields.len();
852 let mut fields = Vec::with_capacity(num_fields);
853 let mut values = Vec::with_capacity(num_fields);
854 let mut mutations = Vec::with_capacity(num_writable_fields);
855 for col in Self::columns() {
856 let name = col.name();
857 let field = Query::format_field(name);
858 let value = col.encode_value(map.get(name));
859 if !read_only_fields.contains(&name) {
860 mutations.push(format!("{field} = {value}"));
861 }
862 fields.push(field);
863 values.push(value);
864 }
865
866 let fields = fields.join(", ");
867 let values = values.join(", ");
868 let mutations = mutations.join(", ");
869 let sql = if cfg!(any(
870 feature = "orm-mariadb",
871 feature = "orm-mysql",
872 feature = "orm-tidb"
873 )) {
874 format!(
875 "INSERT INTO {table_name} ({fields}) VALUES ({values}) \
876 ON DUPLICATE KEY UPDATE {mutations};"
877 )
878 } else {
879 let primary_key_name = Self::primary_key_name();
880
881 format!(
883 "INSERT INTO {table_name} ({fields}) VALUES ({values}) \
884 ON CONFLICT ({primary_key_name}) DO UPDATE SET {mutations} \
885 RETURNING {primary_key_name};"
886 )
887 };
888 let mut ctx = Self::before_scan(&sql).await?;
889 ctx.set_query(sql);
890 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
891 ctx.cancel();
892 }
893 Ok(ctx)
894 }
895
896 async fn upsert(mut self) -> Result<QueryContext, Error> {
898 let model_data = self.before_upsert().await?;
899 let mut ctx = self.prepare_upsert().await?;
900 if ctx.is_cancelled() {
901 return Ok(ctx);
902 }
903
904 let pool = Self::acquire_writer().await?.pool();
905 let (last_insert_id, rows_affected) =
906 if cfg!(feature = "orm-postgres") && Self::primary_key_column().auto_increment() {
907 let primary_key = sqlx::query_scalar(ctx.query()).fetch_one(pool).await?;
908 (Some(primary_key), 1)
909 } else {
910 let query_result = pool.execute(ctx.query()).await?;
911 Query::parse_query_result(query_result)
912 };
913 let success = rows_affected == 1;
914 if let Some(last_insert_id) = last_insert_id {
915 ctx.set_last_insert_id(last_insert_id);
916 }
917 ctx.set_query_result(rows_affected, success);
918 Self::after_scan(&ctx).await?;
919 Self::after_upsert(&ctx, model_data).await?;
920 if success {
921 Ok(ctx)
922 } else {
923 bail!(
924 "{} rows are affected while it is expected to affect 1 row",
925 rows_affected
926 );
927 }
928 }
929
930 async fn prepare_delete(&self) -> Result<QueryContext, Error> {
932 let table_name = if let Some(table) = self.before_prepare().await? {
933 Query::escape_table_name(&table)
934 } else {
935 Query::escape_table_name(Self::table_name())
936 };
937 let primary_key_name = Self::primary_key_name();
938 let placeholder = Query::placeholder(1);
939 let sql = if cfg!(feature = "orm-postgres") {
940 let type_annotation = Self::primary_key_column().type_annotation();
941 format!(
942 "DELETE FROM {table_name} \
943 WHERE {primary_key_name} = ({placeholder}){type_annotation};"
944 )
945 } else {
946 format!("DELETE FROM {table_name} WHERE {primary_key_name} = {placeholder};")
947 };
948 let mut ctx = Self::before_scan(&sql).await?;
949 ctx.set_query(sql);
950 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
951 ctx.cancel();
952 }
953 Ok(ctx)
954 }
955
956 async fn delete(mut self) -> Result<QueryContext, Error> {
958 let model_data = self.before_delete().await?;
959 let mut ctx = self.prepare_delete().await?;
960 if ctx.is_cancelled() {
961 return Ok(ctx);
962 }
963
964 let pool = Self::acquire_writer().await?.pool();
965 let primary_key = self.primary_key();
966 let query_result = pool.execute_with(ctx.query(), &[primary_key]).await?;
967 let rows_affected = query_result.rows_affected();
968 let success = rows_affected == 1;
969 ctx.add_argument(primary_key);
970 ctx.set_query_result(rows_affected, success);
971 Self::after_scan(&ctx).await?;
972 self.after_delete(&ctx, model_data).await?;
973 if success {
974 Ok(ctx)
975 } else {
976 bail!(
977 "{} rows are affected while it is expected to affect 1 row",
978 rows_affected
979 );
980 }
981 }
982
983 async fn prepare_delete_one(query: &Query) -> Result<QueryContext, Error> {
985 Self::before_query(query).await?;
986
987 let primary_key_name = Self::primary_key_name();
988 let table_name = query.format_table_name::<Self>();
989 let filters = query.format_filters::<Self>();
990 let sort = query.format_sort();
991 let sql = format!(
992 "DELETE FROM {table_name} WHERE {primary_key_name} IN \
993 (SELECT {primary_key_name} FROM {table_name} {filters} {sort} LIMIT 1);"
994 );
995 let mut ctx = Self::before_scan(&sql).await?;
996 ctx.set_query(sql);
997 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
998 ctx.cancel();
999 }
1000 Ok(ctx)
1001 }
1002
1003 async fn delete_one(query: &Query) -> Result<QueryContext, Error> {
1005 let mut ctx = Self::prepare_delete_one(query).await?;
1006 if ctx.is_cancelled() {
1007 return Ok(ctx);
1008 }
1009
1010 let pool = Self::acquire_writer().await?.pool();
1011 let query_result = pool.execute(ctx.query()).await?;
1012 let rows_affected = query_result.rows_affected();
1013 let success = rows_affected <= 1;
1014 ctx.set_query_result(rows_affected, success);
1015 Self::after_scan(&ctx).await?;
1016 Self::after_query(&ctx).await?;
1017 if success {
1018 Ok(ctx)
1019 } else {
1020 bail!(
1021 "{} rows are affected while it is expected to affect at most 1 row",
1022 rows_affected
1023 );
1024 }
1025 }
1026
1027 async fn prepare_delete_many(query: &Query) -> Result<QueryContext, Error> {
1029 Self::before_query(query).await?;
1030
1031 let table_name = query.format_table_name::<Self>();
1032 let filters = query.format_filters::<Self>();
1033 let sort = query.format_sort();
1034 let pagination = query.format_pagination();
1035 let sql = if cfg!(feature = "orm-postgres") {
1036 let primary_key_name = Self::primary_key_name();
1037 format!(
1038 "DELETE FROM {table_name} WHERE {primary_key_name} IN \
1039 (SELECT {primary_key_name} FROM {table_name} {filters} {sort} {pagination});"
1040 )
1041 } else {
1042 format!("DELETE FROM {table_name} {filters} {sort} {pagination};")
1043 };
1044 let mut ctx = Self::before_scan(&sql).await?;
1045 ctx.set_query(sql);
1046 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1047 ctx.cancel();
1048 }
1049 Ok(ctx)
1050 }
1051
1052 async fn delete_many(query: &Query) -> Result<QueryContext, Error> {
1054 let mut ctx = Self::prepare_delete_many(query).await?;
1055 if ctx.is_cancelled() {
1056 return Ok(ctx);
1057 }
1058
1059 let pool = Self::acquire_writer().await?.pool();
1060 let query_result = pool.execute(ctx.query()).await?;
1061 ctx.set_query_result(query_result.rows_affected(), true);
1062 Self::after_scan(&ctx).await?;
1063 Self::after_query(&ctx).await?;
1064 Ok(ctx)
1065 }
1066
1067 async fn prepare_delete_by_subquery<C, E>(
1069 columns: &[C],
1070 subquery: QueryBuilder<E>,
1071 ) -> Result<QueryContext, Error>
1072 where
1073 C: AsRef<str>,
1074 E: Entity + Schema,
1075 {
1076 if columns.is_empty() {
1077 bail!("a list of columns should be nonempty");
1078 }
1079
1080 let table_name = Query::escape_table_name(Self::table_name());
1081 let fields = columns
1082 .iter()
1083 .map(|col| col.as_ref())
1084 .collect::<Vec<_>>()
1085 .join(", ");
1086 let subquery = subquery.build_subquery();
1087 let sql = format!("DELETE FROM {table_name} WHERE ({fields}) IN {subquery};");
1088 let mut ctx = Self::before_scan(&sql).await?;
1089 ctx.set_query(sql);
1090 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1091 ctx.cancel();
1092 }
1093 Ok(ctx)
1094 }
1095
1096 async fn delete_by_subquery<C, E>(
1098 columns: &[C],
1099 subquery: QueryBuilder<E>,
1100 ) -> Result<QueryContext, Error>
1101 where
1102 C: AsRef<str>,
1103 E: Entity + Schema,
1104 {
1105 let mut ctx = Self::prepare_delete_by_subquery(columns, subquery).await?;
1106 if ctx.is_cancelled() {
1107 return Ok(ctx);
1108 }
1109
1110 let pool = Self::acquire_writer().await?.pool();
1111 let query_result = pool.execute(ctx.query()).await?;
1112 ctx.set_query_result(query_result.rows_affected(), true);
1113 Self::after_scan(&ctx).await?;
1114 Self::after_query(&ctx).await?;
1115 Ok(ctx)
1116 }
1117
1118 async fn find<T>(query: &Query) -> Result<Vec<T>, Error>
1121 where
1122 T: DecodeRow<DatabaseRow, Error = Error>,
1123 {
1124 Self::before_query(query).await?;
1125
1126 let table_name = query.format_table_name::<Self>();
1127 let projection = query.format_table_fields::<Self>();
1128 let filters = query.format_filters::<Self>();
1129 let sort = query.format_sort();
1130 let pagination = query.format_pagination();
1131 let sql = format!("SELECT {projection} FROM {table_name} {filters} {sort} {pagination};");
1132 let mut ctx = Self::before_scan(&sql).await?;
1133 ctx.set_query(&sql);
1134
1135 let pool = Self::acquire_reader().await?.pool();
1136 let rows = pool.fetch(ctx.query()).await?;
1137 let mut data = Vec::with_capacity(rows.len());
1138 for row in rows {
1139 data.push(T::decode_row(&row)?);
1140 }
1141 ctx.set_query_result(u64::try_from(data.len())?, true);
1142 Self::after_scan(&ctx).await?;
1143 Self::after_query(&ctx).await?;
1144 Ok(data)
1145 }
1146
1147 async fn find_as<T: DeserializeOwned>(query: &Query) -> Result<Vec<T>, Error> {
1150 let mut data = Self::find::<Map>(query).await?;
1151 let translate_enabled = query.translate_enabled();
1152 for model in data.iter_mut() {
1153 translate_enabled.then(|| Self::translate_model(model));
1154 Self::after_decode(model).await?;
1155 }
1156 serde_json::from_value(data.into()).map_err(Error::from)
1157 }
1158
1159 async fn find_one<T>(query: &Query) -> Result<Option<T>, Error>
1162 where
1163 T: DecodeRow<DatabaseRow, Error = Error>,
1164 {
1165 Self::before_query(query).await?;
1166
1167 let table_name = query.format_table_name::<Self>();
1168 let projection = query.format_table_fields::<Self>();
1169 let filters = query.format_filters::<Self>();
1170 let sort = query.format_sort();
1171 let sql = format!("SELECT {projection} FROM {table_name} {filters} {sort} LIMIT 1;");
1172 let mut ctx = Self::before_scan(&sql).await?;
1173 ctx.set_query(sql);
1174
1175 let pool = Self::acquire_reader().await?.pool();
1176 let (num_rows, data) = if let Some(row) = pool.fetch_optional(ctx.query()).await? {
1177 (1, Some(T::decode_row(&row)?))
1178 } else {
1179 (0, None)
1180 };
1181 ctx.set_query_result(num_rows, true);
1182 Self::after_scan(&ctx).await?;
1183 Self::after_query(&ctx).await?;
1184 Ok(data)
1185 }
1186
1187 async fn find_one_as<T: DeserializeOwned>(query: &Query) -> Result<Option<T>, Error> {
1190 match Self::find_one::<Map>(query).await? {
1191 Some(mut data) => {
1192 query
1193 .translate_enabled()
1194 .then(|| Self::translate_model(&mut data));
1195 Self::after_decode(&mut data).await?;
1196 serde_json::from_value(data.into()).map_err(Error::from)
1197 }
1198 None => Ok(None),
1199 }
1200 }
1201
    /// Populates the related data referenced by `columns` in every row of `data`.
    ///
    /// The values found under `columns` (array values are flattened) are collected
    /// without duplicates and used as a `$in` filter on the primary key, which is
    /// added to `query`. The matching rows are fetched from the reader pool and, for
    /// each referencing field, a `{field}_populated` entry is upserted into the row.
    ///
    /// Returns the number of associated rows that were fetched, or `0` when the
    /// rows reference no values at all (in which case no SQL is executed).
    async fn populate<C: AsRef<str>>(
        query: &mut Query,
        data: &mut Vec<Map>,
        columns: &[C],
    ) -> Result<u64, Error> {
        Self::before_query(query).await?;

        // Collect the distinct referenced values across all rows and columns.
        let primary_key_name = Self::primary_key_name();
        let mut values = Vec::new();
        for row in data.iter() {
            for col in columns {
                if let Some(value) = row.get(col.as_ref()) {
                    if let JsonValue::Array(vec) = value {
                        for value in vec {
                            if !values.contains(value) {
                                values.push(value.to_owned());
                            }
                        }
                    } else if !values.contains(value) {
                        values.push(value.to_owned());
                    }
                }
            }
        }

        // Nothing to look up: return before building or running any SQL.
        let num_values = values.len();
        if num_values > 0 {
            let primary_key_values = Map::from_entry("$in", values);
            query.add_filter(primary_key_name, primary_key_values);
        } else {
            return Ok(0);
        }

        let table_name = query.format_table_name::<Self>();
        let projection = query.format_table_fields::<Self>();
        let filters = query.format_filters::<Self>();
        let sql = format!("SELECT {projection} FROM {table_name} {filters};");
        let mut ctx = Self::before_scan(&sql).await?;
        ctx.set_query(&sql);

        let pool = Self::acquire_reader().await?.pool();
        let rows = pool.fetch(ctx.query()).await?;
        let translate_enabled = query.translate_enabled();
        let mut associations = Vec::with_capacity(num_values);
        for row in rows {
            let mut map = Map::decode_row(&row)?;
            // Capture the key before the hooks get a chance to modify the map.
            let primary_key = map.get(primary_key_name).cloned();
            Self::after_populate(&mut map).await?;
            translate_enabled.then(|| Self::translate_model(&mut map));
            Self::after_decode(&mut map).await?;
            // Rows without a primary key entry are dropped from the associations.
            if let Some(key) = primary_key {
                associations.push((key, map));
            }
        }

        let associations_len = u64::try_from(associations.len())?;
        ctx.set_query_result(associations_len, true);
        Self::after_scan(&ctx).await?;
        Self::after_query(&ctx).await?;

        for row in data {
            for col in columns {
                let field = col.as_ref();
                if let Some(vec) = row.get_array(field).filter(|vec| !vec.is_empty()) {
                    // Non-empty array field: replace each key with its associated map,
                    // keeping the original key when no association was found.
                    let populated_field = [field, "_populated"].concat();
                    let populated_values = vec
                        .iter()
                        .map(|key| {
                            let populated_value = associations
                                .iter()
                                .find_map(|(k, v)| (key == k).then_some(v));
                            if let Some(value) = populated_value {
                                value.to_owned().into()
                            } else {
                                key.to_owned()
                            }
                        })
                        .collect::<Vec<_>>();
                    row.upsert(populated_field, populated_values);
                } else if let Some(key) = row.get(field) {
                    // Other present values: only written when an association exists.
                    let populated_value = associations
                        .iter()
                        .find_map(|(k, v)| (key == k).then_some(v));
                    if let Some(value) = populated_value {
                        let populated_field = [field, "_populated"].concat();
                        row.upsert(populated_field, value.to_owned());
                    }
                }
            }
        }
        Ok(associations_len)
    }
1296
    /// Populates the related data referenced by `columns` in a single `data` map.
    ///
    /// The values found under `columns` (array values are flattened) are collected
    /// without duplicates and used as a `$in` filter on the primary key, which is
    /// added to `query`. The matching rows are fetched from the reader pool and, for
    /// each referencing field, a `{field}_populated` entry is upserted into `data`.
    ///
    /// Returns immediately, without executing any SQL, when `data` references no values.
    async fn populate_one<C: AsRef<str>>(
        query: &mut Query,
        data: &mut Map,
        columns: &[C],
    ) -> Result<(), Error> {
        Self::before_query(query).await?;

        // Collect the distinct referenced values across the columns.
        let primary_key_name = Self::primary_key_name();
        let mut values = Vec::new();
        for col in columns {
            if let Some(value) = data.get(col.as_ref()) {
                if let JsonValue::Array(vec) = value {
                    for value in vec {
                        if !values.contains(value) {
                            values.push(value.to_owned());
                        }
                    }
                } else if !values.contains(value) {
                    values.push(value.to_owned());
                }
            }
        }

        // Nothing to look up: return before building or running any SQL.
        let num_values = values.len();
        if num_values > 0 {
            let primary_key_values = Map::from_entry("$in", values);
            query.add_filter(primary_key_name, primary_key_values);
        } else {
            return Ok(());
        }

        let table_name = query.format_table_name::<Self>();
        // NOTE(review): `populate` uses `format_table_fields::<Self>()` here while this
        // method uses `format_projection()` - confirm the difference is intentional.
        let projection = query.format_projection();
        let filters = query.format_filters::<Self>();
        let sql = format!("SELECT {projection} FROM {table_name} {filters};");
        let mut ctx = Self::before_scan(&sql).await?;
        ctx.set_query(&sql);

        let pool = Self::acquire_reader().await?.pool();
        let rows = pool.fetch(ctx.query()).await?;
        let translate_enabled = query.translate_enabled();
        let mut associations = Vec::with_capacity(num_values);
        for row in rows {
            let mut map = Map::decode_row(&row)?;
            // Capture the key before the hooks get a chance to modify the map.
            let primary_key = map.get(primary_key_name).cloned();
            Self::after_populate(&mut map).await?;
            translate_enabled.then(|| Self::translate_model(&mut map));
            Self::after_decode(&mut map).await?;
            // Rows without a primary key entry are dropped from the associations.
            if let Some(key) = primary_key {
                associations.push((key, map));
            }
        }
        ctx.set_query_result(u64::try_from(associations.len())?, true);
        Self::after_scan(&ctx).await?;
        Self::after_query(&ctx).await?;

        for col in columns {
            let field = col.as_ref();
            if let Some(vec) = data.get_array(field).filter(|vec| !vec.is_empty()) {
                // Non-empty array field: replace each key with its associated map,
                // keeping the original key when no association was found.
                let populated_field = [field, "_populated"].concat();
                let populated_values = vec
                    .iter()
                    .map(|key| {
                        let populated_value = associations
                            .iter()
                            .find_map(|(k, v)| (key == k).then_some(v));
                        if let Some(value) = populated_value {
                            value.to_owned().into()
                        } else {
                            key.to_owned()
                        }
                    })
                    .collect::<Vec<_>>();
                data.upsert(populated_field, populated_values);
            } else if let Some(key) = data.get(field) {
                // Other present values: only written when an association exists.
                let populated_value = associations
                    .iter()
                    .find_map(|(k, v)| (key == k).then_some(v));
                if let Some(value) = populated_value {
                    let populated_field = [field, "_populated"].concat();
                    data.upsert(populated_field, value.to_owned());
                }
            }
        }
        Ok(())
    }
1385
1386 async fn lookup<T>(query: &Query, joins: &[JoinOn]) -> Result<Vec<T>, Error>
1389 where
1390 T: DecodeRow<DatabaseRow, Error = Error>,
1391 {
1392 Self::before_query(query).await?;
1393
1394 let join_conditions = joins
1395 .iter()
1396 .map(|join_on| {
1397 let join_type = join_on.join_type().as_str();
1398 let join_table = join_on.join_table();
1399 let on_conditions = join_on.format_conditions();
1400 format!("{join_type} {join_table} ON {on_conditions}")
1401 })
1402 .collect::<Vec<_>>()
1403 .join(" ");
1404 let table_name = query.format_table_name::<Self>();
1405 let projection = query.format_table_fields::<Self>();
1406 let filters = query.format_filters::<Self>();
1407 let sort = query.format_sort();
1408 let pagination = query.format_pagination();
1409 let sql = format!(
1410 "SELECT {projection} FROM {table_name} {join_conditions} {filters} {sort} {pagination};"
1411 );
1412 let mut ctx = Self::before_scan(&sql).await?;
1413 ctx.set_query(&sql);
1414
1415 let pool = Self::acquire_reader().await?.pool();
1416 let rows = pool.fetch(ctx.query()).await?;
1417 let mut data = Vec::with_capacity(rows.len());
1418 for row in rows {
1419 data.push(T::decode_row(&row)?);
1420 }
1421 ctx.set_query_result(u64::try_from(data.len())?, true);
1422 Self::after_scan(&ctx).await?;
1423 Self::after_query(&ctx).await?;
1424 Ok(data)
1425 }
1426
1427 async fn lookup_as<T: DeserializeOwned>(
1430 query: &Query,
1431 joins: &[JoinOn],
1432 ) -> Result<Vec<T>, Error> {
1433 let mut data = Self::lookup::<Map>(query, joins).await?;
1434 let translate_enabled = query.translate_enabled();
1435 for model in data.iter_mut() {
1436 translate_enabled.then(|| Self::translate_model(model));
1437 Self::after_decode(model).await?;
1438 }
1439 serde_json::from_value(data.into()).map_err(Error::from)
1440 }
1441
1442 async fn exists(query: &Query) -> Result<bool, Error> {
1444 Self::before_query(query).await?;
1445
1446 let table_name = query.format_table_name::<Self>();
1447 let filters = query.format_filters::<Self>();
1448 let sql = format!("SELECT 1 FROM {table_name} {filters} LIMIT 1;");
1449 let mut ctx = Self::before_scan(&sql).await?;
1450 ctx.set_query(sql);
1451
1452 let pool = Self::acquire_reader().await?.pool();
1453 let optional_row = pool.fetch_optional(ctx.query()).await?;
1454 let num_rows = if optional_row.is_some() { 1 } else { 0 };
1455 ctx.set_query_result(num_rows, true);
1456 Self::after_scan(&ctx).await?;
1457 Self::after_query(&ctx).await?;
1458 Ok(num_rows == 1)
1459 }
1460
1461 async fn count(query: &Query) -> Result<u64, Error> {
1463 Self::before_count(query).await?;
1464
1465 let table_name = query.format_table_name::<Self>();
1466 let filters = query.format_filters::<Self>();
1467 let sql = format!("SELECT count(*) AS count FROM {table_name} {filters};");
1468 let mut ctx = Self::before_scan(&sql).await?;
1469 ctx.set_query(sql);
1470
1471 let pool = Self::acquire_reader().await?.pool();
1472 let row = pool.fetch_one(ctx.query()).await?;
1473 let map = Map::decode_row(&row)?;
1474
1475 let count = map.parse_u64("count").transpose()?.unwrap_or_default();
1477 ctx.set_query_result(count, true);
1478 Self::after_scan(&ctx).await?;
1479 Self::after_count(&ctx).await?;
1480 Ok(count)
1481 }
1482
1483 async fn count_many<C, T>(query: &Query, columns: &[(C, bool)]) -> Result<T, Error>
1486 where
1487 C: AsRef<str>,
1488 T: DecodeRow<DatabaseRow, Error = Error>,
1489 {
1490 Self::before_count(query).await?;
1491
1492 let table_name = query.format_table_name::<Self>();
1493 let filters = query.format_filters::<Self>();
1494 let projection = columns
1495 .iter()
1496 .map(|(col, distinct)| {
1497 let col_name = col.as_ref();
1498 let field = Query::format_field(col_name);
1499 if col_name != "*" {
1500 if *distinct {
1501 format!(r#"count(distinct {field}) AS {col_name}_distinct"#)
1502 } else {
1503 format!(r#"count({field}) AS {col_name}_count"#)
1504 }
1505 } else {
1506 "count(*)".to_owned()
1507 }
1508 })
1509 .collect::<Vec<_>>()
1510 .join(", ");
1511 let sql = format!("SELECT {projection} FROM {table_name} {filters};");
1512 let mut ctx = Self::before_scan(&sql).await?;
1513 ctx.set_query(sql);
1514
1515 let pool = Self::acquire_reader().await?.pool();
1516 let row = pool.fetch_one(ctx.query()).await?;
1517 ctx.set_query_result(1, true);
1518 Self::after_scan(&ctx).await?;
1519 Self::after_count(&ctx).await?;
1520 T::decode_row(&row)
1521 }
1522
1523 async fn count_many_as<C, T>(query: &Query, columns: &[(C, bool)]) -> Result<T, Error>
1526 where
1527 C: AsRef<str>,
1528 T: DeserializeOwned,
1529 {
1530 let map = Self::count_many::<C, Map>(query, columns).await?;
1531 serde_json::from_value(map.into()).map_err(Error::from)
1532 }
1533
1534 async fn aggregate<T>(query: &Query) -> Result<Vec<T>, Error>
1536 where
1537 T: DecodeRow<DatabaseRow, Error = Error>,
1538 {
1539 Self::before_aggregate(query).await?;
1540
1541 let table_name = query.format_table_name::<Self>();
1542 let projection = query.format_table_fields::<Self>();
1543 let filters = query.format_filters::<Self>();
1544 let sort = query.format_sort();
1545 let pagination = query.format_pagination();
1546 let sql = format!("SELECT {projection} FROM {table_name} {filters} {sort} {pagination};");
1547 let mut ctx = Self::before_scan(&sql).await?;
1548 ctx.set_query(sql);
1549
1550 let pool = Self::acquire_reader().await?.pool();
1551 let rows = pool.fetch(ctx.query()).await?;
1552 let mut data = Vec::with_capacity(rows.len());
1553 for row in rows {
1554 data.push(T::decode_row(&row)?);
1555 }
1556 ctx.set_query_result(u64::try_from(data.len())?, true);
1557 Self::after_scan(&ctx).await?;
1558 Self::after_aggregate(&ctx).await?;
1559 Ok(data)
1560 }
1561
1562 async fn aggregate_as<T: DeserializeOwned>(query: &Query) -> Result<Vec<T>, Error> {
1565 let data = Self::aggregate::<Map>(query).await?;
1566 serde_json::from_value(data.into()).map_err(Error::from)
1567 }
1568
1569 async fn execute(query: &str, params: Option<&Map>) -> Result<QueryContext, Error> {
1571 let (sql, values) = Query::prepare_query(query, params);
1572 let mut ctx = Self::before_scan(&sql).await?;
1573 ctx.set_query(sql);
1574 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1575 ctx.cancel();
1576 }
1577 if ctx.is_cancelled() {
1578 return Ok(ctx);
1579 }
1580
1581 let mut arguments = values
1582 .iter()
1583 .map(|v| v.to_string_unquoted())
1584 .collect::<Vec<_>>();
1585 let pool = Self::acquire_writer().await?.pool();
1586 let query_result = pool.execute_with(ctx.query(), &arguments).await?;
1587 ctx.append_arguments(&mut arguments);
1588 ctx.set_query_result(query_result.rows_affected(), true);
1589 Self::after_scan(&ctx).await?;
1590 Ok(ctx)
1591 }
1592
1593 async fn query<T>(query: &str, params: Option<&Map>) -> Result<Vec<T>, Error>
1595 where
1596 T: DecodeRow<DatabaseRow, Error = Error>,
1597 {
1598 let (sql, values) = Query::prepare_query(query, params);
1599 let mut ctx = Self::before_scan(&sql).await?;
1600 ctx.set_query(sql);
1601
1602 let mut arguments = values
1603 .iter()
1604 .map(|v| v.to_string_unquoted())
1605 .collect::<Vec<_>>();
1606 let pool = Self::acquire_reader().await?.pool();
1607 let rows = pool.fetch_with(ctx.query(), &arguments).await?;
1608 let mut data = Vec::with_capacity(rows.len());
1609 for row in rows {
1610 data.push(T::decode_row(&row)?);
1611 }
1612 ctx.append_arguments(&mut arguments);
1613 ctx.set_query_result(u64::try_from(data.len())?, true);
1614 Self::after_scan(&ctx).await?;
1615 Ok(data)
1616 }
1617
1618 async fn query_as<T: DeserializeOwned>(
1620 query: &str,
1621 params: Option<&Map>,
1622 ) -> Result<Vec<T>, Error> {
1623 let mut data = Self::query::<Map>(query, params).await?;
1624 for model in data.iter_mut() {
1625 Self::after_decode(model).await?;
1626 }
1627 serde_json::from_value(data.into()).map_err(Error::from)
1628 }
1629
1630 async fn query_one<T>(query: &str, params: Option<&Map>) -> Result<Option<T>, Error>
1632 where
1633 T: DecodeRow<DatabaseRow, Error = Error>,
1634 {
1635 let (sql, values) = Query::prepare_query(query, params);
1636 let mut ctx = Self::before_scan(&sql).await?;
1637 ctx.set_query(sql);
1638
1639 let mut arguments = values
1640 .iter()
1641 .map(|v| v.to_string_unquoted())
1642 .collect::<Vec<_>>();
1643 let pool = Self::acquire_reader().await?.pool();
1644 let optional_row = pool.fetch_optional_with(ctx.query(), &arguments).await?;
1645 let (num_rows, data) = if let Some(row) = optional_row {
1646 (1, Some(T::decode_row(&row)?))
1647 } else {
1648 (0, None)
1649 };
1650 ctx.append_arguments(&mut arguments);
1651 ctx.set_query_result(num_rows, true);
1652 Self::after_scan(&ctx).await?;
1653 Ok(data)
1654 }
1655
1656 async fn query_one_as<T: DeserializeOwned>(
1658 query: &str,
1659 params: Option<&Map>,
1660 ) -> Result<Option<T>, Error> {
1661 match Self::query_one::<Map>(query, params).await? {
1662 Some(mut data) => {
1663 Self::after_decode(&mut data).await?;
1664 serde_json::from_value(data.into()).map_err(Error::from)
1665 }
1666 None => Ok(None),
1667 }
1668 }
1669
1670 async fn prepare_delete_by_id() -> Result<QueryContext, Error> {
1672 let primary_key_name = Self::primary_key_name();
1673 let table_name = Query::escape_table_name(Self::table_name());
1674 let placeholder = Query::placeholder(1);
1675 let sql = if cfg!(feature = "orm-postgres") {
1676 let type_annotation = Self::primary_key_column().type_annotation();
1677 format!(
1678 "DELETE FROM {table_name} \
1679 WHERE {primary_key_name} = ({placeholder}){type_annotation};"
1680 )
1681 } else {
1682 format!("DELETE FROM {table_name} WHERE {primary_key_name} = {placeholder};")
1683 };
1684 let mut ctx = Self::before_scan(&sql).await?;
1685 ctx.set_query(sql);
1686 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1687 ctx.cancel();
1688 }
1689 Ok(ctx)
1690 }
1691
1692 async fn delete_by_id(primary_key: &Self::PrimaryKey) -> Result<QueryContext, Error> {
1694 let mut ctx = Self::prepare_delete_by_id().await?;
1695 if ctx.is_cancelled() {
1696 return Ok(ctx);
1697 }
1698
1699 let pool = Self::acquire_writer().await?.pool();
1700 let query_result = pool.execute_with(ctx.query(), &[primary_key]).await?;
1701 let rows_affected = query_result.rows_affected();
1702 let success = rows_affected == 1;
1703 ctx.add_argument(primary_key);
1704 ctx.set_query_result(rows_affected, success);
1705 Self::after_scan(&ctx).await?;
1706 if success {
1707 Ok(ctx)
1708 } else {
1709 bail!(
1710 "{} rows are affected while it is expected to affect 1 row",
1711 rows_affected
1712 );
1713 }
1714 }
1715
1716 async fn prepare_update_by_id(mutation: &mut Mutation) -> Result<QueryContext, Error> {
1718 let primary_key_name = Self::primary_key_name();
1719 let table_name = Query::escape_table_name(Self::table_name());
1720 let updates = mutation.format_updates::<Self>();
1721 let placeholder = Query::placeholder(1);
1722 let sql = if cfg!(any(
1723 feature = "orm-mariadb",
1724 feature = "orm-mysql",
1725 feature = "orm-tidb"
1726 )) {
1727 format!(
1728 "UPDATE {table_name} SET {updates} \
1729 WHERE {primary_key_name} = {placeholder};"
1730 )
1731 } else if cfg!(feature = "orm-postgres") {
1732 let type_annotation = Self::primary_key_column().type_annotation();
1733 format!(
1734 "UPDATE {table_name} SET {updates} \
1735 WHERE {primary_key_name} = ({placeholder}){type_annotation} RETURNING *;"
1736 )
1737 } else {
1738 format!(
1739 "UPDATE {table_name} SET {updates} \
1740 WHERE {primary_key_name} = {placeholder} RETURNING *;"
1741 )
1742 };
1743 let mut ctx = Self::before_scan(&sql).await?;
1744 ctx.set_query(sql);
1745 if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1746 ctx.cancel();
1747 }
1748 Ok(ctx)
1749 }
1750
    /// Updates the row identified by `primary_key` with `mutation` and returns the
    /// resulting row decoded as `T`.
    ///
    /// Returns `Ok(None)` when the prepared context is cancelled or when no row is
    /// returned by the database. The `after_scan` and `after_query` hooks are
    /// invoked after the row has been decoded.
    async fn update_by_id<T>(
        primary_key: &Self::PrimaryKey,
        mutation: &mut Mutation,
    ) -> Result<Option<T>, Error>
    where
        T: DecodeRow<DatabaseRow, Error = Error>,
    {
        let mut ctx = Self::prepare_update_by_id(mutation).await?;
        if ctx.is_cancelled() {
            return Ok(None);
        }

        let pool = Self::acquire_writer().await?.pool();
        let optional_row = if cfg!(any(
            feature = "orm-mariadb",
            feature = "orm-mysql",
            feature = "orm-tidb"
        )) {
            use sqlx::Acquire;

            // The prepared statement has no `RETURNING` clause for these drivers, so
            // the update and the follow-up select run inside one transaction.
            let mut transaction = pool.begin().await?;
            let connection = transaction.acquire().await?;
            let query_result = connection.execute_with(ctx.query(), &[primary_key]).await?;
            // The row is read back only when exactly one row was affected.
            let optional_row = if query_result.rows_affected() == 1 {
                let primary_key_name = Self::primary_key_name();
                let table_name = Query::escape_table_name(Self::table_name());
                let placeholder = Query::placeholder(1);
                let sql =
                    format!("SELECT * FROM {table_name} WHERE {primary_key_name} = {placeholder};");
                connection.fetch_optional_with(&sql, &[primary_key]).await?
            } else {
                None
            };
            transaction.commit().await?;
            optional_row
        } else {
            // The prepared statement ends with `RETURNING *`, so the updated row is
            // fetched directly from the update itself.
            pool.fetch_optional_with(ctx.query(), &[primary_key])
                .await?
        };
        let (num_rows, data) = if let Some(row) = optional_row {
            (1, Some(T::decode_row(&row)?))
        } else {
            (0, None)
        };
        ctx.add_argument(primary_key);
        ctx.set_query_result(num_rows, true);
        Self::after_scan(&ctx).await?;
        Self::after_query(&ctx).await?;
        Ok(data)
    }
1803
1804 async fn find_by_id<T>(primary_key: &Self::PrimaryKey) -> Result<Option<T>, Error>
1807 where
1808 T: DecodeRow<DatabaseRow, Error = Error>,
1809 {
1810 let primary_key_name = Self::primary_key_name();
1811 let query = Self::default_query();
1812 let table_name = query.format_table_name::<Self>();
1813 let projection = query.format_projection();
1814 let placeholder = Query::placeholder(1);
1815 let sql = if cfg!(feature = "orm-postgres") {
1816 let type_annotation = Self::primary_key_column().type_annotation();
1817 format!(
1818 "SELECT {projection} FROM {table_name} \
1819 WHERE {primary_key_name} = ({placeholder}){type_annotation};"
1820 )
1821 } else {
1822 format!(
1823 "SELECT {projection} FROM {table_name} WHERE {primary_key_name} = {placeholder};"
1824 )
1825 };
1826 let mut ctx = Self::before_scan(&sql).await?;
1827 ctx.set_query(sql);
1828
1829 let pool = Self::acquire_reader().await?.pool();
1830 let optional_row = pool
1831 .fetch_optional_with(ctx.query(), &[primary_key])
1832 .await?;
1833 let (num_rows, data) = if let Some(row) = optional_row {
1834 (1, Some(T::decode_row(&row)?))
1835 } else {
1836 (0, None)
1837 };
1838 ctx.add_argument(primary_key);
1839 ctx.set_query_result(num_rows, true);
1840 Self::after_scan(&ctx).await?;
1841 Self::after_query(&ctx).await?;
1842 Ok(data)
1843 }
1844
1845 async fn try_get_model(primary_key: &Self::PrimaryKey) -> Result<Self, Error> {
1847 let primary_key_name = Self::primary_key_name();
1848 let query = Self::default_query();
1849 let table_name = query.format_table_name::<Self>();
1850 let projection = query.format_projection();
1851 let placeholder = Query::placeholder(1);
1852 let sql = if cfg!(feature = "orm-postgres") {
1853 let type_annotation = Self::primary_key_column().type_annotation();
1854 format!(
1855 "SELECT {projection} FROM {table_name} \
1856 WHERE {primary_key_name} = ({placeholder}){type_annotation};"
1857 )
1858 } else {
1859 format!(
1860 "SELECT {projection} FROM {table_name} WHERE {primary_key_name} = {placeholder};"
1861 )
1862 };
1863 let mut ctx = Self::before_scan(&sql).await?;
1864 ctx.set_query(sql);
1865 ctx.add_argument(primary_key);
1866
1867 let pool = Self::acquire_reader().await?.pool();
1868 let optional_row = pool
1869 .fetch_optional_with(ctx.query(), &[primary_key])
1870 .await?;
1871 if let Some(row) = optional_row {
1872 ctx.set_query_result(1, true);
1873 Self::after_scan(&ctx).await?;
1874 Self::after_query(&ctx).await?;
1875
1876 let mut map = Map::decode_row(&row)?;
1877 Self::after_decode(&mut map).await?;
1878 Self::try_from_map(map).map_err(|err| {
1879 warn!(
1880 "fail to decode the value as a model `{}`: {}",
1881 Self::MODEL_NAME,
1882 err
1883 )
1884 })
1885 } else {
1886 ctx.set_query_result(0, true);
1887 Self::after_scan(&ctx).await?;
1888 Self::after_query(&ctx).await?;
1889 bail!(
1890 "404 Not Found: no rows for the model `{}` with the key `{}`",
1891 Self::MODEL_NAME,
1892 primary_key
1893 );
1894 }
1895 }
1896
1897 async fn sample(size: usize) -> Result<Vec<JsonValue>, Error> {
1900 if size == 0 {
1901 return Ok(Vec::new());
1902 }
1903
1904 let primary_key_name = Self::primary_key_name();
1905 let mut query = Query::default();
1906 query.allow_fields(&[primary_key_name]);
1907 query.add_filter("$rand", 0.05);
1908 query.set_limit(size);
1909
1910 let mut data = Self::find::<Map>(&query)
1911 .await?
1912 .into_iter()
1913 .filter_map(|mut map| map.remove(primary_key_name))
1914 .collect::<Vec<_>>();
1915 let remainder_size = size - data.len();
1916 if remainder_size > 0 {
1917 let mut query = Query::default();
1918 query.add_filter(primary_key_name, Map::from_entry("$nin", data.clone()));
1919 query.allow_fields(&[primary_key_name]);
1920 query.set_limit(remainder_size);
1921
1922 let remainder_data = Self::find::<Map>(&query).await?;
1923 for mut map in remainder_data {
1924 if let Some(value) = map.remove(primary_key_name) {
1925 data.push(value);
1926 }
1927 }
1928 }
1929 Ok(data)
1930 }
1931
1932 async fn filter<T: IntoSqlValue>(primary_key_values: Vec<T>) -> Result<Vec<JsonValue>, Error> {
1934 let primary_key_name = Self::primary_key_name();
1935 let limit = primary_key_values.len();
1936 let mut query = Query::default();
1937 query.allow_fields(&[primary_key_name]);
1938 query.add_filter(
1939 primary_key_name,
1940 Map::from_entry("$in", primary_key_values.into_sql_value()),
1941 );
1942 query.set_limit(limit);
1943
1944 let data = Self::find::<Map>(&query).await?;
1945 let mut primary_key_values = Vec::with_capacity(data.len());
1946 for map in data.into_iter() {
1947 for (_key, value) in map.into_iter() {
1948 primary_key_values.push(value);
1949 }
1950 }
1951 Ok(primary_key_values)
1952 }
1953
1954 async fn is_unique_on<C, T>(&self, columns: Vec<(C, T)>) -> Result<bool, Error>
1956 where
1957 C: AsRef<str>,
1958 T: IntoSqlValue,
1959 {
1960 let primary_key_name = Self::primary_key_name();
1961 let mut query = Query::default();
1962 let mut fields = Vec::with_capacity(columns.len() + 1);
1963 fields.push(primary_key_name.to_owned());
1964 for (col, value) in columns.into_iter() {
1965 let field = col.as_ref();
1966 fields.push(field.to_owned());
1967 query.add_filter(field, value.into_sql_value());
1968 }
1969 query.set_fields(fields);
1970 query.set_limit(2);
1971
1972 let data = Self::find::<Map>(&query).await?;
1973 match data.len() {
1974 0 => Ok(true),
1975 1 => {
1976 if let Some(value) = data.first().and_then(|m| m.get(primary_key_name)) {
1977 Ok(&self.primary_key_value() == value)
1978 } else {
1979 Ok(true)
1980 }
1981 }
1982 _ => Ok(false),
1983 }
1984 }
1985}