sea_orm/executor/
insert.rs

1use super::ReturningSelector;
2use crate::{
3    ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, InsertMany,
4    IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, TryFromU64,
5    TryInsert, error::*,
6};
7use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ReturningClause, ValueTuple};
8use std::{future::Future, marker::PhantomData};
9
10type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
11
12/// Defines a structure to perform INSERT operations in an ActiveModel
13#[derive(Debug)]
14pub struct Inserter<A>
15where
16    A: ActiveModelTrait,
17{
18    primary_key: Option<ValueTuple>,
19    query: InsertStatement,
20    model: PhantomData<A>,
21}
22
23/// The result of an INSERT operation on an ActiveModel
24#[derive(Debug)]
25#[non_exhaustive]
26pub struct InsertResult<A>
27where
28    A: ActiveModelTrait,
29{
30    /// The primary key value of the last inserted row
31    pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
32}
33
34/// The result of an INSERT many operation for a set of ActiveModels
35#[derive(Debug)]
36#[non_exhaustive]
37pub struct InsertManyResult<A>
38where
39    A: ActiveModelTrait,
40{
41    /// The primary key value of the last inserted row
42    pub last_insert_id: Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>,
43}
44
45/// The types of results for an INSERT operation
46#[derive(Debug)]
47pub enum TryInsertResult<T> {
48    /// The INSERT statement did not have any value to insert
49    Empty,
50    /// The INSERT operation did not insert any valid value
51    Conflicted,
52    /// Successfully inserted
53    Inserted(T),
54}
55
56impl<A> TryInsertResult<InsertResult<A>>
57where
58    A: ActiveModelTrait,
59{
60    /// Empty: `Ok(None)`. Inserted: `Ok(Some(last_insert_id))`. Conflicted: `Err(DbErr::RecordNotInserted)`.
61    pub fn last_insert_id(
62        self,
63    ) -> Result<Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr> {
64        match self {
65            Self::Empty => Ok(None),
66            Self::Inserted(v) => Ok(Some(v.last_insert_id)),
67            Self::Conflicted => Err(DbErr::RecordNotInserted),
68        }
69    }
70}
71
72impl<A> TryInsert<A>
73where
74    A: ActiveModelTrait,
75{
76    /// Execute an insert operation
77    pub async fn exec<C>(self, db: &C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
78    where
79        C: ConnectionTrait,
80    {
81        if self.empty {
82            return Ok(TryInsertResult::Empty);
83        }
84        let res = self.insert_struct.exec(db).await;
85        match res {
86            Ok(res) => Ok(TryInsertResult::Inserted(res)),
87            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
88            Err(err) => Err(err),
89        }
90    }
91
92    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
93    /// Number of rows affected is returned
94    pub async fn exec_without_returning<C>(self, db: &C) -> Result<TryInsertResult<u64>, DbErr>
95    where
96        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
97        C: ConnectionTrait,
98    {
99        if self.empty {
100            return Ok(TryInsertResult::Empty);
101        }
102        let res = self.insert_struct.exec_without_returning(db).await;
103        match res {
104            Ok(res) => Ok(TryInsertResult::Inserted(res)),
105            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
106            Err(err) => Err(err),
107        }
108    }
109
110    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
111    pub async fn exec_with_returning<C>(
112        self,
113        db: &C,
114    ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
115    where
116        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
117        C: ConnectionTrait,
118    {
119        if self.empty {
120            return Ok(TryInsertResult::Empty);
121        }
122        let res = self.insert_struct.exec_with_returning(db).await;
123        match res {
124            Ok(res) => Ok(TryInsertResult::Inserted(res)),
125            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
126            Err(err) => Err(err),
127        }
128    }
129
130    /// Execute an insert operation and return primary keys of inserted models
131    pub async fn exec_with_returning_keys<C>(
132        self,
133        db: &C,
134    ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
135    where
136        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
137        C: ConnectionTrait,
138    {
139        if self.empty {
140            return Ok(TryInsertResult::Empty);
141        }
142
143        let res = self.insert_struct.exec_with_returning_keys(db).await;
144        match res {
145            Ok(res) => Ok(TryInsertResult::Inserted(res)),
146            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
147            Err(err) => Err(err),
148        }
149    }
150
151    /// Execute an insert operation and return all inserted models
152    pub async fn exec_with_returning_many<C>(
153        self,
154        db: &C,
155    ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
156    where
157        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
158        C: ConnectionTrait,
159    {
160        if self.empty {
161            return Ok(TryInsertResult::Empty);
162        }
163
164        let res = self.insert_struct.exec_with_returning_many(db).await;
165        match res {
166            Ok(res) => Ok(TryInsertResult::Inserted(res)),
167            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
168            Err(err) => Err(err),
169        }
170    }
171}
172
173impl<A> Insert<A>
174where
175    A: ActiveModelTrait,
176{
177    /// Execute an insert operation
178    pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
179    where
180        C: ConnectionTrait,
181        A: 'a,
182    {
183        // so that self is dropped before entering await
184        let mut query = self.query;
185        if db.support_returning() {
186            query.returning(returning_pk::<A>(db.get_database_backend()));
187        }
188        Inserter::<A>::new(self.primary_key, query).exec(db)
189    }
190
191    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
192    /// Number of rows affected is returned
193    pub fn exec_without_returning<'a, C>(
194        self,
195        db: &'a C,
196    ) -> impl Future<Output = Result<u64, DbErr>> + 'a
197    where
198        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
199        C: ConnectionTrait,
200        A: 'a,
201    {
202        Inserter::<A>::new(self.primary_key, self.query).exec_without_returning(db)
203    }
204
205    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
206    ///
207    /// + To get back all inserted models, use [`exec_with_returning_many`].
208    /// + To get back all inserted primary keys, use [`exec_with_returning_keys`].
209    pub fn exec_with_returning<'a, C>(
210        self,
211        db: &'a C,
212    ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
213    where
214        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
215        C: ConnectionTrait,
216        A: 'a,
217    {
218        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning(db)
219    }
220
221    /// Execute an insert operation and return primary keys of inserted models
222    pub fn exec_with_returning_keys<'a, C>(
223        self,
224        db: &'a C,
225    ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
226    where
227        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
228        C: ConnectionTrait,
229        A: 'a,
230    {
231        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_keys(db)
232    }
233
234    /// Execute an insert operation and return all inserted models
235    pub fn exec_with_returning_many<'a, C>(
236        self,
237        db: &'a C,
238    ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
239    where
240        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
241        C: ConnectionTrait,
242        A: 'a,
243    {
244        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_many(db)
245    }
246}
247
248impl<A> InsertMany<A>
249where
250    A: ActiveModelTrait,
251{
252    /// Execute an insert operation
253    pub async fn exec<C>(self, db: &C) -> Result<InsertManyResult<A>, DbErr>
254    where
255        C: ConnectionTrait,
256    {
257        if self.empty {
258            return Ok(InsertManyResult {
259                last_insert_id: None,
260            });
261        }
262        let res = self.into_one().exec(db).await;
263        match res {
264            Ok(r) => Ok(InsertManyResult {
265                last_insert_id: Some(r.last_insert_id),
266            }),
267            Err(err) => Err(err),
268        }
269    }
270
271    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
272    /// Number of rows affected is returned
273    pub async fn exec_without_returning<C>(self, db: &C) -> Result<u64, DbErr>
274    where
275        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
276        C: ConnectionTrait,
277    {
278        if self.empty {
279            return Ok(0);
280        }
281        self.into_one().exec_without_returning(db).await
282    }
283
284    /// Execute an insert operation and return all inserted models
285    pub async fn exec_with_returning<C>(
286        self,
287        db: &C,
288    ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
289    where
290        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
291        C: ConnectionTrait,
292    {
293        if self.empty {
294            return Ok(Vec::new());
295        }
296
297        self.into_one().exec_with_returning_many(db).await
298    }
299
300    /// Alias to [`InsertMany::exec_with_returning`].
301    #[deprecated(
302        since = "1.2.0",
303        note = "Please use [`InsertMany::exec_with_returning`]"
304    )]
305    pub async fn exec_with_returning_many<C>(
306        self,
307        db: &C,
308    ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
309    where
310        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
311        C: ConnectionTrait,
312    {
313        if self.empty {
314            return Ok(Vec::new());
315        }
316
317        self.into_one().exec_with_returning_many(db).await
318    }
319
320    /// Execute an insert operation and return primary keys of inserted models
321    pub async fn exec_with_returning_keys<C>(
322        self,
323        db: &C,
324    ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
325    where
326        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
327        C: ConnectionTrait,
328    {
329        if self.empty {
330            return Ok(Vec::new());
331        }
332
333        self.into_one().exec_with_returning_keys(db).await
334    }
335}
336
337impl<A> Inserter<A>
338where
339    A: ActiveModelTrait,
340{
341    /// Instantiate a new insert operation
342    pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
343        Self {
344            primary_key,
345            query,
346            model: PhantomData,
347        }
348    }
349
350    /// Execute an insert operation, returning the last inserted id
351    pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
352    where
353        C: ConnectionTrait,
354        A: 'a,
355    {
356        exec_insert(self.primary_key, self.query, db)
357    }
358
359    /// Execute an insert operation
360    pub fn exec_without_returning<'a, C>(
361        self,
362        db: &'a C,
363    ) -> impl Future<Output = Result<u64, DbErr>> + 'a
364    where
365        C: ConnectionTrait,
366        A: 'a,
367    {
368        exec_insert_without_returning(self.query, db)
369    }
370
371    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
372    pub fn exec_with_returning<'a, C>(
373        self,
374        db: &'a C,
375    ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
376    where
377        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
378        C: ConnectionTrait,
379        A: 'a,
380    {
381        exec_insert_with_returning::<A, _>(self.primary_key, self.query, db)
382    }
383
384    /// Execute an insert operation and return primary keys of inserted models
385    pub fn exec_with_returning_keys<'a, C>(
386        self,
387        db: &'a C,
388    ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
389    where
390        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
391        C: ConnectionTrait,
392        A: 'a,
393    {
394        exec_insert_with_returning_keys::<A, _>(self.query, db)
395    }
396
397    /// Execute an insert operation and return all inserted models
398    pub fn exec_with_returning_many<'a, C>(
399        self,
400        db: &'a C,
401    ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
402    where
403        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
404        C: ConnectionTrait,
405        A: 'a,
406    {
407        exec_insert_with_returning_many::<A, _>(self.query, db)
408    }
409}
410
411async fn exec_insert<A, C>(
412    primary_key: Option<ValueTuple>,
413    statement: InsertStatement,
414    db: &C,
415) -> Result<InsertResult<A>, DbErr>
416where
417    C: ConnectionTrait,
418    A: ActiveModelTrait,
419{
420    type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
421
422    let db_backend = db.get_database_backend();
423
424    let last_insert_id = match (primary_key, db.support_returning()) {
425        (Some(value_tuple), _) => {
426            let res = db.execute(&statement).await?;
427            if res.rows_affected() == 0 {
428                return Err(DbErr::RecordNotInserted);
429            }
430            FromValueTuple::from_value_tuple(value_tuple)
431        }
432        (None, true) => {
433            let mut rows = db.query_all(&statement).await?;
434            let row = match rows.pop() {
435                Some(row) => row,
436                None => return Err(DbErr::RecordNotInserted),
437            };
438            let cols = PrimaryKey::<A>::iter()
439                .map(|col| col.to_string())
440                .collect::<Vec<_>>();
441            row.try_get_many("", cols.as_ref())
442                .map_err(|_| DbErr::UnpackInsertId)?
443        }
444        (None, false) => {
445            let res = db.execute(&statement).await?;
446            if res.rows_affected() == 0 {
447                return Err(DbErr::RecordNotInserted);
448            }
449            let last_insert_id = res.last_insert_id();
450            // For MySQL, the affected-rows number:
451            //   - The affected-rows value per row is `1` if the row is inserted as a new row,
452            //   - `2` if an existing row is updated,
453            //   - and `0` if an existing row is set to its current values.
454            // Reference: https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html
455            if db_backend == DbBackend::MySql && last_insert_id == 0 {
456                return Err(DbErr::RecordNotInserted);
457            }
458            ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
459        }
460    };
461
462    Ok(InsertResult { last_insert_id })
463}
464
465async fn exec_insert_without_returning<C>(
466    insert_statement: InsertStatement,
467    db: &C,
468) -> Result<u64, DbErr>
469where
470    C: ConnectionTrait,
471{
472    let exec_result = db.execute(&insert_statement).await?;
473    Ok(exec_result.rows_affected())
474}
475
476async fn exec_insert_with_returning<A, C>(
477    primary_key: Option<ValueTuple>,
478    mut insert_statement: InsertStatement,
479    db: &C,
480) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
481where
482    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
483    C: ConnectionTrait,
484    A: ActiveModelTrait,
485{
486    let db_backend = db.get_database_backend();
487    let found = match db.support_returning() {
488        true => {
489            let returning = Query::returning().exprs(
490                <A::Entity as EntityTrait>::Column::iter()
491                    .map(|c| c.select_as(c.into_returning_expr(db_backend))),
492            );
493            insert_statement.returning(returning);
494            ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
495                insert_statement,
496            )
497            .one(db)
498            .await?
499        }
500        false => {
501            let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
502            <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
503                .one(db)
504                .await?
505        }
506    };
507    match found {
508        Some(model) => Ok(model),
509        None => Err(DbErr::RecordNotFound(
510            "Failed to find inserted item".to_owned(),
511        )),
512    }
513}
514
515async fn exec_insert_with_returning_keys<A, C>(
516    mut insert_statement: InsertStatement,
517    db: &C,
518) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
519where
520    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
521    C: ConnectionTrait,
522    A: ActiveModelTrait,
523{
524    let db_backend = db.get_database_backend();
525    match db.support_returning() {
526        true => {
527            insert_statement.returning(returning_pk::<A>(db_backend));
528            let rows = db.query_all(&insert_statement).await?;
529            let cols = PrimaryKey::<A>::iter()
530                .map(|col| col.to_string())
531                .collect::<Vec<_>>();
532            let mut keys = Vec::new();
533            for row in rows {
534                keys.push(
535                    row.try_get_many("", cols.as_ref())
536                        .map_err(|_| DbErr::UnpackInsertId)?,
537                );
538            }
539            Ok(keys)
540        }
541        false => Err(DbErr::BackendNotSupported {
542            db: db_backend.as_str(),
543            ctx: "INSERT RETURNING",
544        }),
545    }
546}
547
548async fn exec_insert_with_returning_many<A, C>(
549    mut insert_statement: InsertStatement,
550    db: &C,
551) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
552where
553    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
554    C: ConnectionTrait,
555    A: ActiveModelTrait,
556{
557    let db_backend = db.get_database_backend();
558    match db.support_returning() {
559        true => {
560            let returning = Query::returning().exprs(
561                <A::Entity as EntityTrait>::Column::iter()
562                    .map(|c| c.select_as(c.into_returning_expr(db_backend))),
563            );
564            insert_statement.returning(returning);
565            ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
566                insert_statement,
567            )
568            .all(db)
569            .await
570        }
571        false => Err(DbErr::BackendNotSupported {
572            db: db_backend.as_str(),
573            ctx: "INSERT RETURNING",
574        }),
575    }
576}
577
578fn returning_pk<A>(db_backend: DbBackend) -> ReturningClause
579where
580    A: ActiveModelTrait,
581{
582    Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
583        c.into_column()
584            .select_as(c.into_column().into_returning_expr(db_backend))
585    }))
586}