sea_orm/executor/
insert.rs

1use super::ReturningSelector;
2use crate::{
3    ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, InsertMany,
4    IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, TryFromU64,
5    TryInsert, error::*,
6};
7use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ReturningClause, ValueTuple};
8use std::{future::Future, marker::PhantomData};
9
// Shorthand for the primary key type of the entity that the ActiveModel `A` belongs to.
type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
11
12/// Defines a structure to perform INSERT operations in an ActiveModel
13#[derive(Debug)]
14pub struct Inserter<A>
15where
16    A: ActiveModelTrait,
17{
18    primary_key: Option<ValueTuple>,
19    query: InsertStatement,
20    model: PhantomData<A>,
21}
22
23/// The result of an INSERT operation on an ActiveModel
24#[derive(Debug)]
25#[non_exhaustive]
26pub struct InsertResult<A>
27where
28    A: ActiveModelTrait,
29{
30    /// The primary key value of the last inserted row
31    pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
32}
33
34/// The result of an INSERT many operation for a set of ActiveModels
35#[derive(Debug)]
36#[non_exhaustive]
37pub struct InsertManyResult<A>
38where
39    A: ActiveModelTrait,
40{
41    /// The primary key value of the last inserted row
42    pub last_insert_id: Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>,
43}
44
/// The possible outcomes of a [`TryInsert`] operation, where a conflict is
/// reported as a value instead of an error
#[derive(Debug)]
pub enum TryInsertResult<T> {
    /// The INSERT statement did not have any value to insert;
    /// returned without executing anything against the database
    Empty,
    /// The INSERT operation did not insert any valid value
    /// (the underlying operation failed with `DbErr::RecordNotInserted`)
    Conflicted,
    /// Successfully inserted; carries the value produced by the underlying operation
    Inserted(T),
}
55
56impl<A> TryInsertResult<InsertResult<A>>
57where
58    A: ActiveModelTrait,
59{
60    /// Empty: `Ok(None)`. Inserted: `Ok(Some(last_insert_id))`. Conflicted: `Err(DbErr::RecordNotInserted)`.
61    pub fn last_insert_id(
62        self,
63    ) -> Result<Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr> {
64        match self {
65            Self::Empty => Ok(None),
66            Self::Inserted(v) => Ok(Some(v.last_insert_id)),
67            Self::Conflicted => Err(DbErr::RecordNotInserted),
68        }
69    }
70}
71
72impl<A> TryInsert<A>
73where
74    A: ActiveModelTrait,
75{
76    /// Execute an insert operation
77    pub async fn exec<C>(self, db: &C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
78    where
79        C: ConnectionTrait,
80    {
81        if self.empty {
82            return Ok(TryInsertResult::Empty);
83        }
84        let res = self.insert_struct.exec(db).await;
85        match res {
86            Ok(res) => Ok(TryInsertResult::Inserted(res)),
87            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
88            Err(err) => Err(err),
89        }
90    }
91
92    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
93    /// Number of rows affected is returned
94    pub async fn exec_without_returning<C>(self, db: &C) -> Result<TryInsertResult<u64>, DbErr>
95    where
96        C: ConnectionTrait,
97    {
98        if self.empty {
99            return Ok(TryInsertResult::Empty);
100        }
101        let res = self.insert_struct.exec_without_returning(db).await;
102        match res {
103            Ok(res) => Ok(TryInsertResult::Inserted(res)),
104            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
105            Err(err) => Err(err),
106        }
107    }
108
109    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
110    pub async fn exec_with_returning<C>(
111        self,
112        db: &C,
113    ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
114    where
115        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
116        C: ConnectionTrait,
117    {
118        if self.empty {
119            return Ok(TryInsertResult::Empty);
120        }
121        let res = self.insert_struct.exec_with_returning(db).await;
122        match res {
123            Ok(res) => Ok(TryInsertResult::Inserted(res)),
124            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
125            Err(err) => Err(err),
126        }
127    }
128
129    /// Execute an insert operation and return primary keys of inserted models
130    pub async fn exec_with_returning_keys<C>(
131        self,
132        db: &C,
133    ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
134    where
135        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
136        C: ConnectionTrait,
137    {
138        if self.empty {
139            return Ok(TryInsertResult::Empty);
140        }
141
142        let res = self.insert_struct.exec_with_returning_keys(db).await;
143        match res {
144            Ok(res) => Ok(TryInsertResult::Inserted(res)),
145            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
146            Err(err) => Err(err),
147        }
148    }
149
150    /// Execute an insert operation and return all inserted models
151    pub async fn exec_with_returning_many<C>(
152        self,
153        db: &C,
154    ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
155    where
156        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
157        C: ConnectionTrait,
158    {
159        if self.empty {
160            return Ok(TryInsertResult::Empty);
161        }
162
163        let res = self.insert_struct.exec_with_returning_many(db).await;
164        match res {
165            Ok(res) => Ok(TryInsertResult::Inserted(res)),
166            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
167            Err(err) => Err(err),
168        }
169    }
170}
171
172impl<A> Insert<A>
173where
174    A: ActiveModelTrait,
175{
176    /// Execute an insert operation
177    pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
178    where
179        C: ConnectionTrait,
180        A: 'a,
181    {
182        // so that self is dropped before entering await
183        let mut query = self.query;
184        if db.support_returning() {
185            query.returning(returning_pk::<A>(db.get_database_backend()));
186        }
187        Inserter::<A>::new(self.primary_key, query).exec(db)
188    }
189
190    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
191    /// Number of rows affected is returned
192    pub fn exec_without_returning<'a, C>(
193        self,
194        db: &'a C,
195    ) -> impl Future<Output = Result<u64, DbErr>> + 'a
196    where
197        C: ConnectionTrait,
198        A: 'a,
199    {
200        Inserter::<A>::new(self.primary_key, self.query).exec_without_returning(db)
201    }
202
203    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
204    ///
205    /// + To get back all inserted models, use [`exec_with_returning_many`].
206    /// + To get back all inserted primary keys, use [`exec_with_returning_keys`].
207    pub fn exec_with_returning<'a, C>(
208        self,
209        db: &'a C,
210    ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
211    where
212        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
213        C: ConnectionTrait,
214        A: 'a,
215    {
216        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning(db)
217    }
218
219    /// Execute an insert operation and return primary keys of inserted models
220    pub fn exec_with_returning_keys<'a, C>(
221        self,
222        db: &'a C,
223    ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
224    where
225        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
226        C: ConnectionTrait,
227        A: 'a,
228    {
229        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_keys(db)
230    }
231
232    /// Execute an insert operation and return all inserted models
233    pub fn exec_with_returning_many<'a, C>(
234        self,
235        db: &'a C,
236    ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
237    where
238        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
239        C: ConnectionTrait,
240        A: 'a,
241    {
242        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_many(db)
243    }
244}
245
246impl<A> InsertMany<A>
247where
248    A: ActiveModelTrait,
249{
250    /// Execute an insert operation
251    pub async fn exec<C>(self, db: &C) -> Result<InsertManyResult<A>, DbErr>
252    where
253        C: ConnectionTrait,
254    {
255        if self.empty {
256            return Ok(InsertManyResult {
257                last_insert_id: None,
258            });
259        }
260        let res = self.into_one().exec(db).await;
261        match res {
262            Ok(r) => Ok(InsertManyResult {
263                last_insert_id: Some(r.last_insert_id),
264            }),
265            Err(err) => Err(err),
266        }
267    }
268
269    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
270    /// Number of rows affected is returned
271    pub async fn exec_without_returning<C>(self, db: &C) -> Result<u64, DbErr>
272    where
273        C: ConnectionTrait,
274    {
275        if self.empty {
276            return Ok(0);
277        }
278        self.into_one().exec_without_returning(db).await
279    }
280
281    /// Execute an insert operation and return all inserted models
282    pub async fn exec_with_returning<C>(
283        self,
284        db: &C,
285    ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
286    where
287        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
288        C: ConnectionTrait,
289    {
290        if self.empty {
291            return Ok(Vec::new());
292        }
293
294        self.into_one().exec_with_returning_many(db).await
295    }
296
297    /// Alias to [`InsertMany::exec_with_returning`].
298    #[deprecated(
299        since = "1.2.0",
300        note = "Please use [`InsertMany::exec_with_returning`]"
301    )]
302    pub async fn exec_with_returning_many<C>(
303        self,
304        db: &C,
305    ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
306    where
307        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
308        C: ConnectionTrait,
309    {
310        if self.empty {
311            return Ok(Vec::new());
312        }
313
314        self.into_one().exec_with_returning_many(db).await
315    }
316
317    /// Execute an insert operation and return primary keys of inserted models
318    pub async fn exec_with_returning_keys<C>(
319        self,
320        db: &C,
321    ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
322    where
323        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
324        C: ConnectionTrait,
325    {
326        if self.empty {
327            return Ok(Vec::new());
328        }
329
330        self.into_one().exec_with_returning_keys(db).await
331    }
332}
333
334impl<A> Inserter<A>
335where
336    A: ActiveModelTrait,
337{
338    /// Instantiate a new insert operation
339    pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
340        Self {
341            primary_key,
342            query,
343            model: PhantomData,
344        }
345    }
346
347    /// Execute an insert operation, returning the last inserted id
348    pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
349    where
350        C: ConnectionTrait,
351        A: 'a,
352    {
353        exec_insert(self.primary_key, self.query, db)
354    }
355
356    /// Execute an insert operation
357    pub fn exec_without_returning<'a, C>(
358        self,
359        db: &'a C,
360    ) -> impl Future<Output = Result<u64, DbErr>> + 'a
361    where
362        C: ConnectionTrait,
363        A: 'a,
364    {
365        exec_insert_without_returning(self.query, db)
366    }
367
368    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
369    pub fn exec_with_returning<'a, C>(
370        self,
371        db: &'a C,
372    ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
373    where
374        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
375        C: ConnectionTrait,
376        A: 'a,
377    {
378        exec_insert_with_returning::<A, _>(self.primary_key, self.query, db)
379    }
380
381    /// Execute an insert operation and return primary keys of inserted models
382    pub fn exec_with_returning_keys<'a, C>(
383        self,
384        db: &'a C,
385    ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
386    where
387        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
388        C: ConnectionTrait,
389        A: 'a,
390    {
391        exec_insert_with_returning_keys::<A, _>(self.query, db)
392    }
393
394    /// Execute an insert operation and return all inserted models
395    pub fn exec_with_returning_many<'a, C>(
396        self,
397        db: &'a C,
398    ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
399    where
400        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
401        C: ConnectionTrait,
402        A: 'a,
403    {
404        exec_insert_with_returning_many::<A, _>(self.query, db)
405    }
406}
407
408async fn exec_insert<A, C>(
409    primary_key: Option<ValueTuple>,
410    statement: InsertStatement,
411    db: &C,
412) -> Result<InsertResult<A>, DbErr>
413where
414    C: ConnectionTrait,
415    A: ActiveModelTrait,
416{
417    type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
418
419    let db_backend = db.get_database_backend();
420
421    let last_insert_id = match (primary_key, db.support_returning()) {
422        (Some(value_tuple), _) => {
423            let res = db.execute(&statement).await?;
424            if res.rows_affected() == 0 {
425                return Err(DbErr::RecordNotInserted);
426            }
427            FromValueTuple::from_value_tuple(value_tuple)
428        }
429        (None, true) => {
430            let mut rows = db.query_all(&statement).await?;
431            let row = match rows.pop() {
432                Some(row) => row,
433                None => return Err(DbErr::RecordNotInserted),
434            };
435            let cols = PrimaryKey::<A>::iter()
436                .map(|col| col.to_string())
437                .collect::<Vec<_>>();
438            row.try_get_many("", cols.as_ref())
439                .map_err(|_| DbErr::UnpackInsertId)?
440        }
441        (None, false) => {
442            let res = db.execute(&statement).await?;
443            if res.rows_affected() == 0 {
444                return Err(DbErr::RecordNotInserted);
445            }
446            let last_insert_id = res.last_insert_id();
447            // For MySQL, the affected-rows number:
448            //   - The affected-rows value per row is `1` if the row is inserted as a new row,
449            //   - `2` if an existing row is updated,
450            //   - and `0` if an existing row is set to its current values.
451            // Reference: https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html
452            if db_backend == DbBackend::MySql && last_insert_id == 0 {
453                return Err(DbErr::RecordNotInserted);
454            }
455            ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
456        }
457    };
458
459    Ok(InsertResult { last_insert_id })
460}
461
462async fn exec_insert_without_returning<C>(
463    insert_statement: InsertStatement,
464    db: &C,
465) -> Result<u64, DbErr>
466where
467    C: ConnectionTrait,
468{
469    let exec_result = db.execute(&insert_statement).await?;
470    Ok(exec_result.rows_affected())
471}
472
473async fn exec_insert_with_returning<A, C>(
474    primary_key: Option<ValueTuple>,
475    mut insert_statement: InsertStatement,
476    db: &C,
477) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
478where
479    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
480    C: ConnectionTrait,
481    A: ActiveModelTrait,
482{
483    let db_backend = db.get_database_backend();
484    let found = match db.support_returning() {
485        true => {
486            let returning = Query::returning().exprs(
487                <A::Entity as EntityTrait>::Column::iter()
488                    .map(|c| c.select_as(c.into_returning_expr(db_backend))),
489            );
490            insert_statement.returning(returning);
491            ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
492                insert_statement,
493            )
494            .one(db)
495            .await?
496        }
497        false => {
498            let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
499            <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
500                .one(db)
501                .await?
502        }
503    };
504    match found {
505        Some(model) => Ok(model),
506        None => Err(DbErr::RecordNotFound(
507            "Failed to find inserted item".to_owned(),
508        )),
509    }
510}
511
512async fn exec_insert_with_returning_keys<A, C>(
513    mut insert_statement: InsertStatement,
514    db: &C,
515) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
516where
517    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
518    C: ConnectionTrait,
519    A: ActiveModelTrait,
520{
521    let db_backend = db.get_database_backend();
522    match db.support_returning() {
523        true => {
524            insert_statement.returning(returning_pk::<A>(db_backend));
525            let rows = db.query_all(&insert_statement).await?;
526            let cols = PrimaryKey::<A>::iter()
527                .map(|col| col.to_string())
528                .collect::<Vec<_>>();
529            let mut keys = Vec::new();
530            for row in rows {
531                keys.push(
532                    row.try_get_many("", cols.as_ref())
533                        .map_err(|_| DbErr::UnpackInsertId)?,
534                );
535            }
536            Ok(keys)
537        }
538        false => Err(DbErr::BackendNotSupported {
539            db: db_backend.as_str(),
540            ctx: "INSERT RETURNING",
541        }),
542    }
543}
544
545async fn exec_insert_with_returning_many<A, C>(
546    mut insert_statement: InsertStatement,
547    db: &C,
548) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
549where
550    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
551    C: ConnectionTrait,
552    A: ActiveModelTrait,
553{
554    let db_backend = db.get_database_backend();
555    match db.support_returning() {
556        true => {
557            let returning = Query::returning().exprs(
558                <A::Entity as EntityTrait>::Column::iter()
559                    .map(|c| c.select_as(c.into_returning_expr(db_backend))),
560            );
561            insert_statement.returning(returning);
562            ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
563                insert_statement,
564            )
565            .all(db)
566            .await
567        }
568        false => Err(DbErr::BackendNotSupported {
569            db: db_backend.as_str(),
570            ctx: "INSERT RETURNING",
571        }),
572    }
573}
574
575fn returning_pk<A>(db_backend: DbBackend) -> ReturningClause
576where
577    A: ActiveModelTrait,
578{
579    Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
580        c.into_column()
581            .select_as(c.into_column().into_returning_expr(db_backend))
582    }))
583}