sea_orm/executor/
insert.rs

1use super::ReturningSelector;
2use crate::{
3    ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, InsertMany,
4    IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, TryFromU64,
5    TryInsert, error::*,
6};
7use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ReturningClause, ValueTuple};
8use std::marker::PhantomData;
9
10type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
11
12/// Defines a structure to perform INSERT operations in an ActiveModel
13#[derive(Debug)]
14pub struct Inserter<A>
15where
16    A: ActiveModelTrait,
17{
18    primary_key: Option<ValueTuple>,
19    query: InsertStatement,
20    model: PhantomData<A>,
21}
22
23/// The result of an INSERT operation on an ActiveModel
24#[derive(Debug)]
25#[non_exhaustive]
26pub struct InsertResult<A>
27where
28    A: ActiveModelTrait,
29{
30    /// The primary key value of the last inserted row
31    pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
32}
33
34/// The result of an INSERT many operation for a set of ActiveModels
35#[derive(Debug)]
36#[non_exhaustive]
37pub struct InsertManyResult<A>
38where
39    A: ActiveModelTrait,
40{
41    /// The primary key value of the last inserted row
42    pub last_insert_id: Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>,
43}
44
45/// The types of results for an INSERT operation
46#[derive(Debug)]
47pub enum TryInsertResult<T> {
48    /// The INSERT statement did not have any value to insert
49    Empty,
50    /// The INSERT operation did not insert any valid value
51    Conflicted,
52    /// Successfully inserted
53    Inserted(T),
54}
55
56impl<A> TryInsertResult<InsertResult<A>>
57where
58    A: ActiveModelTrait,
59{
60    /// Empty: `Ok(None)`. Inserted: `Ok(Some(last_insert_id))`. Conflicted: `Err(DbErr::RecordNotInserted)`.
61    pub fn last_insert_id(
62        self,
63    ) -> Result<Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr> {
64        match self {
65            Self::Empty => Ok(None),
66            Self::Inserted(v) => Ok(Some(v.last_insert_id)),
67            Self::Conflicted => Err(DbErr::RecordNotInserted),
68        }
69    }
70}
71
72impl<A> TryInsert<A>
73where
74    A: ActiveModelTrait,
75{
76    /// Execute an insert operation
77    pub async fn exec<C>(self, db: &C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
78    where
79        C: ConnectionTrait,
80    {
81        if self.empty {
82            return Ok(TryInsertResult::Empty);
83        }
84        let res = self.insert_struct.exec(db).await;
85        match res {
86            Ok(res) => Ok(TryInsertResult::Inserted(res)),
87            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
88            Err(err) => Err(err),
89        }
90    }
91
92    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
93    /// Number of rows affected is returned
94    pub async fn exec_without_returning<C>(self, db: &C) -> Result<TryInsertResult<u64>, DbErr>
95    where
96        C: ConnectionTrait,
97    {
98        if self.empty {
99            return Ok(TryInsertResult::Empty);
100        }
101        let res = self.insert_struct.exec_without_returning(db).await;
102        match res {
103            Ok(res) => Ok(TryInsertResult::Inserted(res)),
104            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
105            Err(err) => Err(err),
106        }
107    }
108
109    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
110    pub async fn exec_with_returning<C>(
111        self,
112        db: &C,
113    ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
114    where
115        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
116        C: ConnectionTrait,
117    {
118        if self.empty {
119            return Ok(TryInsertResult::Empty);
120        }
121        let res = self.insert_struct.exec_with_returning(db).await;
122        match res {
123            Ok(res) => Ok(TryInsertResult::Inserted(res)),
124            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
125            Err(err) => Err(err),
126        }
127    }
128
129    /// Execute an insert operation and return primary keys of inserted models
130    pub async fn exec_with_returning_keys<C>(
131        self,
132        db: &C,
133    ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
134    where
135        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
136        C: ConnectionTrait,
137    {
138        if self.empty {
139            return Ok(TryInsertResult::Empty);
140        }
141
142        let res = self.insert_struct.exec_with_returning_keys(db).await;
143        match res {
144            Ok(res) => Ok(TryInsertResult::Inserted(res)),
145            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
146            Err(err) => Err(err),
147        }
148    }
149
150    /// Execute an insert operation and return all inserted models
151    pub async fn exec_with_returning_many<C>(
152        self,
153        db: &C,
154    ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
155    where
156        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
157        C: ConnectionTrait,
158    {
159        if self.empty {
160            return Ok(TryInsertResult::Empty);
161        }
162
163        let res = self.insert_struct.exec_with_returning_many(db).await;
164        match res {
165            Ok(res) => Ok(TryInsertResult::Inserted(res)),
166            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
167            Err(err) => Err(err),
168        }
169    }
170}
171
172impl<A> Insert<A>
173where
174    A: ActiveModelTrait,
175{
176    /// Execute an insert operation
177    pub async fn exec<'a, C>(self, db: &'a C) -> Result<InsertResult<A>, DbErr>
178    where
179        C: ConnectionTrait,
180        A: 'a,
181    {
182        // so that self is dropped before entering await
183        let mut query = self.query;
184        if db.support_returning() {
185            query.returning(returning_pk::<A>(db.get_database_backend()));
186        }
187        Inserter::<A>::new(self.primary_key, query).exec(db).await
188    }
189
190    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
191    /// Number of rows affected is returned
192    pub async fn exec_without_returning<'a, C>(self, db: &'a C) -> Result<u64, DbErr>
193    where
194        C: ConnectionTrait,
195        A: 'a,
196    {
197        Inserter::<A>::new(self.primary_key, self.query)
198            .exec_without_returning(db)
199            .await
200    }
201
202    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
203    ///
204    /// + To get back all inserted models, use [`exec_with_returning_many`].
205    /// + To get back all inserted primary keys, use [`exec_with_returning_keys`].
206    pub async fn exec_with_returning<'a, C>(
207        self,
208        db: &'a C,
209    ) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
210    where
211        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
212        C: ConnectionTrait,
213        A: 'a,
214    {
215        Inserter::<A>::new(self.primary_key, self.query)
216            .exec_with_returning(db)
217            .await
218    }
219
220    /// Execute an insert operation and return primary keys of inserted models
221    pub async fn exec_with_returning_keys<'a, C>(
222        self,
223        db: &'a C,
224    ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
225    where
226        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
227        C: ConnectionTrait,
228        A: 'a,
229    {
230        Inserter::<A>::new(self.primary_key, self.query)
231            .exec_with_returning_keys(db)
232            .await
233    }
234
235    /// Execute an insert operation and return all inserted models
236    pub async fn exec_with_returning_many<'a, C>(
237        self,
238        db: &'a C,
239    ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
240    where
241        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
242        C: ConnectionTrait,
243        A: 'a,
244    {
245        Inserter::<A>::new(self.primary_key, self.query)
246            .exec_with_returning_many(db)
247            .await
248    }
249}
250
251impl<A> InsertMany<A>
252where
253    A: ActiveModelTrait,
254{
255    /// Execute an insert operation
256    pub async fn exec<C>(self, db: &C) -> Result<InsertManyResult<A>, DbErr>
257    where
258        C: ConnectionTrait,
259    {
260        if self.empty {
261            return Ok(InsertManyResult {
262                last_insert_id: None,
263            });
264        }
265        let res = self.into_one().exec(db).await;
266        match res {
267            Ok(r) => Ok(InsertManyResult {
268                last_insert_id: Some(r.last_insert_id),
269            }),
270            Err(err) => Err(err),
271        }
272    }
273
274    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
275    /// Number of rows affected is returned
276    pub async fn exec_without_returning<C>(self, db: &C) -> Result<u64, DbErr>
277    where
278        C: ConnectionTrait,
279    {
280        if self.empty {
281            return Ok(0);
282        }
283        self.into_one().exec_without_returning(db).await
284    }
285
286    /// Execute an insert operation and return all inserted models
287    pub async fn exec_with_returning<C>(
288        self,
289        db: &C,
290    ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
291    where
292        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
293        C: ConnectionTrait,
294    {
295        if self.empty {
296            return Ok(Vec::new());
297        }
298
299        self.into_one().exec_with_returning_many(db).await
300    }
301
302    /// Alias to [`InsertMany::exec_with_returning`].
303    #[deprecated(
304        since = "1.2.0",
305        note = "Please use [`InsertMany::exec_with_returning`]"
306    )]
307    pub async fn exec_with_returning_many<C>(
308        self,
309        db: &C,
310    ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
311    where
312        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
313        C: ConnectionTrait,
314    {
315        if self.empty {
316            return Ok(Vec::new());
317        }
318
319        self.into_one().exec_with_returning_many(db).await
320    }
321
322    /// Execute an insert operation and return primary keys of inserted models
323    pub async fn exec_with_returning_keys<C>(
324        self,
325        db: &C,
326    ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
327    where
328        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
329        C: ConnectionTrait,
330    {
331        if self.empty {
332            return Ok(Vec::new());
333        }
334
335        self.into_one().exec_with_returning_keys(db).await
336    }
337}
338
339impl<A> Inserter<A>
340where
341    A: ActiveModelTrait,
342{
343    /// Instantiate a new insert operation
344    pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
345        Self {
346            primary_key,
347            query,
348            model: PhantomData,
349        }
350    }
351
352    /// Execute an insert operation, returning the last inserted id
353    pub async fn exec<'a, C>(self, db: &'a C) -> Result<InsertResult<A>, DbErr>
354    where
355        C: ConnectionTrait,
356        A: 'a,
357    {
358        exec_insert(self.primary_key, self.query, db).await
359    }
360
361    /// Execute an insert operation
362    pub async fn exec_without_returning<'a, C>(self, db: &'a C) -> Result<u64, DbErr>
363    where
364        C: ConnectionTrait,
365        A: 'a,
366    {
367        exec_insert_without_returning(self.query, db).await
368    }
369
370    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
371    pub async fn exec_with_returning<'a, C>(
372        self,
373        db: &'a C,
374    ) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
375    where
376        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
377        C: ConnectionTrait,
378        A: 'a,
379    {
380        exec_insert_with_returning::<A, _>(self.primary_key, self.query, db).await
381    }
382
383    /// Execute an insert operation and return primary keys of inserted models
384    pub async fn exec_with_returning_keys<'a, C>(
385        self,
386        db: &'a C,
387    ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
388    where
389        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
390        C: ConnectionTrait,
391        A: 'a,
392    {
393        exec_insert_with_returning_keys::<A, _>(self.query, db).await
394    }
395
396    /// Execute an insert operation and return all inserted models
397    pub async fn exec_with_returning_many<'a, C>(
398        self,
399        db: &'a C,
400    ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
401    where
402        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
403        C: ConnectionTrait,
404        A: 'a,
405    {
406        exec_insert_with_returning_many::<A, _>(self.query, db).await
407    }
408}
409
410async fn exec_insert<A, C>(
411    primary_key: Option<ValueTuple>,
412    statement: InsertStatement,
413    db: &C,
414) -> Result<InsertResult<A>, DbErr>
415where
416    C: ConnectionTrait,
417    A: ActiveModelTrait,
418{
419    type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
420
421    let db_backend = db.get_database_backend();
422
423    let last_insert_id = match (primary_key, db.support_returning()) {
424        (_, true) => {
425            let mut rows = db.query_all(&statement).await?;
426            let row = match rows.pop() {
427                Some(row) => row,
428                None => return Err(DbErr::RecordNotInserted),
429            };
430            let cols = PrimaryKey::<A>::iter()
431                .map(|col| col.to_string())
432                .collect::<Vec<_>>();
433            row.try_get_many("", cols.as_ref())
434                .map_err(|_| DbErr::UnpackInsertId)?
435        }
436        (Some(value_tuple), false) => {
437            let res = db.execute(&statement).await?;
438            if res.rows_affected() == 0 {
439                return Err(DbErr::RecordNotInserted);
440            }
441            FromValueTuple::from_value_tuple(value_tuple)
442        }
443        (None, false) => {
444            let res = db.execute(&statement).await?;
445            if res.rows_affected() == 0 {
446                return Err(DbErr::RecordNotInserted);
447            }
448            let last_insert_id = res.last_insert_id();
449            // For MySQL, the affected-rows number:
450            //   - The affected-rows value per row is `1` if the row is inserted as a new row,
451            //   - `2` if an existing row is updated,
452            //   - and `0` if an existing row is set to its current values.
453            // Reference: https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html
454            if db_backend == DbBackend::MySql && last_insert_id == 0 {
455                return Err(DbErr::RecordNotInserted);
456            }
457            ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
458        }
459    };
460
461    Ok(InsertResult { last_insert_id })
462}
463
464async fn exec_insert_without_returning<C>(
465    insert_statement: InsertStatement,
466    db: &C,
467) -> Result<u64, DbErr>
468where
469    C: ConnectionTrait,
470{
471    let exec_result = db.execute(&insert_statement).await?;
472    Ok(exec_result.rows_affected())
473}
474
475async fn exec_insert_with_returning<A, C>(
476    primary_key: Option<ValueTuple>,
477    mut insert_statement: InsertStatement,
478    db: &C,
479) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
480where
481    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
482    C: ConnectionTrait,
483    A: ActiveModelTrait,
484{
485    let db_backend = db.get_database_backend();
486    let found = match db.support_returning() {
487        true => {
488            let returning = Query::returning().exprs(
489                <A::Entity as EntityTrait>::Column::iter()
490                    .map(|c| c.select_as(c.into_returning_expr(db_backend))),
491            );
492            insert_statement.returning(returning);
493            ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
494                insert_statement,
495            )
496            .one(db)
497            .await?
498        }
499        false => {
500            let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
501            <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
502                .one(db)
503                .await?
504        }
505    };
506    match found {
507        Some(model) => Ok(model),
508        None => Err(DbErr::RecordNotFound(
509            "Failed to find inserted item".to_owned(),
510        )),
511    }
512}
513
514async fn exec_insert_with_returning_keys<A, C>(
515    mut insert_statement: InsertStatement,
516    db: &C,
517) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
518where
519    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
520    C: ConnectionTrait,
521    A: ActiveModelTrait,
522{
523    let db_backend = db.get_database_backend();
524    match db.support_returning() {
525        true => {
526            insert_statement.returning(returning_pk::<A>(db_backend));
527            let rows = db.query_all(&insert_statement).await?;
528            let cols = PrimaryKey::<A>::iter()
529                .map(|col| col.to_string())
530                .collect::<Vec<_>>();
531            let mut keys = Vec::new();
532            for row in rows {
533                keys.push(
534                    row.try_get_many("", cols.as_ref())
535                        .map_err(|_| DbErr::UnpackInsertId)?,
536                );
537            }
538            Ok(keys)
539        }
540        false => Err(DbErr::BackendNotSupported {
541            db: db_backend.as_str(),
542            ctx: "INSERT RETURNING",
543        }),
544    }
545}
546
547async fn exec_insert_with_returning_many<A, C>(
548    mut insert_statement: InsertStatement,
549    db: &C,
550) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
551where
552    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
553    C: ConnectionTrait,
554    A: ActiveModelTrait,
555{
556    let db_backend = db.get_database_backend();
557    match db.support_returning() {
558        true => {
559            let returning = Query::returning().exprs(
560                <A::Entity as EntityTrait>::Column::iter()
561                    .map(|c| c.select_as(c.into_returning_expr(db_backend))),
562            );
563            insert_statement.returning(returning);
564            ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
565                insert_statement,
566            )
567            .all(db)
568            .await
569        }
570        false => Err(DbErr::BackendNotSupported {
571            db: db_backend.as_str(),
572            ctx: "INSERT RETURNING",
573        }),
574    }
575}
576
577fn returning_pk<A>(db_backend: DbBackend) -> ReturningClause
578where
579    A: ActiveModelTrait,
580{
581    Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
582        c.into_column()
583            .select_as(c.into_column().into_returning_expr(db_backend))
584    }))
585}