zino_orm/
schema.rs

1use super::{
2    ConnectionPool, DatabaseRow, DecodeRow, EncodeColumn, Entity, Executor, GlobalPool,
3    IntoSqlValue, JoinOn, ModelHelper, PrimaryKey, QueryBuilder, column::ColumnExt,
4    mutation::MutationExt, query::QueryExt,
5};
6use serde::de::DeserializeOwned;
7use std::sync::atomic::Ordering::Relaxed;
8use zino_core::{
9    JsonValue, Map, bail,
10    error::Error,
11    extension::{JsonObjectExt, JsonValueExt},
12    model::{Column, ModelHooks, Mutation, Query, QueryContext},
13    warn,
14};
15
16/// Database schema.
17///
18/// This trait can be derived by `zino_derive::Schema`.
19pub trait Schema: 'static + Send + Sync + ModelHooks {
20    /// Primary key.
21    type PrimaryKey: PrimaryKey;
22
23    /// Primary key name.
24    const PRIMARY_KEY_NAME: &'static str = "id";
25    /// Reader name.
26    const READER_NAME: &'static str = "main";
27    /// Writer name.
28    const WRITER_NAME: &'static str = "main";
29    /// Optional custom table name.
30    const TABLE_NAME: Option<&'static str> = None;
31
32    /// Returns the primary key.
33    fn primary_key(&self) -> &Self::PrimaryKey;
34
35    /// Returns a reference to the Avro schema.
36    fn schema() -> &'static apache_avro::Schema;
37
38    /// Returns a reference to the columns.
39    fn columns() -> &'static [Column<'static>];
40
41    /// Returns a reference to the column fields.
42    fn fields() -> &'static [&'static str];
43
44    /// Returns a reference to the read-only column fields.
45    fn read_only_fields() -> &'static [&'static str];
46
47    /// Returns a reference to the write-only column fields.
48    fn write_only_fields() -> &'static [&'static str];
49
50    /// Retrieves a connection pool for the model reader.
51    async fn acquire_reader() -> Result<&'static ConnectionPool, Error>;
52
53    /// Retrieves a connection pool for the model writer.
54    async fn acquire_writer() -> Result<&'static ConnectionPool, Error>;
55
56    /// Returns the driver name.
57    ///
58    /// Supported drivers: `mariadb` | `mysql` | `postgres` | `sqlite` | `tidb`.
59    #[inline]
60    fn driver_name() -> &'static str {
61        super::DRIVER_NAME
62    }
63
64    /// Returns the prefix for the table name.
65    #[inline]
66    fn table_prefix() -> &'static str {
67        *super::TABLE_PREFIX
68    }
69
70    /// Returns the prefix for the model namespace.
71    #[inline]
72    fn namespace_prefix() -> &'static str {
73        *super::NAMESPACE_PREFIX
74    }
75
76    /// Returns the table name.
77    #[inline]
78    fn table_name() -> &'static str {
79        Self::TABLE_NAME.unwrap_or_else(|| [Self::table_prefix(), Self::MODEL_NAME].concat().leak())
80    }
81
82    /// Returns the model namespace.
83    #[inline]
84    fn model_namespace() -> &'static str {
85        [Self::namespace_prefix(), Self::MODEL_NAME].concat().leak()
86    }
87
88    /// Returns the primary key name.
89    #[inline]
90    fn primary_key_name() -> &'static str {
91        Self::primary_key_column()
92            .extra()
93            .get_str("column_name")
94            .unwrap_or(Self::PRIMARY_KEY_NAME)
95    }
96
97    /// Returns the primary key as a JSON value.
98    #[inline]
99    fn primary_key_value(&self) -> JsonValue {
100        self.primary_key().to_string().into()
101    }
102
103    /// Returns the primary key column.
104    #[inline]
105    fn primary_key_column() -> &'static Column<'static> {
106        Self::get_column(Self::PRIMARY_KEY_NAME).expect("primary key column should always exist")
107    }
108
109    /// Gets a column for the field.
110    #[inline]
111    fn get_column(key: &str) -> Option<&Column<'static>> {
112        let key = if let Some((name, field)) = key.split_once('.') {
113            if Self::model_name() == name || Self::table_name() == name {
114                field
115            } else {
116                return None;
117            }
118        } else {
119            key
120        };
121        Self::columns().iter().find(|col| col.name() == key)
122    }
123
124    /// Gets a column for the field if it is writable.
125    #[inline]
126    fn get_writable_column(key: &str) -> Option<&Column<'static>> {
127        let key = if let Some((name, field)) = key.split_once('.') {
128            if Self::model_name() == name || Self::table_name() == name {
129                field
130            } else {
131                return None;
132            }
133        } else {
134            key
135        };
136        Self::columns()
137            .iter()
138            .find(|col| col.name() == key && !col.is_read_only())
139    }
140
141    /// Returns `true` if the model has a column for the specific field.
142    #[inline]
143    fn has_column(key: &str) -> bool {
144        let key = if let Some((name, field)) = key.split_once('.') {
145            if Self::model_name() == name || Self::table_name() == name {
146                field
147            } else {
148                return false;
149            }
150        } else {
151            key
152        };
153        Self::columns().iter().any(|col| col.name() == key)
154    }
155
156    /// Constructs a default `Query` for the model.
157    #[inline]
158    fn default_query() -> Query {
159        let mut query = Query::default();
160        query.allow_fields(Self::fields());
161        query.deny_fields(Self::write_only_fields());
162        query
163    }
164
165    /// Constructs a default `Mutation` for the model.
166    #[inline]
167    fn default_mutation() -> Mutation {
168        let mut mutation = Mutation::default();
169        mutation.allow_fields(Self::fields());
170        mutation.deny_fields(Self::read_only_fields());
171        mutation
172    }
173
174    /// Initializes the model reader.
175    #[inline]
176    fn init_reader() -> Result<&'static ConnectionPool, Error> {
177        GlobalPool::get(Self::READER_NAME)
178            .ok_or_else(|| warn!("connection to the database is unavailable"))
179    }
180
181    /// Initializes the model writer.
182    #[inline]
183    fn init_writer() -> Result<&'static ConnectionPool, Error> {
184        GlobalPool::get(Self::WRITER_NAME)
185            .ok_or_else(|| warn!("connection to the database is unavailable"))
186    }
187
188    /// Creates a database table for the model.
189    async fn create_table() -> Result<(), Error> {
190        let connection_pool = Self::init_writer()?;
191        if !connection_pool.auto_migration() {
192            return Ok(());
193        }
194        Self::before_create_table().await?;
195
196        let primary_key_name = Self::primary_key_name();
197        let table_name = Self::table_name();
198        let table_name_escaped = Query::escape_table_name(table_name);
199        let columns = Self::columns();
200        let mut definitions = columns
201            .iter()
202            .map(|col| col.field_definition(primary_key_name))
203            .collect::<Vec<_>>();
204        for col in columns {
205            let mut constraints = col.constraints();
206            if !constraints.is_empty() {
207                definitions.append(&mut constraints);
208            }
209        }
210
211        let definitions = definitions.join(",\n  ");
212        let sql = format!("CREATE TABLE IF NOT EXISTS {table_name_escaped} (\n  {definitions}\n);");
213        let pool = connection_pool.pool();
214        if let Err(err) = pool.execute(&sql).await {
215            tracing::error!(table_name, "fail to execute `{sql}`");
216            return Err(err);
217        }
218        Self::after_create_table().await?;
219        Ok(())
220    }
221
222    /// Synchronizes the table schema for the model.
223    async fn synchronize_schema() -> Result<(), Error> {
224        let connection_pool = Self::init_writer()?;
225        if !connection_pool.auto_migration() {
226            return Ok(());
227        }
228
229        let mut table_name = Self::table_name();
230        let table_name_escaped = Query::escape_table_name(table_name);
231        if let Some((_, suffix)) = table_name.rsplit_once('.') {
232            table_name = suffix;
233        }
234
235        let sql = if cfg!(any(
236            feature = "orm-mariadb",
237            feature = "orm-mysql",
238            feature = "orm-tidb"
239        )) {
240            let table_schema = connection_pool.database();
241            format!(
242                "SELECT column_name, data_type, column_default, is_nullable \
243                    FROM information_schema.columns \
244                        WHERE table_schema = '{table_schema}' AND table_name = '{table_name}';"
245            )
246        } else if cfg!(feature = "orm-postgres") {
247            format!(
248                "SELECT column_name, data_type, column_default, is_nullable \
249                    FROM information_schema.columns \
250                        WHERE table_schema = 'public' AND table_name = '{table_name}';"
251            )
252        } else {
253            format!(
254                "SELECT p.name AS column_name, p.type AS data_type, \
255                        p.dflt_value AS column_default, p.[notnull] AS is_not_null \
256                    FROM sqlite_master m LEFT OUTER JOIN pragma_table_info((m.name)) p
257                        ON m.name <> p.name WHERE m.name = '{table_name}';"
258            )
259        };
260        let pool = connection_pool.pool();
261        let rows = pool.fetch(&sql).await?;
262        let mut data = Vec::with_capacity(rows.len());
263        for row in rows {
264            data.push(Map::decode_row(&row)?);
265        }
266
267        let model_name = Self::model_name();
268        let primary_key_name = Self::primary_key_name();
269        for col in Self::columns() {
270            let column_type = col.column_type();
271            let column_name = col
272                .extra()
273                .get_str("column_name")
274                .unwrap_or_else(|| col.name());
275            let column_opt = data.iter().find(|d| {
276                d.get_str("column_name")
277                    .or_else(|| d.get_str("COLUMN_NAME"))
278                    == Some(column_name)
279            });
280            if let Some(d) = column_opt {
281                let data_type = d.get_str("data_type").or_else(|| d.get_str("DATA_TYPE"));
282                let column_default = d
283                    .get_str("column_default")
284                    .or_else(|| d.get_str("COLUMN_DEFAULT"));
285                let is_not_null = if cfg!(any(feature = "orm-mysql", feature = "orm-postgres")) {
286                    d.get_str("is_nullable")
287                        .or_else(|| d.get_str("IS_NULLABLE"))
288                        .unwrap_or("YES")
289                        .eq_ignore_ascii_case("NO")
290                } else {
291                    d.get_str("is_not_null") == Some("1")
292                };
293                if !data_type.is_some_and(|t| col.is_compatible(t)) {
294                    tracing::warn!(
295                        model_name,
296                        table_name,
297                        column_name,
298                        column_type,
299                        data_type,
300                        column_default,
301                        "data type of `{column_name}` should be altered as `{column_type}`",
302                    );
303                } else if col.is_not_null() != is_not_null && column_name != primary_key_name {
304                    tracing::warn!(
305                        model_name,
306                        table_name,
307                        column_name,
308                        column_type,
309                        data_type,
310                        column_default,
311                        is_not_null,
312                        "`NOT NULL` constraint of `{column_name}` should be consistent",
313                    );
314                }
315            } else {
316                let column_definition = col.field_definition(primary_key_name);
317                let sql =
318                    format!("ALTER TABLE {table_name_escaped} ADD COLUMN {column_definition};");
319                pool.execute(&sql).await?;
320                tracing::warn!(
321                    model_name,
322                    table_name,
323                    column_name,
324                    column_type,
325                    "a new column `{column_name}` has been added",
326                );
327            }
328        }
329        Ok(())
330    }
331
332    /// Creates indexes for the model.
333    async fn create_indexes() -> Result<u64, Error> {
334        let connection_pool = Self::init_writer()?;
335        if !connection_pool.auto_migration() {
336            return Ok(0);
337        }
338
339        let mut table_name = Self::table_name();
340        let table_name_escaped = Query::escape_table_name(table_name);
341        if let Some((_, suffix)) = table_name.rsplit_once('.') {
342            table_name = suffix;
343        }
344
345        let pool = connection_pool.pool();
346        let columns = Self::columns();
347        let mut rows = 0;
348        if cfg!(any(
349            feature = "orm-mariadb",
350            feature = "orm-mysql",
351            feature = "orm-tidb"
352        )) {
353            let sql = format!("SHOW INDEXES FROM {table_name_escaped}");
354            if pool.fetch(&sql).await?.len() > 1 {
355                return Ok(0);
356            }
357
358            let mut text_search_columns = Vec::new();
359            for col in columns {
360                if let Some(index_type) = col.index_type() {
361                    let column_name = col.name();
362                    if matches!(index_type, "fulltext" | "text") {
363                        text_search_columns.push(column_name);
364                    } else if matches!(index_type, "unique" | "spatial") {
365                        let index_type = index_type.to_uppercase();
366                        let sql = format!(
367                            "CREATE {index_type} INDEX {table_name}_{column_name}_index \
368                                ON {table_name_escaped} ({column_name});"
369                        );
370                        rows = pool.execute(&sql).await?.rows_affected().max(rows);
371                    } else if matches!(index_type, "btree" | "hash") {
372                        let index_type = index_type.to_uppercase();
373                        let sql = format!(
374                            "CREATE INDEX {table_name}_{column_name}_index \
375                                ON {table_name_escaped} ({column_name}) USING {index_type};"
376                        );
377                        rows = pool.execute(&sql).await?.rows_affected().max(rows);
378                    }
379                }
380            }
381            if !text_search_columns.is_empty() {
382                let text_search_columns = text_search_columns.join(", ");
383                let sql = format!(
384                    "CREATE FULLTEXT INDEX {table_name}_text_search_index \
385                        ON {table_name_escaped} ({text_search_columns});"
386                );
387                rows = pool.execute(&sql).await?.rows_affected().max(rows);
388            }
389        } else if cfg!(feature = "orm-postgres") {
390            let mut text_search_columns = Vec::new();
391            let mut text_search_languages = Vec::new();
392            for col in columns {
393                if let Some(index_type) = col.index_type() {
394                    let column_name = col.name();
395                    if index_type.starts_with("text") {
396                        let language = index_type.strip_prefix("text:").unwrap_or("english");
397                        let column = format!("coalesce({column_name}, '')");
398                        text_search_languages.push(language);
399                        text_search_columns.push((language, column));
400                    } else if index_type == "unique" {
401                        let sql = format!(
402                            "CREATE UNIQUE INDEX IF NOT EXISTS {table_name}_{column_name}_index \
403                                ON {table_name_escaped} ({column_name});"
404                        );
405                        rows = pool.execute(&sql).await?.rows_affected().max(rows);
406                    } else {
407                        let sort_order = if index_type == "btree" { " DESC" } else { "" };
408                        let sql = format!(
409                            "CREATE INDEX IF NOT EXISTS {table_name}_{column_name}_index \
410                                ON {table_name_escaped} \
411                                    USING {index_type}({column_name}{sort_order});"
412                        );
413                        rows = pool.execute(&sql).await?.rows_affected().max(rows);
414                    }
415                }
416            }
417            for language in text_search_languages {
418                let text = text_search_columns
419                    .iter()
420                    .filter_map(|col| (col.0 == language).then_some(col.1.as_str()))
421                    .collect::<Vec<_>>()
422                    .join(" || ' ' || ");
423                let text_search = format!("to_tsvector('{language}', {text})");
424                let sql = format!(
425                    "CREATE INDEX IF NOT EXISTS {table_name}_text_search_{language}_index \
426                        ON {table_name_escaped} USING gin({text_search});"
427                );
428                rows = pool.execute(&sql).await?.rows_affected().max(rows);
429            }
430        } else {
431            for col in columns {
432                if let Some(index_type) = col.index_type() {
433                    let column_name = col.name();
434                    let index_type = if index_type == "unique" { "UNIQUE" } else { "" };
435                    let sql = format!(
436                        "CREATE {index_type} INDEX IF NOT EXISTS {table_name}_{column_name}_index \
437                            ON {table_name_escaped} ({column_name});"
438                    );
439                    rows = pool.execute(&sql).await?.rows_affected().max(rows);
440                }
441            }
442        }
443        Ok(rows)
444    }
445
446    /// Prepares the SQL to insert the model into the table.
447    async fn prepare_insert(self) -> Result<QueryContext, Error> {
448        let table_name = if let Some(table) = self.before_prepare().await? {
449            Query::escape_table_name(&table)
450        } else {
451            Query::escape_table_name(Self::table_name())
452        };
453        let map = self.into_map();
454        let columns = Self::columns();
455
456        let mut fields = Vec::with_capacity(columns.len());
457        let values = columns
458            .iter()
459            .filter_map(|col| {
460                if col.auto_increment() {
461                    None
462                } else {
463                    let name = col.name();
464                    let field = Query::format_field(name);
465                    fields.push(field);
466                    Some(col.encode_value(map.get(name)))
467                }
468            })
469            .collect::<Vec<_>>()
470            .join(", ");
471        let fields = fields.join(", ");
472        let sql = if cfg!(feature = "orm-postgres") {
473            let primary_key_name = Self::primary_key_name();
474            format!(
475                "INSERT INTO {table_name} ({fields}) VALUES ({values}) \
476                    RETURNING {primary_key_name};"
477            )
478        } else {
479            format!("INSERT INTO {table_name} ({fields}) VALUES ({values});")
480        };
481        let mut ctx = Self::before_scan(&sql).await?;
482        ctx.set_query(sql);
483        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
484            ctx.cancel();
485        }
486        Ok(ctx)
487    }
488
489    /// Inserts the model into the table.
490    async fn insert(mut self) -> Result<QueryContext, Error> {
491        let model_data = self.before_insert().await?;
492        let mut ctx = self.prepare_insert().await?;
493        if ctx.is_cancelled() {
494            return Ok(ctx);
495        }
496
497        let pool = Self::acquire_writer().await?.pool();
498        let (last_insert_id, rows_affected) =
499            if cfg!(feature = "orm-postgres") && Self::primary_key_column().auto_increment() {
500                let primary_key = sqlx::query_scalar(ctx.query()).fetch_one(pool).await?;
501                (Some(primary_key), 1)
502            } else {
503                let query_result = pool.execute(ctx.query()).await?;
504                Query::parse_query_result(query_result)
505            };
506        let success = rows_affected == 1;
507        if let Some(last_insert_id) = last_insert_id {
508            ctx.set_last_insert_id(last_insert_id);
509        }
510        ctx.set_query_result(rows_affected, success);
511        Self::after_scan(&ctx).await?;
512        Self::after_insert(&ctx, model_data).await?;
513        if success {
514            Ok(ctx)
515        } else {
516            bail!(
517                "{} rows are affected while it is expected to affect 1 row",
518                rows_affected
519            );
520        }
521    }
522
523    /// Prepares the SQL to insert many models into the table.
524    async fn prepare_insert_many(models: Vec<Self>) -> Result<QueryContext, Error> {
525        if models.is_empty() {
526            bail!("list of models to be inserted should be nonempty");
527        }
528
529        let columns = Self::columns();
530        let mut values = Vec::with_capacity(models.len());
531        for mut model in models.into_iter() {
532            let _model_data = model.before_insert().await?;
533
534            let map = model.into_map();
535            let entries = columns
536                .iter()
537                .map(|col| col.encode_value(map.get(col.name())))
538                .collect::<Vec<_>>()
539                .join(", ");
540            values.push(format!("({entries})"));
541        }
542
543        let table_name = Query::escape_table_name(Self::table_name());
544        let fields = Self::fields()
545            .iter()
546            .map(|&field| Query::format_field(field))
547            .collect::<Vec<_>>()
548            .join(", ");
549        let values = values.join(", ");
550        let sql = format!("INSERT INTO {table_name} ({fields}) VALUES {values};");
551        let mut ctx = Self::before_scan(&sql).await?;
552        ctx.set_query(sql);
553        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
554            ctx.cancel();
555        }
556        Ok(ctx)
557    }
558
559    /// Inserts many models into the table.
560    async fn insert_many(models: Vec<Self>) -> Result<QueryContext, Error> {
561        let mut ctx = Self::prepare_insert_many(models).await?;
562        if ctx.is_cancelled() {
563            return Ok(ctx);
564        }
565
566        let pool = Self::acquire_writer().await?.pool();
567        let query_result = pool.execute(ctx.query()).await?;
568        ctx.set_query_result(query_result.rows_affected(), true);
569        Self::after_scan(&ctx).await?;
570        Ok(ctx)
571    }
572
573    /// Prepares the SQL to insert models selected by a subquery.
574    async fn prepare_insert_from_subquery<C, E>(
575        columns: &[C],
576        subquery: QueryBuilder<E>,
577    ) -> Result<QueryContext, Error>
578    where
579        C: AsRef<str>,
580        E: Entity + Schema,
581    {
582        if columns.is_empty() {
583            bail!("a list of columns should be nonempty");
584        }
585
586        let table_name = Query::escape_table_name(Self::table_name());
587        let fields = columns
588            .iter()
589            .map(|col| col.as_ref())
590            .collect::<Vec<_>>()
591            .join(", ");
592        let subquery = subquery.build_subquery();
593        let sql = format!("INSERT INTO {table_name} ({fields}) {subquery};");
594        let mut ctx = Self::before_scan(&sql).await?;
595        ctx.set_query(sql);
596        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
597            ctx.cancel();
598        }
599        Ok(ctx)
600    }
601
602    /// Inserts the models selected by a subquery.
603    async fn insert_from_subquery<C, E>(
604        columns: &[C],
605        subquery: QueryBuilder<E>,
606    ) -> Result<QueryContext, Error>
607    where
608        C: AsRef<str>,
609        E: Entity + Schema,
610    {
611        let mut ctx = Self::prepare_insert_from_subquery(columns, subquery).await?;
612        if ctx.is_cancelled() {
613            return Ok(ctx);
614        }
615
616        let pool = Self::acquire_writer().await?.pool();
617        let query_result = pool.execute(ctx.query()).await?;
618        ctx.set_query_result(query_result.rows_affected(), true);
619        Self::after_scan(&ctx).await?;
620        Ok(ctx)
621    }
622
623    /// Prepares the SQL to update the model in the table.
624    async fn prepare_update(self) -> Result<QueryContext, Error> {
625        let table_name = if let Some(table) = self.before_prepare().await? {
626            Query::escape_table_name(&table)
627        } else {
628            Query::escape_table_name(Self::table_name())
629        };
630        let primary_key = Query::escape_string(self.primary_key());
631        let primary_key_name = Self::primary_key_name();
632        let map = self.into_map();
633        let read_only_fields = Self::read_only_fields();
634        let num_writable_fields = Self::fields().len() - read_only_fields.len();
635        let mut mutations = Vec::with_capacity(num_writable_fields);
636        for col in Self::columns() {
637            let field = col.name();
638            if !read_only_fields.contains(&field) {
639                let value = col.encode_value(map.get(field));
640                let field = Query::format_field(field);
641                mutations.push(format!("{field} = {value}"));
642            }
643        }
644
645        let mutations = mutations.join(", ");
646        let sql = format!(
647            "UPDATE {table_name} SET {mutations} WHERE {primary_key_name} = {primary_key};"
648        );
649        let mut ctx = Self::before_scan(&sql).await?;
650        ctx.set_query(sql);
651        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
652            ctx.cancel();
653        }
654        Ok(ctx)
655    }
656
657    /// Updates the model in the table.
658    async fn update(mut self) -> Result<QueryContext, Error> {
659        let model_data = self.before_update().await?;
660        let mut ctx = self.prepare_update().await?;
661        if ctx.is_cancelled() {
662            return Ok(ctx);
663        }
664
665        let pool = Self::acquire_writer().await?.pool();
666        let query_result = pool.execute(ctx.query()).await?;
667        let rows_affected = query_result.rows_affected();
668        let success = rows_affected == 1;
669        ctx.set_query_result(rows_affected, success);
670        Self::after_scan(&ctx).await?;
671        Self::after_update(&ctx, model_data).await?;
672        if success {
673            Ok(ctx)
674        } else {
675            bail!(
676                "{} rows are affected while it is expected to affect 1 row",
677                rows_affected
678            );
679        }
680    }
681
682    /// Prepares the SQL to update the model for partial columns in the table.
683    async fn prepare_update_partial<C: AsRef<str>>(
684        self,
685        columns: &[C],
686    ) -> Result<QueryContext, Error> {
687        let table_name = if let Some(table) = self.before_prepare().await? {
688            Query::escape_table_name(&table)
689        } else {
690            Query::escape_table_name(Self::table_name())
691        };
692        let primary_key = Query::escape_string(self.primary_key());
693        let primary_key_name = Self::primary_key_name();
694        let map = self.into_map();
695        let read_only_fields = Self::read_only_fields();
696        let mut mutations = Vec::with_capacity(columns.len());
697        for col in columns {
698            let field = col.as_ref();
699            if !read_only_fields.contains(&field) {
700                if let Some(col) = Self::columns().iter().find(|col| col.name() == field) {
701                    let value = col.encode_value(map.get(field));
702                    let field = Query::format_field(field);
703                    mutations.push(format!("{field} = {value}"));
704                }
705            }
706        }
707
708        let mutations = mutations.join(", ");
709        let sql = format!(
710            "UPDATE {table_name} SET {mutations} WHERE {primary_key_name} = {primary_key};"
711        );
712        let mut ctx = Self::before_scan(&sql).await?;
713        ctx.set_query(sql);
714        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
715            ctx.cancel();
716        }
717        Ok(ctx)
718    }
719
720    /// Updates the model for partial columns in the table.
721    async fn update_partial<C: AsRef<str>>(mut self, columns: &[C]) -> Result<QueryContext, Error> {
722        let model_data = self.before_update().await?;
723        let mut ctx = self.prepare_update_partial(columns).await?;
724        if ctx.is_cancelled() {
725            return Ok(ctx);
726        }
727
728        let pool = Self::acquire_writer().await?.pool();
729        let query_result = pool.execute(ctx.query()).await?;
730        let rows_affected = query_result.rows_affected();
731        let success = rows_affected == 1;
732        ctx.set_query_result(rows_affected, success);
733        Self::after_scan(&ctx).await?;
734        Self::after_update(&ctx, model_data).await?;
735        if success {
736            Ok(ctx)
737        } else {
738            bail!(
739                "{} rows are affected while it is expected to affect 1 row",
740                rows_affected
741            );
742        }
743    }
744
745    /// Prepares the SQL to update at most one model selected by the query in the table.
746    async fn prepare_update_one(
747        query: &Query,
748        mutation: &mut Mutation,
749    ) -> Result<QueryContext, Error> {
750        Self::before_mutation(query, mutation).await?;
751
752        let primary_key_name = Self::primary_key_name();
753        let table_name = query.format_table_name::<Self>();
754        let filters = query.format_filters::<Self>();
755        let updates = mutation.format_updates::<Self>();
756        let sql = if cfg!(any(
757            feature = "orm-mariadb",
758            feature = "orm-mysql",
759            feature = "orm-tidb"
760        )) {
761            // MySQL doesn't yet support 'LIMIT & IN/ALL/ANY/SOME subquery'
762            // and self-referencing in UPDATE/DELETE
763            format!(
764                "UPDATE {table_name} SET {updates} WHERE {primary_key_name} IN \
765                    (SELECT * from (SELECT {primary_key_name} FROM {table_name} {filters}) AS t);"
766            )
767        } else {
768            // Both PostgreQL and SQLite support a `LIMIT` in subquery
769            let sort = query.format_sort();
770            format!(
771                "UPDATE {table_name} SET {updates} WHERE {primary_key_name} IN \
772                    (SELECT {primary_key_name} FROM {table_name} {filters} {sort} LIMIT 1);"
773            )
774        };
775        let mut ctx = Self::before_scan(&sql).await?;
776        ctx.set_query(sql);
777        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
778            ctx.cancel();
779        }
780        Ok(ctx)
781    }
782
783    /// Updates at most one model selected by the query in the table.
784    async fn update_one(query: &Query, mutation: &mut Mutation) -> Result<QueryContext, Error> {
785        let mut ctx = Self::prepare_update_one(query, mutation).await?;
786        if ctx.is_cancelled() {
787            return Ok(ctx);
788        }
789
790        let pool = Self::acquire_writer().await?.pool();
791        let query_result = pool.execute(ctx.query()).await?;
792        let rows_affected = query_result.rows_affected();
793        let success = rows_affected <= 1;
794        ctx.set_query_result(rows_affected, success);
795        Self::after_scan(&ctx).await?;
796        Self::after_mutation(&ctx).await?;
797        if success {
798            Ok(ctx)
799        } else {
800            bail!(
801                "{} rows are affected while it is expected to affect at most 1 row",
802                rows_affected
803            );
804        }
805    }
806
807    /// Prepares the SQL to update many models selected by the query in the table.
808    async fn prepare_update_many(
809        query: &Query,
810        mutation: &mut Mutation,
811    ) -> Result<QueryContext, Error> {
812        Self::before_mutation(query, mutation).await?;
813
814        let table_name = query.format_table_name::<Self>();
815        let filters = query.format_filters::<Self>();
816        let updates = mutation.format_updates::<Self>();
817        let sql = format!("UPDATE {table_name} SET {updates} {filters};");
818        let mut ctx = Self::before_scan(&sql).await?;
819        ctx.set_query(sql);
820        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
821            ctx.cancel();
822        }
823        Ok(ctx)
824    }
825
826    /// Updates many models selected by the query in the table.
827    async fn update_many(query: &Query, mutation: &mut Mutation) -> Result<QueryContext, Error> {
828        let mut ctx = Self::prepare_update_many(query, mutation).await?;
829        if ctx.is_cancelled() {
830            return Ok(ctx);
831        }
832
833        let pool = Self::acquire_writer().await?.pool();
834        let query_result = pool.execute(ctx.query()).await?;
835        ctx.set_query_result(query_result.rows_affected(), true);
836        Self::after_scan(&ctx).await?;
837        Self::after_mutation(&ctx).await?;
838        Ok(ctx)
839    }
840
841    /// Prepares the SQL to update or insert the model into the table.
842    async fn prepare_upsert(self) -> Result<QueryContext, Error> {
843        let table_name = if let Some(table) = self.before_prepare().await? {
844            Query::escape_table_name(&table)
845        } else {
846            Query::escape_table_name(Self::table_name())
847        };
848        let map = self.into_map();
849        let num_fields = Self::fields().len();
850        let read_only_fields = Self::read_only_fields();
851        let num_writable_fields = num_fields - read_only_fields.len();
852        let mut fields = Vec::with_capacity(num_fields);
853        let mut values = Vec::with_capacity(num_fields);
854        let mut mutations = Vec::with_capacity(num_writable_fields);
855        for col in Self::columns() {
856            let name = col.name();
857            let field = Query::format_field(name);
858            let value = col.encode_value(map.get(name));
859            if !read_only_fields.contains(&name) {
860                mutations.push(format!("{field} = {value}"));
861            }
862            fields.push(field);
863            values.push(value);
864        }
865
866        let fields = fields.join(", ");
867        let values = values.join(", ");
868        let mutations = mutations.join(", ");
869        let sql = if cfg!(any(
870            feature = "orm-mariadb",
871            feature = "orm-mysql",
872            feature = "orm-tidb"
873        )) {
874            format!(
875                "INSERT INTO {table_name} ({fields}) VALUES ({values}) \
876                    ON DUPLICATE KEY UPDATE {mutations};"
877            )
878        } else {
879            let primary_key_name = Self::primary_key_name();
880
881            // Both PostgreQL and SQLite (3.35+) support this syntax.
882            format!(
883                "INSERT INTO {table_name} ({fields}) VALUES ({values}) \
884                    ON CONFLICT ({primary_key_name}) DO UPDATE SET {mutations} \
885                        RETURNING {primary_key_name};"
886            )
887        };
888        let mut ctx = Self::before_scan(&sql).await?;
889        ctx.set_query(sql);
890        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
891            ctx.cancel();
892        }
893        Ok(ctx)
894    }
895
896    /// Updates or inserts the model into the table.
897    async fn upsert(mut self) -> Result<QueryContext, Error> {
898        let model_data = self.before_upsert().await?;
899        let mut ctx = self.prepare_upsert().await?;
900        if ctx.is_cancelled() {
901            return Ok(ctx);
902        }
903
904        let pool = Self::acquire_writer().await?.pool();
905        let (last_insert_id, rows_affected) =
906            if cfg!(feature = "orm-postgres") && Self::primary_key_column().auto_increment() {
907                let primary_key = sqlx::query_scalar(ctx.query()).fetch_one(pool).await?;
908                (Some(primary_key), 1)
909            } else {
910                let query_result = pool.execute(ctx.query()).await?;
911                Query::parse_query_result(query_result)
912            };
913        let success = rows_affected == 1;
914        if let Some(last_insert_id) = last_insert_id {
915            ctx.set_last_insert_id(last_insert_id);
916        }
917        ctx.set_query_result(rows_affected, success);
918        Self::after_scan(&ctx).await?;
919        Self::after_upsert(&ctx, model_data).await?;
920        if success {
921            Ok(ctx)
922        } else {
923            bail!(
924                "{} rows are affected while it is expected to affect 1 row",
925                rows_affected
926            );
927        }
928    }
929
930    /// Prepares the SQL to delete the model in the table.
931    async fn prepare_delete(&self) -> Result<QueryContext, Error> {
932        let table_name = if let Some(table) = self.before_prepare().await? {
933            Query::escape_table_name(&table)
934        } else {
935            Query::escape_table_name(Self::table_name())
936        };
937        let primary_key_name = Self::primary_key_name();
938        let placeholder = Query::placeholder(1);
939        let sql = if cfg!(feature = "orm-postgres") {
940            let type_annotation = Self::primary_key_column().type_annotation();
941            format!(
942                "DELETE FROM {table_name} \
943                    WHERE {primary_key_name} = ({placeholder}){type_annotation};"
944            )
945        } else {
946            format!("DELETE FROM {table_name} WHERE {primary_key_name} = {placeholder};")
947        };
948        let mut ctx = Self::before_scan(&sql).await?;
949        ctx.set_query(sql);
950        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
951            ctx.cancel();
952        }
953        Ok(ctx)
954    }
955
956    /// Deletes the model in the table.
957    async fn delete(mut self) -> Result<QueryContext, Error> {
958        let model_data = self.before_delete().await?;
959        let mut ctx = self.prepare_delete().await?;
960        if ctx.is_cancelled() {
961            return Ok(ctx);
962        }
963
964        let pool = Self::acquire_writer().await?.pool();
965        let primary_key = self.primary_key();
966        let query_result = pool.execute_with(ctx.query(), &[primary_key]).await?;
967        let rows_affected = query_result.rows_affected();
968        let success = rows_affected == 1;
969        ctx.add_argument(primary_key);
970        ctx.set_query_result(rows_affected, success);
971        Self::after_scan(&ctx).await?;
972        self.after_delete(&ctx, model_data).await?;
973        if success {
974            Ok(ctx)
975        } else {
976            bail!(
977                "{} rows are affected while it is expected to affect 1 row",
978                rows_affected
979            );
980        }
981    }
982
983    /// Prepares the SQL to delete at most one model selected by the query in the table.
984    async fn prepare_delete_one(query: &Query) -> Result<QueryContext, Error> {
985        Self::before_query(query).await?;
986
987        let primary_key_name = Self::primary_key_name();
988        let table_name = query.format_table_name::<Self>();
989        let filters = query.format_filters::<Self>();
990        let sort = query.format_sort();
991        let sql = format!(
992            "DELETE FROM {table_name} WHERE {primary_key_name} IN \
993                (SELECT {primary_key_name} FROM {table_name} {filters} {sort} LIMIT 1);"
994        );
995        let mut ctx = Self::before_scan(&sql).await?;
996        ctx.set_query(sql);
997        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
998            ctx.cancel();
999        }
1000        Ok(ctx)
1001    }
1002
1003    /// Deletes at most one model selected by the query in the table.
1004    async fn delete_one(query: &Query) -> Result<QueryContext, Error> {
1005        let mut ctx = Self::prepare_delete_one(query).await?;
1006        if ctx.is_cancelled() {
1007            return Ok(ctx);
1008        }
1009
1010        let pool = Self::acquire_writer().await?.pool();
1011        let query_result = pool.execute(ctx.query()).await?;
1012        let rows_affected = query_result.rows_affected();
1013        let success = rows_affected <= 1;
1014        ctx.set_query_result(rows_affected, success);
1015        Self::after_scan(&ctx).await?;
1016        Self::after_query(&ctx).await?;
1017        if success {
1018            Ok(ctx)
1019        } else {
1020            bail!(
1021                "{} rows are affected while it is expected to affect at most 1 row",
1022                rows_affected
1023            );
1024        }
1025    }
1026
1027    /// Prepares the SQL to delete many models selected by the query in the table.
1028    async fn prepare_delete_many(query: &Query) -> Result<QueryContext, Error> {
1029        Self::before_query(query).await?;
1030
1031        let table_name = query.format_table_name::<Self>();
1032        let filters = query.format_filters::<Self>();
1033        let sort = query.format_sort();
1034        let pagination = query.format_pagination();
1035        let sql = if cfg!(feature = "orm-postgres") {
1036            let primary_key_name = Self::primary_key_name();
1037            format!(
1038                "DELETE FROM {table_name} WHERE {primary_key_name} IN \
1039                    (SELECT {primary_key_name} FROM {table_name} {filters} {sort} {pagination});"
1040            )
1041        } else {
1042            format!("DELETE FROM {table_name} {filters} {sort} {pagination};")
1043        };
1044        let mut ctx = Self::before_scan(&sql).await?;
1045        ctx.set_query(sql);
1046        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1047            ctx.cancel();
1048        }
1049        Ok(ctx)
1050    }
1051
1052    /// Deletes many models selected by the query in the table.
1053    async fn delete_many(query: &Query) -> Result<QueryContext, Error> {
1054        let mut ctx = Self::prepare_delete_many(query).await?;
1055        if ctx.is_cancelled() {
1056            return Ok(ctx);
1057        }
1058
1059        let pool = Self::acquire_writer().await?.pool();
1060        let query_result = pool.execute(ctx.query()).await?;
1061        ctx.set_query_result(query_result.rows_affected(), true);
1062        Self::after_scan(&ctx).await?;
1063        Self::after_query(&ctx).await?;
1064        Ok(ctx)
1065    }
1066
1067    /// Prepares the SQL to delete models selected by a subquery.
1068    async fn prepare_delete_by_subquery<C, E>(
1069        columns: &[C],
1070        subquery: QueryBuilder<E>,
1071    ) -> Result<QueryContext, Error>
1072    where
1073        C: AsRef<str>,
1074        E: Entity + Schema,
1075    {
1076        if columns.is_empty() {
1077            bail!("a list of columns should be nonempty");
1078        }
1079
1080        let table_name = Query::escape_table_name(Self::table_name());
1081        let fields = columns
1082            .iter()
1083            .map(|col| col.as_ref())
1084            .collect::<Vec<_>>()
1085            .join(", ");
1086        let subquery = subquery.build_subquery();
1087        let sql = format!("DELETE FROM {table_name} WHERE ({fields}) IN {subquery};");
1088        let mut ctx = Self::before_scan(&sql).await?;
1089        ctx.set_query(sql);
1090        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1091            ctx.cancel();
1092        }
1093        Ok(ctx)
1094    }
1095
1096    /// Deletes the models selected by a subquery.
1097    async fn delete_by_subquery<C, E>(
1098        columns: &[C],
1099        subquery: QueryBuilder<E>,
1100    ) -> Result<QueryContext, Error>
1101    where
1102        C: AsRef<str>,
1103        E: Entity + Schema,
1104    {
1105        let mut ctx = Self::prepare_delete_by_subquery(columns, subquery).await?;
1106        if ctx.is_cancelled() {
1107            return Ok(ctx);
1108        }
1109
1110        let pool = Self::acquire_writer().await?.pool();
1111        let query_result = pool.execute(ctx.query()).await?;
1112        ctx.set_query_result(query_result.rows_affected(), true);
1113        Self::after_scan(&ctx).await?;
1114        Self::after_query(&ctx).await?;
1115        Ok(ctx)
1116    }
1117
1118    /// Finds a list of models selected by the query in the table,
1119    /// and decodes it as `Vec<T>`.
1120    async fn find<T>(query: &Query) -> Result<Vec<T>, Error>
1121    where
1122        T: DecodeRow<DatabaseRow, Error = Error>,
1123    {
1124        Self::before_query(query).await?;
1125
1126        let table_name = query.format_table_name::<Self>();
1127        let projection = query.format_table_fields::<Self>();
1128        let filters = query.format_filters::<Self>();
1129        let sort = query.format_sort();
1130        let pagination = query.format_pagination();
1131        let sql = format!("SELECT {projection} FROM {table_name} {filters} {sort} {pagination};");
1132        let mut ctx = Self::before_scan(&sql).await?;
1133        ctx.set_query(&sql);
1134
1135        let pool = Self::acquire_reader().await?.pool();
1136        let rows = pool.fetch(ctx.query()).await?;
1137        let mut data = Vec::with_capacity(rows.len());
1138        for row in rows {
1139            data.push(T::decode_row(&row)?);
1140        }
1141        ctx.set_query_result(u64::try_from(data.len())?, true);
1142        Self::after_scan(&ctx).await?;
1143        Self::after_query(&ctx).await?;
1144        Ok(data)
1145    }
1146
1147    /// Finds a list of models selected by the query in the table,
1148    /// and parses it as `Vec<T>`.
1149    async fn find_as<T: DeserializeOwned>(query: &Query) -> Result<Vec<T>, Error> {
1150        let mut data = Self::find::<Map>(query).await?;
1151        let translate_enabled = query.translate_enabled();
1152        for model in data.iter_mut() {
1153            translate_enabled.then(|| Self::translate_model(model));
1154            Self::after_decode(model).await?;
1155        }
1156        serde_json::from_value(data.into()).map_err(Error::from)
1157    }
1158
1159    /// Finds one model selected by the query in the table,
1160    /// and decodes it as an instance of type `T`.
1161    async fn find_one<T>(query: &Query) -> Result<Option<T>, Error>
1162    where
1163        T: DecodeRow<DatabaseRow, Error = Error>,
1164    {
1165        Self::before_query(query).await?;
1166
1167        let table_name = query.format_table_name::<Self>();
1168        let projection = query.format_table_fields::<Self>();
1169        let filters = query.format_filters::<Self>();
1170        let sort = query.format_sort();
1171        let sql = format!("SELECT {projection} FROM {table_name} {filters} {sort} LIMIT 1;");
1172        let mut ctx = Self::before_scan(&sql).await?;
1173        ctx.set_query(sql);
1174
1175        let pool = Self::acquire_reader().await?.pool();
1176        let (num_rows, data) = if let Some(row) = pool.fetch_optional(ctx.query()).await? {
1177            (1, Some(T::decode_row(&row)?))
1178        } else {
1179            (0, None)
1180        };
1181        ctx.set_query_result(num_rows, true);
1182        Self::after_scan(&ctx).await?;
1183        Self::after_query(&ctx).await?;
1184        Ok(data)
1185    }
1186
1187    /// Finds one model selected by the query in the table,
1188    /// and parses it as an instance of type `T`.
1189    async fn find_one_as<T: DeserializeOwned>(query: &Query) -> Result<Option<T>, Error> {
1190        match Self::find_one::<Map>(query).await? {
1191            Some(mut data) => {
1192                query
1193                    .translate_enabled()
1194                    .then(|| Self::translate_model(&mut data));
1195                Self::after_decode(&mut data).await?;
1196                serde_json::from_value(data.into()).map_err(Error::from)
1197            }
1198            None => Ok(None),
1199        }
1200    }
1201
1202    /// Populates the related data in the corresponding `columns` for `Vec<Map>` using
1203    /// a merged select on the primary key, which solves the `N+1` problem.
1204    async fn populate<C: AsRef<str>>(
1205        query: &mut Query,
1206        data: &mut Vec<Map>,
1207        columns: &[C],
1208    ) -> Result<u64, Error> {
1209        Self::before_query(query).await?;
1210
1211        let primary_key_name = Self::primary_key_name();
1212        let mut values = Vec::new();
1213        for row in data.iter() {
1214            for col in columns {
1215                if let Some(value) = row.get(col.as_ref()) {
1216                    if let JsonValue::Array(vec) = value {
1217                        for value in vec {
1218                            if !values.contains(value) {
1219                                values.push(value.to_owned());
1220                            }
1221                        }
1222                    } else if !values.contains(value) {
1223                        values.push(value.to_owned());
1224                    }
1225                }
1226            }
1227        }
1228
1229        let num_values = values.len();
1230        if num_values > 0 {
1231            let primary_key_values = Map::from_entry("$in", values);
1232            query.add_filter(primary_key_name, primary_key_values);
1233        } else {
1234            return Ok(0);
1235        }
1236
1237        let table_name = query.format_table_name::<Self>();
1238        let projection = query.format_table_fields::<Self>();
1239        let filters = query.format_filters::<Self>();
1240        let sql = format!("SELECT {projection} FROM {table_name} {filters};");
1241        let mut ctx = Self::before_scan(&sql).await?;
1242        ctx.set_query(&sql);
1243
1244        let pool = Self::acquire_reader().await?.pool();
1245        let rows = pool.fetch(ctx.query()).await?;
1246        let translate_enabled = query.translate_enabled();
1247        let mut associations = Vec::with_capacity(num_values);
1248        for row in rows {
1249            let mut map = Map::decode_row(&row)?;
1250            let primary_key = map.get(primary_key_name).cloned();
1251            Self::after_populate(&mut map).await?;
1252            translate_enabled.then(|| Self::translate_model(&mut map));
1253            Self::after_decode(&mut map).await?;
1254            if let Some(key) = primary_key {
1255                associations.push((key, map));
1256            }
1257        }
1258
1259        let associations_len = u64::try_from(associations.len())?;
1260        ctx.set_query_result(associations_len, true);
1261        Self::after_scan(&ctx).await?;
1262        Self::after_query(&ctx).await?;
1263
1264        for row in data {
1265            for col in columns {
1266                let field = col.as_ref();
1267                if let Some(vec) = row.get_array(field).filter(|vec| !vec.is_empty()) {
1268                    let populated_field = [field, "_populated"].concat();
1269                    let populated_values = vec
1270                        .iter()
1271                        .map(|key| {
1272                            let populated_value = associations
1273                                .iter()
1274                                .find_map(|(k, v)| (key == k).then_some(v));
1275                            if let Some(value) = populated_value {
1276                                value.to_owned().into()
1277                            } else {
1278                                key.to_owned()
1279                            }
1280                        })
1281                        .collect::<Vec<_>>();
1282                    row.upsert(populated_field, populated_values);
1283                } else if let Some(key) = row.get(field) {
1284                    let populated_value = associations
1285                        .iter()
1286                        .find_map(|(k, v)| (key == k).then_some(v));
1287                    if let Some(value) = populated_value {
1288                        let populated_field = [field, "_populated"].concat();
1289                        row.upsert(populated_field, value.to_owned());
1290                    }
1291                }
1292            }
1293        }
1294        Ok(associations_len)
1295    }
1296
1297    /// Populates the related data in the corresponding `columns` for `Map` using
1298    /// a merged select on the primary key, which solves the `N+1` problem.
1299    async fn populate_one<C: AsRef<str>>(
1300        query: &mut Query,
1301        data: &mut Map,
1302        columns: &[C],
1303    ) -> Result<(), Error> {
1304        Self::before_query(query).await?;
1305
1306        let primary_key_name = Self::primary_key_name();
1307        let mut values = Vec::new();
1308        for col in columns {
1309            if let Some(value) = data.get(col.as_ref()) {
1310                if let JsonValue::Array(vec) = value {
1311                    for value in vec {
1312                        if !values.contains(value) {
1313                            values.push(value.to_owned());
1314                        }
1315                    }
1316                } else if !values.contains(value) {
1317                    values.push(value.to_owned());
1318                }
1319            }
1320        }
1321
1322        let num_values = values.len();
1323        if num_values > 0 {
1324            let primary_key_values = Map::from_entry("$in", values);
1325            query.add_filter(primary_key_name, primary_key_values);
1326        } else {
1327            return Ok(());
1328        }
1329
1330        let table_name = query.format_table_name::<Self>();
1331        let projection = query.format_projection();
1332        let filters = query.format_filters::<Self>();
1333        let sql = format!("SELECT {projection} FROM {table_name} {filters};");
1334        let mut ctx = Self::before_scan(&sql).await?;
1335        ctx.set_query(&sql);
1336
1337        let pool = Self::acquire_reader().await?.pool();
1338        let rows = pool.fetch(ctx.query()).await?;
1339        let translate_enabled = query.translate_enabled();
1340        let mut associations = Vec::with_capacity(num_values);
1341        for row in rows {
1342            let mut map = Map::decode_row(&row)?;
1343            let primary_key = map.get(primary_key_name).cloned();
1344            Self::after_populate(&mut map).await?;
1345            translate_enabled.then(|| Self::translate_model(&mut map));
1346            Self::after_decode(&mut map).await?;
1347            if let Some(key) = primary_key {
1348                associations.push((key, map));
1349            }
1350        }
1351        ctx.set_query_result(u64::try_from(associations.len())?, true);
1352        Self::after_scan(&ctx).await?;
1353        Self::after_query(&ctx).await?;
1354
1355        for col in columns {
1356            let field = col.as_ref();
1357            if let Some(vec) = data.get_array(field).filter(|vec| !vec.is_empty()) {
1358                let populated_field = [field, "_populated"].concat();
1359                let populated_values = vec
1360                    .iter()
1361                    .map(|key| {
1362                        let populated_value = associations
1363                            .iter()
1364                            .find_map(|(k, v)| (key == k).then_some(v));
1365                        if let Some(value) = populated_value {
1366                            value.to_owned().into()
1367                        } else {
1368                            key.to_owned()
1369                        }
1370                    })
1371                    .collect::<Vec<_>>();
1372                data.upsert(populated_field, populated_values);
1373            } else if let Some(key) = data.get(field) {
1374                let populated_value = associations
1375                    .iter()
1376                    .find_map(|(k, v)| (key == k).then_some(v));
1377                if let Some(value) = populated_value {
1378                    let populated_field = [field, "_populated"].concat();
1379                    data.upsert(populated_field, value.to_owned());
1380                }
1381            }
1382        }
1383        Ok(())
1384    }
1385
1386    /// Performs the joins on one or more tables to filter rows in the "joined" table,
1387    /// and decodes it as `Vec<T>`.
1388    async fn lookup<T>(query: &Query, joins: &[JoinOn]) -> Result<Vec<T>, Error>
1389    where
1390        T: DecodeRow<DatabaseRow, Error = Error>,
1391    {
1392        Self::before_query(query).await?;
1393
1394        let join_conditions = joins
1395            .iter()
1396            .map(|join_on| {
1397                let join_type = join_on.join_type().as_str();
1398                let join_table = join_on.join_table();
1399                let on_conditions = join_on.format_conditions();
1400                format!("{join_type} {join_table} ON {on_conditions}")
1401            })
1402            .collect::<Vec<_>>()
1403            .join(" ");
1404        let table_name = query.format_table_name::<Self>();
1405        let projection = query.format_table_fields::<Self>();
1406        let filters = query.format_filters::<Self>();
1407        let sort = query.format_sort();
1408        let pagination = query.format_pagination();
1409        let sql = format!(
1410            "SELECT {projection} FROM {table_name} {join_conditions} {filters} {sort} {pagination};"
1411        );
1412        let mut ctx = Self::before_scan(&sql).await?;
1413        ctx.set_query(&sql);
1414
1415        let pool = Self::acquire_reader().await?.pool();
1416        let rows = pool.fetch(ctx.query()).await?;
1417        let mut data = Vec::with_capacity(rows.len());
1418        for row in rows {
1419            data.push(T::decode_row(&row)?);
1420        }
1421        ctx.set_query_result(u64::try_from(data.len())?, true);
1422        Self::after_scan(&ctx).await?;
1423        Self::after_query(&ctx).await?;
1424        Ok(data)
1425    }
1426
1427    /// Performs the joins on one or more tables to filter rows in the "joined" table,
1428    /// and parses it as `Vec<T>`.
1429    async fn lookup_as<T: DeserializeOwned>(
1430        query: &Query,
1431        joins: &[JoinOn],
1432    ) -> Result<Vec<T>, Error> {
1433        let mut data = Self::lookup::<Map>(query, joins).await?;
1434        let translate_enabled = query.translate_enabled();
1435        for model in data.iter_mut() {
1436            translate_enabled.then(|| Self::translate_model(model));
1437            Self::after_decode(model).await?;
1438        }
1439        serde_json::from_value(data.into()).map_err(Error::from)
1440    }
1441
1442    /// Checks whether there is a model selected by the query in the table.
1443    async fn exists(query: &Query) -> Result<bool, Error> {
1444        Self::before_query(query).await?;
1445
1446        let table_name = query.format_table_name::<Self>();
1447        let filters = query.format_filters::<Self>();
1448        let sql = format!("SELECT 1 FROM {table_name} {filters} LIMIT 1;");
1449        let mut ctx = Self::before_scan(&sql).await?;
1450        ctx.set_query(sql);
1451
1452        let pool = Self::acquire_reader().await?.pool();
1453        let optional_row = pool.fetch_optional(ctx.query()).await?;
1454        let num_rows = if optional_row.is_some() { 1 } else { 0 };
1455        ctx.set_query_result(num_rows, true);
1456        Self::after_scan(&ctx).await?;
1457        Self::after_query(&ctx).await?;
1458        Ok(num_rows == 1)
1459    }
1460
1461    /// Counts the number of rows selected by the query in the table.
1462    async fn count(query: &Query) -> Result<u64, Error> {
1463        Self::before_count(query).await?;
1464
1465        let table_name = query.format_table_name::<Self>();
1466        let filters = query.format_filters::<Self>();
1467        let sql = format!("SELECT count(*) AS count FROM {table_name} {filters};");
1468        let mut ctx = Self::before_scan(&sql).await?;
1469        ctx.set_query(sql);
1470
1471        let pool = Self::acquire_reader().await?.pool();
1472        let row = pool.fetch_one(ctx.query()).await?;
1473        let map = Map::decode_row(&row)?;
1474
1475        // SQLite may return a string value for the count value.
1476        let count = map.parse_u64("count").transpose()?.unwrap_or_default();
1477        ctx.set_query_result(count, true);
1478        Self::after_scan(&ctx).await?;
1479        Self::after_count(&ctx).await?;
1480        Ok(count)
1481    }
1482
1483    /// Counts the number of rows selected by the query in the table.
1484    /// The boolean value determines whether it only counts distinct values or not.
1485    async fn count_many<C, T>(query: &Query, columns: &[(C, bool)]) -> Result<T, Error>
1486    where
1487        C: AsRef<str>,
1488        T: DecodeRow<DatabaseRow, Error = Error>,
1489    {
1490        Self::before_count(query).await?;
1491
1492        let table_name = query.format_table_name::<Self>();
1493        let filters = query.format_filters::<Self>();
1494        let projection = columns
1495            .iter()
1496            .map(|(col, distinct)| {
1497                let col_name = col.as_ref();
1498                let field = Query::format_field(col_name);
1499                if col_name != "*" {
1500                    if *distinct {
1501                        format!(r#"count(distinct {field}) AS {col_name}_distinct"#)
1502                    } else {
1503                        format!(r#"count({field}) AS {col_name}_count"#)
1504                    }
1505                } else {
1506                    "count(*)".to_owned()
1507                }
1508            })
1509            .collect::<Vec<_>>()
1510            .join(", ");
1511        let sql = format!("SELECT {projection} FROM {table_name} {filters};");
1512        let mut ctx = Self::before_scan(&sql).await?;
1513        ctx.set_query(sql);
1514
1515        let pool = Self::acquire_reader().await?.pool();
1516        let row = pool.fetch_one(ctx.query()).await?;
1517        ctx.set_query_result(1, true);
1518        Self::after_scan(&ctx).await?;
1519        Self::after_count(&ctx).await?;
1520        T::decode_row(&row)
1521    }
1522
1523    /// Counts the number of rows selected by the query in the table,
1524    /// and parses it as an instance of type `T`.
1525    async fn count_many_as<C, T>(query: &Query, columns: &[(C, bool)]) -> Result<T, Error>
1526    where
1527        C: AsRef<str>,
1528        T: DeserializeOwned,
1529    {
1530        let map = Self::count_many::<C, Map>(query, columns).await?;
1531        serde_json::from_value(map.into()).map_err(Error::from)
1532    }
1533
1534    /// Aggregates the rows selected by the query in the table.
1535    async fn aggregate<T>(query: &Query) -> Result<Vec<T>, Error>
1536    where
1537        T: DecodeRow<DatabaseRow, Error = Error>,
1538    {
1539        Self::before_aggregate(query).await?;
1540
1541        let table_name = query.format_table_name::<Self>();
1542        let projection = query.format_table_fields::<Self>();
1543        let filters = query.format_filters::<Self>();
1544        let sort = query.format_sort();
1545        let pagination = query.format_pagination();
1546        let sql = format!("SELECT {projection} FROM {table_name} {filters} {sort} {pagination};");
1547        let mut ctx = Self::before_scan(&sql).await?;
1548        ctx.set_query(sql);
1549
1550        let pool = Self::acquire_reader().await?.pool();
1551        let rows = pool.fetch(ctx.query()).await?;
1552        let mut data = Vec::with_capacity(rows.len());
1553        for row in rows {
1554            data.push(T::decode_row(&row)?);
1555        }
1556        ctx.set_query_result(u64::try_from(data.len())?, true);
1557        Self::after_scan(&ctx).await?;
1558        Self::after_aggregate(&ctx).await?;
1559        Ok(data)
1560    }
1561
1562    /// Aggregates the rows selected by the query in the table,
1563    /// and parses it as an instance of type `T`.
1564    async fn aggregate_as<T: DeserializeOwned>(query: &Query) -> Result<Vec<T>, Error> {
1565        let data = Self::aggregate::<Map>(query).await?;
1566        serde_json::from_value(data.into()).map_err(Error::from)
1567    }
1568
1569    /// Executes the query in the table, and returns the total number of rows affected.
1570    async fn execute(query: &str, params: Option<&Map>) -> Result<QueryContext, Error> {
1571        let (sql, values) = Query::prepare_query(query, params);
1572        let mut ctx = Self::before_scan(&sql).await?;
1573        ctx.set_query(sql);
1574        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1575            ctx.cancel();
1576        }
1577        if ctx.is_cancelled() {
1578            return Ok(ctx);
1579        }
1580
1581        let mut arguments = values
1582            .iter()
1583            .map(|v| v.to_string_unquoted())
1584            .collect::<Vec<_>>();
1585        let pool = Self::acquire_writer().await?.pool();
1586        let query_result = pool.execute_with(ctx.query(), &arguments).await?;
1587        ctx.append_arguments(&mut arguments);
1588        ctx.set_query_result(query_result.rows_affected(), true);
1589        Self::after_scan(&ctx).await?;
1590        Ok(ctx)
1591    }
1592
1593    /// Executes the query in the table, and decodes it as `Vec<T>`.
1594    async fn query<T>(query: &str, params: Option<&Map>) -> Result<Vec<T>, Error>
1595    where
1596        T: DecodeRow<DatabaseRow, Error = Error>,
1597    {
1598        let (sql, values) = Query::prepare_query(query, params);
1599        let mut ctx = Self::before_scan(&sql).await?;
1600        ctx.set_query(sql);
1601
1602        let mut arguments = values
1603            .iter()
1604            .map(|v| v.to_string_unquoted())
1605            .collect::<Vec<_>>();
1606        let pool = Self::acquire_reader().await?.pool();
1607        let rows = pool.fetch_with(ctx.query(), &arguments).await?;
1608        let mut data = Vec::with_capacity(rows.len());
1609        for row in rows {
1610            data.push(T::decode_row(&row)?);
1611        }
1612        ctx.append_arguments(&mut arguments);
1613        ctx.set_query_result(u64::try_from(data.len())?, true);
1614        Self::after_scan(&ctx).await?;
1615        Ok(data)
1616    }
1617
1618    /// Executes the query in the table, and parses it as `Vec<T>`.
1619    async fn query_as<T: DeserializeOwned>(
1620        query: &str,
1621        params: Option<&Map>,
1622    ) -> Result<Vec<T>, Error> {
1623        let mut data = Self::query::<Map>(query, params).await?;
1624        for model in data.iter_mut() {
1625            Self::after_decode(model).await?;
1626        }
1627        serde_json::from_value(data.into()).map_err(Error::from)
1628    }
1629
1630    /// Executes the query in the table, and decodes it as an instance of type `T`.
1631    async fn query_one<T>(query: &str, params: Option<&Map>) -> Result<Option<T>, Error>
1632    where
1633        T: DecodeRow<DatabaseRow, Error = Error>,
1634    {
1635        let (sql, values) = Query::prepare_query(query, params);
1636        let mut ctx = Self::before_scan(&sql).await?;
1637        ctx.set_query(sql);
1638
1639        let mut arguments = values
1640            .iter()
1641            .map(|v| v.to_string_unquoted())
1642            .collect::<Vec<_>>();
1643        let pool = Self::acquire_reader().await?.pool();
1644        let optional_row = pool.fetch_optional_with(ctx.query(), &arguments).await?;
1645        let (num_rows, data) = if let Some(row) = optional_row {
1646            (1, Some(T::decode_row(&row)?))
1647        } else {
1648            (0, None)
1649        };
1650        ctx.append_arguments(&mut arguments);
1651        ctx.set_query_result(num_rows, true);
1652        Self::after_scan(&ctx).await?;
1653        Ok(data)
1654    }
1655
1656    /// Executes the query in the table, and parses it as an instance of type `T`.
1657    async fn query_one_as<T: DeserializeOwned>(
1658        query: &str,
1659        params: Option<&Map>,
1660    ) -> Result<Option<T>, Error> {
1661        match Self::query_one::<Map>(query, params).await? {
1662            Some(mut data) => {
1663                Self::after_decode(&mut data).await?;
1664                serde_json::from_value(data.into()).map_err(Error::from)
1665            }
1666            None => Ok(None),
1667        }
1668    }
1669
1670    /// Prepares the SQL to delete a model selected by the primary key in the table.
1671    async fn prepare_delete_by_id() -> Result<QueryContext, Error> {
1672        let primary_key_name = Self::primary_key_name();
1673        let table_name = Query::escape_table_name(Self::table_name());
1674        let placeholder = Query::placeholder(1);
1675        let sql = if cfg!(feature = "orm-postgres") {
1676            let type_annotation = Self::primary_key_column().type_annotation();
1677            format!(
1678                "DELETE FROM {table_name} \
1679                    WHERE {primary_key_name} = ({placeholder}){type_annotation};"
1680            )
1681        } else {
1682            format!("DELETE FROM {table_name} WHERE {primary_key_name} = {placeholder};")
1683        };
1684        let mut ctx = Self::before_scan(&sql).await?;
1685        ctx.set_query(sql);
1686        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1687            ctx.cancel();
1688        }
1689        Ok(ctx)
1690    }
1691
1692    /// Deletes a model selected by the primary key in the table.
1693    async fn delete_by_id(primary_key: &Self::PrimaryKey) -> Result<QueryContext, Error> {
1694        let mut ctx = Self::prepare_delete_by_id().await?;
1695        if ctx.is_cancelled() {
1696            return Ok(ctx);
1697        }
1698
1699        let pool = Self::acquire_writer().await?.pool();
1700        let query_result = pool.execute_with(ctx.query(), &[primary_key]).await?;
1701        let rows_affected = query_result.rows_affected();
1702        let success = rows_affected == 1;
1703        ctx.add_argument(primary_key);
1704        ctx.set_query_result(rows_affected, success);
1705        Self::after_scan(&ctx).await?;
1706        if success {
1707            Ok(ctx)
1708        } else {
1709            bail!(
1710                "{} rows are affected while it is expected to affect 1 row",
1711                rows_affected
1712            );
1713        }
1714    }
1715
1716    /// Prepares the SQL to update a model selected by the primary key in the table.
1717    async fn prepare_update_by_id(mutation: &mut Mutation) -> Result<QueryContext, Error> {
1718        let primary_key_name = Self::primary_key_name();
1719        let table_name = Query::escape_table_name(Self::table_name());
1720        let updates = mutation.format_updates::<Self>();
1721        let placeholder = Query::placeholder(1);
1722        let sql = if cfg!(any(
1723            feature = "orm-mariadb",
1724            feature = "orm-mysql",
1725            feature = "orm-tidb"
1726        )) {
1727            format!(
1728                "UPDATE {table_name} SET {updates} \
1729                    WHERE {primary_key_name} = {placeholder};"
1730            )
1731        } else if cfg!(feature = "orm-postgres") {
1732            let type_annotation = Self::primary_key_column().type_annotation();
1733            format!(
1734                "UPDATE {table_name} SET {updates} \
1735                    WHERE {primary_key_name} = ({placeholder}){type_annotation} RETURNING *;"
1736            )
1737        } else {
1738            format!(
1739                "UPDATE {table_name} SET {updates} \
1740                    WHERE {primary_key_name} = {placeholder} RETURNING *;"
1741            )
1742        };
1743        let mut ctx = Self::before_scan(&sql).await?;
1744        ctx.set_query(sql);
1745        if cfg!(debug_assertions) && super::DEBUG_ONLY.load(Relaxed) {
1746            ctx.cancel();
1747        }
1748        Ok(ctx)
1749    }
1750
1751    /// Updates a model selected by the primary key in the table,
1752    /// and decodes it as an instance of type `T`.
1753    async fn update_by_id<T>(
1754        primary_key: &Self::PrimaryKey,
1755        mutation: &mut Mutation,
1756    ) -> Result<Option<T>, Error>
1757    where
1758        T: DecodeRow<DatabaseRow, Error = Error>,
1759    {
1760        let mut ctx = Self::prepare_update_by_id(mutation).await?;
1761        if ctx.is_cancelled() {
1762            return Ok(None);
1763        }
1764
1765        let pool = Self::acquire_writer().await?.pool();
1766        let optional_row = if cfg!(any(
1767            feature = "orm-mariadb",
1768            feature = "orm-mysql",
1769            feature = "orm-tidb"
1770        )) {
1771            use sqlx::Acquire;
1772
1773            let mut transaction = pool.begin().await?;
1774            let connection = transaction.acquire().await?;
1775            let query_result = connection.execute_with(ctx.query(), &[primary_key]).await?;
1776            let optional_row = if query_result.rows_affected() == 1 {
1777                let primary_key_name = Self::primary_key_name();
1778                let table_name = Query::escape_table_name(Self::table_name());
1779                let placeholder = Query::placeholder(1);
1780                let sql =
1781                    format!("SELECT * FROM {table_name} WHERE {primary_key_name} = {placeholder};");
1782                connection.fetch_optional_with(&sql, &[primary_key]).await?
1783            } else {
1784                None
1785            };
1786            transaction.commit().await?;
1787            optional_row
1788        } else {
1789            pool.fetch_optional_with(ctx.query(), &[primary_key])
1790                .await?
1791        };
1792        let (num_rows, data) = if let Some(row) = optional_row {
1793            (1, Some(T::decode_row(&row)?))
1794        } else {
1795            (0, None)
1796        };
1797        ctx.add_argument(primary_key);
1798        ctx.set_query_result(num_rows, true);
1799        Self::after_scan(&ctx).await?;
1800        Self::after_query(&ctx).await?;
1801        Ok(data)
1802    }
1803
1804    /// Finds a model selected by the primary key in the table,
1805    /// and decodes it as an instance of type `T`.
1806    async fn find_by_id<T>(primary_key: &Self::PrimaryKey) -> Result<Option<T>, Error>
1807    where
1808        T: DecodeRow<DatabaseRow, Error = Error>,
1809    {
1810        let primary_key_name = Self::primary_key_name();
1811        let query = Self::default_query();
1812        let table_name = query.format_table_name::<Self>();
1813        let projection = query.format_projection();
1814        let placeholder = Query::placeholder(1);
1815        let sql = if cfg!(feature = "orm-postgres") {
1816            let type_annotation = Self::primary_key_column().type_annotation();
1817            format!(
1818                "SELECT {projection} FROM {table_name} \
1819                    WHERE {primary_key_name} = ({placeholder}){type_annotation};"
1820            )
1821        } else {
1822            format!(
1823                "SELECT {projection} FROM {table_name} WHERE {primary_key_name} = {placeholder};"
1824            )
1825        };
1826        let mut ctx = Self::before_scan(&sql).await?;
1827        ctx.set_query(sql);
1828
1829        let pool = Self::acquire_reader().await?.pool();
1830        let optional_row = pool
1831            .fetch_optional_with(ctx.query(), &[primary_key])
1832            .await?;
1833        let (num_rows, data) = if let Some(row) = optional_row {
1834            (1, Some(T::decode_row(&row)?))
1835        } else {
1836            (0, None)
1837        };
1838        ctx.add_argument(primary_key);
1839        ctx.set_query_result(num_rows, true);
1840        Self::after_scan(&ctx).await?;
1841        Self::after_query(&ctx).await?;
1842        Ok(data)
1843    }
1844
1845    /// Finds a model selected by the primary key in the table, and parses it as `Self`.
1846    async fn try_get_model(primary_key: &Self::PrimaryKey) -> Result<Self, Error> {
1847        let primary_key_name = Self::primary_key_name();
1848        let query = Self::default_query();
1849        let table_name = query.format_table_name::<Self>();
1850        let projection = query.format_projection();
1851        let placeholder = Query::placeholder(1);
1852        let sql = if cfg!(feature = "orm-postgres") {
1853            let type_annotation = Self::primary_key_column().type_annotation();
1854            format!(
1855                "SELECT {projection} FROM {table_name} \
1856                    WHERE {primary_key_name} = ({placeholder}){type_annotation};"
1857            )
1858        } else {
1859            format!(
1860                "SELECT {projection} FROM {table_name} WHERE {primary_key_name} = {placeholder};"
1861            )
1862        };
1863        let mut ctx = Self::before_scan(&sql).await?;
1864        ctx.set_query(sql);
1865        ctx.add_argument(primary_key);
1866
1867        let pool = Self::acquire_reader().await?.pool();
1868        let optional_row = pool
1869            .fetch_optional_with(ctx.query(), &[primary_key])
1870            .await?;
1871        if let Some(row) = optional_row {
1872            ctx.set_query_result(1, true);
1873            Self::after_scan(&ctx).await?;
1874            Self::after_query(&ctx).await?;
1875
1876            let mut map = Map::decode_row(&row)?;
1877            Self::after_decode(&mut map).await?;
1878            Self::try_from_map(map).map_err(|err| {
1879                warn!(
1880                    "fail to decode the value as a model `{}`: {}",
1881                    Self::MODEL_NAME,
1882                    err
1883                )
1884            })
1885        } else {
1886            ctx.set_query_result(0, true);
1887            Self::after_scan(&ctx).await?;
1888            Self::after_query(&ctx).await?;
1889            bail!(
1890                "404 Not Found: no rows for the model `{}` with the key `{}`",
1891                Self::MODEL_NAME,
1892                primary_key
1893            );
1894        }
1895    }
1896
1897    /// Randomly selects the specified number of models from the table
1898    /// and returns a list of the primary key values.
1899    async fn sample(size: usize) -> Result<Vec<JsonValue>, Error> {
1900        if size == 0 {
1901            return Ok(Vec::new());
1902        }
1903
1904        let primary_key_name = Self::primary_key_name();
1905        let mut query = Query::default();
1906        query.allow_fields(&[primary_key_name]);
1907        query.add_filter("$rand", 0.05);
1908        query.set_limit(size);
1909
1910        let mut data = Self::find::<Map>(&query)
1911            .await?
1912            .into_iter()
1913            .filter_map(|mut map| map.remove(primary_key_name))
1914            .collect::<Vec<_>>();
1915        let remainder_size = size - data.len();
1916        if remainder_size > 0 {
1917            let mut query = Query::default();
1918            query.add_filter(primary_key_name, Map::from_entry("$nin", data.clone()));
1919            query.allow_fields(&[primary_key_name]);
1920            query.set_limit(remainder_size);
1921
1922            let remainder_data = Self::find::<Map>(&query).await?;
1923            for mut map in remainder_data {
1924                if let Some(value) = map.remove(primary_key_name) {
1925                    data.push(value);
1926                }
1927            }
1928        }
1929        Ok(data)
1930    }
1931
1932    /// Filters the values of the primary key.
1933    async fn filter<T: IntoSqlValue>(primary_key_values: Vec<T>) -> Result<Vec<JsonValue>, Error> {
1934        let primary_key_name = Self::primary_key_name();
1935        let limit = primary_key_values.len();
1936        let mut query = Query::default();
1937        query.allow_fields(&[primary_key_name]);
1938        query.add_filter(
1939            primary_key_name,
1940            Map::from_entry("$in", primary_key_values.into_sql_value()),
1941        );
1942        query.set_limit(limit);
1943
1944        let data = Self::find::<Map>(&query).await?;
1945        let mut primary_key_values = Vec::with_capacity(data.len());
1946        for map in data.into_iter() {
1947            for (_key, value) in map.into_iter() {
1948                primary_key_values.push(value);
1949            }
1950        }
1951        Ok(primary_key_values)
1952    }
1953
1954    /// Returns `true` if the model is unique on the column values.
1955    async fn is_unique_on<C, T>(&self, columns: Vec<(C, T)>) -> Result<bool, Error>
1956    where
1957        C: AsRef<str>,
1958        T: IntoSqlValue,
1959    {
1960        let primary_key_name = Self::primary_key_name();
1961        let mut query = Query::default();
1962        let mut fields = Vec::with_capacity(columns.len() + 1);
1963        fields.push(primary_key_name.to_owned());
1964        for (col, value) in columns.into_iter() {
1965            let field = col.as_ref();
1966            fields.push(field.to_owned());
1967            query.add_filter(field, value.into_sql_value());
1968        }
1969        query.set_fields(fields);
1970        query.set_limit(2);
1971
1972        let data = Self::find::<Map>(&query).await?;
1973        match data.len() {
1974            0 => Ok(true),
1975            1 => {
1976                if let Some(value) = data.first().and_then(|m| m.get(primary_key_name)) {
1977                    Ok(&self.primary_key_value() == value)
1978                } else {
1979                    Ok(true)
1980                }
1981            }
1982            _ => Ok(false),
1983        }
1984    }
1985}