sea_orm/executor/
insert.rs

1use crate::{
2    error::*, ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert,
3    IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, SelectorRaw,
4    TryFromU64, TryInsert,
5};
6use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ValueTuple};
7use std::{future::Future, marker::PhantomData};
8
9type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
10
11/// Defines a structure to perform INSERT operations in an ActiveModel
12#[derive(Debug)]
13pub struct Inserter<A>
14where
15    A: ActiveModelTrait,
16{
17    primary_key: Option<ValueTuple>,
18    query: InsertStatement,
19    model: PhantomData<A>,
20}
21
22/// The result of an INSERT operation on an ActiveModel
23#[derive(Debug)]
24pub struct InsertResult<A>
25where
26    A: ActiveModelTrait,
27{
28    /// The id performed when AUTOINCREMENT was performed on the PrimaryKey
29    pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
30}
31
32/// The types of results for an INSERT operation
33#[derive(Debug)]
34pub enum TryInsertResult<T> {
35    /// The INSERT statement did not have any value to insert
36    Empty,
37    /// The INSERT operation did not insert any valid value
38    Conflicted,
39    /// Successfully inserted
40    Inserted(T),
41}
42
43impl<A> TryInsert<A>
44where
45    A: ActiveModelTrait,
46{
47    /// Execute an insert operation
48    #[allow(unused_mut)]
49    pub async fn exec<'a, C>(self, db: &'a C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
50    where
51        C: ConnectionTrait,
52        A: 'a,
53    {
54        if self.insert_struct.columns.is_empty() {
55            return Ok(TryInsertResult::Empty);
56        }
57        let res = self.insert_struct.exec(db).await;
58        match res {
59            Ok(res) => Ok(TryInsertResult::Inserted(res)),
60            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
61            Err(err) => Err(err),
62        }
63    }
64
65    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
66    /// Number of rows affected is returned
67    pub async fn exec_without_returning<'a, C>(
68        self,
69        db: &'a C,
70    ) -> Result<TryInsertResult<u64>, DbErr>
71    where
72        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
73        C: ConnectionTrait,
74        A: 'a,
75    {
76        if self.insert_struct.columns.is_empty() {
77            return Ok(TryInsertResult::Empty);
78        }
79        let res = self.insert_struct.exec_without_returning(db).await;
80        match res {
81            Ok(res) => Ok(TryInsertResult::Inserted(res)),
82            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
83            Err(err) => Err(err),
84        }
85    }
86
87    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
88    pub async fn exec_with_returning<'a, C>(
89        self,
90        db: &'a C,
91    ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
92    where
93        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
94        C: ConnectionTrait,
95        A: 'a,
96    {
97        if self.insert_struct.columns.is_empty() {
98            return Ok(TryInsertResult::Empty);
99        }
100        let res = self.insert_struct.exec_with_returning(db).await;
101        match res {
102            Ok(res) => Ok(TryInsertResult::Inserted(res)),
103            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
104            Err(err) => Err(err),
105        }
106    }
107
108    /// Execute an insert operation and return primary keys of inserted models
109    ///
110    /// # Panics
111    ///
112    /// Panics if the database backend does not support `INSERT RETURNING`.
113    pub async fn exec_with_returning_keys<'a, C>(
114        self,
115        db: &'a C,
116    ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
117    where
118        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
119        C: ConnectionTrait,
120        A: 'a,
121    {
122        if self.insert_struct.columns.is_empty() {
123            return Ok(TryInsertResult::Empty);
124        }
125
126        let res = self.insert_struct.exec_with_returning_keys(db).await;
127        match res {
128            Ok(res) => Ok(TryInsertResult::Inserted(res)),
129            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
130            Err(err) => Err(err),
131        }
132    }
133
134    /// Execute an insert operation and return all inserted models
135    ///
136    /// # Panics
137    ///
138    /// Panics if the database backend does not support `INSERT RETURNING`.
139    pub async fn exec_with_returning_many<'a, C>(
140        self,
141        db: &'a C,
142    ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
143    where
144        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
145        C: ConnectionTrait,
146        A: 'a,
147    {
148        if self.insert_struct.columns.is_empty() {
149            return Ok(TryInsertResult::Empty);
150        }
151
152        let res = self.insert_struct.exec_with_returning_many(db).await;
153        match res {
154            Ok(res) => Ok(TryInsertResult::Inserted(res)),
155            Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
156            Err(err) => Err(err),
157        }
158    }
159}
160
161impl<A> Insert<A>
162where
163    A: ActiveModelTrait,
164{
165    /// Execute an insert operation
166    #[allow(unused_mut)]
167    pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
168    where
169        C: ConnectionTrait,
170        A: 'a,
171    {
172        // so that self is dropped before entering await
173        let mut query = self.query;
174        if db.support_returning() {
175            let db_backend = db.get_database_backend();
176            let returning =
177                Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
178                    c.into_column()
179                        .select_as(c.into_column().into_returning_expr(db_backend))
180                }));
181            query.returning(returning);
182        }
183        Inserter::<A>::new(self.primary_key, query).exec(db)
184    }
185
186    /// Execute an insert operation without returning (don't use `RETURNING` syntax)
187    /// Number of rows affected is returned
188    pub fn exec_without_returning<'a, C>(
189        self,
190        db: &'a C,
191    ) -> impl Future<Output = Result<u64, DbErr>> + 'a
192    where
193        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
194        C: ConnectionTrait,
195        A: 'a,
196    {
197        Inserter::<A>::new(self.primary_key, self.query).exec_without_returning(db)
198    }
199
200    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
201    ///
202    /// + To get back all inserted models, use [`exec_with_returning_many`].
203    /// + To get back all inserted primary keys, use [`exec_with_returning_keys`].
204    pub fn exec_with_returning<'a, C>(
205        self,
206        db: &'a C,
207    ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
208    where
209        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
210        C: ConnectionTrait,
211        A: 'a,
212    {
213        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning(db)
214    }
215
216    /// Execute an insert operation and return primary keys of inserted models
217    ///
218    /// # Panics
219    ///
220    /// Panics if the database backend does not support `INSERT RETURNING`.
221    pub fn exec_with_returning_keys<'a, C>(
222        self,
223        db: &'a C,
224    ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
225    where
226        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
227        C: ConnectionTrait,
228        A: 'a,
229    {
230        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_keys(db)
231    }
232
233    /// Execute an insert operation and return all inserted models
234    ///
235    /// # Panics
236    ///
237    /// Panics if the database backend does not support `INSERT RETURNING`.
238    pub fn exec_with_returning_many<'a, C>(
239        self,
240        db: &'a C,
241    ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
242    where
243        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
244        C: ConnectionTrait,
245        A: 'a,
246    {
247        Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_many(db)
248    }
249}
250
251impl<A> Inserter<A>
252where
253    A: ActiveModelTrait,
254{
255    /// Instantiate a new insert operation
256    pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
257        Self {
258            primary_key,
259            query,
260            model: PhantomData,
261        }
262    }
263
264    /// Execute an insert operation, returning the last inserted id
265    pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
266    where
267        C: ConnectionTrait,
268        A: 'a,
269    {
270        exec_insert(self.primary_key, self.query, db)
271    }
272
273    /// Execute an insert operation
274    pub fn exec_without_returning<'a, C>(
275        self,
276        db: &'a C,
277    ) -> impl Future<Output = Result<u64, DbErr>> + 'a
278    where
279        C: ConnectionTrait,
280        A: 'a,
281    {
282        exec_insert_without_returning(self.query, db)
283    }
284
285    /// Execute an insert operation and return the inserted model (use `RETURNING` syntax if supported)
286    pub fn exec_with_returning<'a, C>(
287        self,
288        db: &'a C,
289    ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
290    where
291        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
292        C: ConnectionTrait,
293        A: 'a,
294    {
295        exec_insert_with_returning::<A, _>(self.primary_key, self.query, db)
296    }
297
298    /// Execute an insert operation and return primary keys of inserted models
299    ///
300    /// # Panics
301    ///
302    /// Panics if the database backend does not support `INSERT RETURNING`.
303    pub fn exec_with_returning_keys<'a, C>(
304        self,
305        db: &'a C,
306    ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
307    where
308        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
309        C: ConnectionTrait,
310        A: 'a,
311    {
312        exec_insert_with_returning_keys::<A, _>(self.query, db)
313    }
314
315    /// Execute an insert operation and return all inserted models
316    ///
317    /// # Panics
318    ///
319    /// Panics if the database backend does not support `INSERT RETURNING`.
320    pub fn exec_with_returning_many<'a, C>(
321        self,
322        db: &'a C,
323    ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
324    where
325        <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
326        C: ConnectionTrait,
327        A: 'a,
328    {
329        exec_insert_with_returning_many::<A, _>(self.query, db)
330    }
331}
332
333async fn exec_insert<A, C>(
334    primary_key: Option<ValueTuple>,
335    statement: InsertStatement,
336    db: &C,
337) -> Result<InsertResult<A>, DbErr>
338where
339    C: ConnectionTrait,
340    A: ActiveModelTrait,
341{
342    type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
343
344    let db_backend = db.get_database_backend();
345    let statement = db_backend.build(&statement);
346
347    let last_insert_id = match (primary_key, db.support_returning()) {
348        (Some(value_tuple), _) => {
349            let res = db.execute(statement).await?;
350            if res.rows_affected() == 0 {
351                return Err(DbErr::RecordNotInserted);
352            }
353            FromValueTuple::from_value_tuple(value_tuple)
354        }
355        (None, true) => {
356            let mut rows = db.query_all(statement).await?;
357            let row = match rows.pop() {
358                Some(row) => row,
359                None => return Err(DbErr::RecordNotInserted),
360            };
361            let cols = PrimaryKey::<A>::iter()
362                .map(|col| col.to_string())
363                .collect::<Vec<_>>();
364            row.try_get_many("", cols.as_ref())
365                .map_err(|_| DbErr::UnpackInsertId)?
366        }
367        (None, false) => {
368            let res = db.execute(statement).await?;
369            if res.rows_affected() == 0 {
370                return Err(DbErr::RecordNotInserted);
371            }
372            let last_insert_id = res.last_insert_id();
373            // For MySQL, the affected-rows number:
374            //   - The affected-rows value per row is `1` if the row is inserted as a new row,
375            //   - `2` if an existing row is updated,
376            //   - and `0` if an existing row is set to its current values.
377            // Reference: https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html
378            if db_backend == DbBackend::MySql && last_insert_id == 0 {
379                return Err(DbErr::RecordNotInserted);
380            }
381            ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
382        }
383    };
384
385    Ok(InsertResult { last_insert_id })
386}
387
388async fn exec_insert_without_returning<C>(
389    insert_statement: InsertStatement,
390    db: &C,
391) -> Result<u64, DbErr>
392where
393    C: ConnectionTrait,
394{
395    let db_backend = db.get_database_backend();
396    let insert_statement = db_backend.build(&insert_statement);
397    let exec_result = db.execute(insert_statement).await?;
398    Ok(exec_result.rows_affected())
399}
400
401async fn exec_insert_with_returning<A, C>(
402    primary_key: Option<ValueTuple>,
403    mut insert_statement: InsertStatement,
404    db: &C,
405) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
406where
407    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
408    C: ConnectionTrait,
409    A: ActiveModelTrait,
410{
411    let db_backend = db.get_database_backend();
412    let found = match db.support_returning() {
413        true => {
414            let returning = Query::returning().exprs(
415                <A::Entity as EntityTrait>::Column::iter()
416                    .map(|c| c.select_as(c.into_returning_expr(db_backend))),
417            );
418            insert_statement.returning(returning);
419            let insert_statement = db_backend.build(&insert_statement);
420            SelectorRaw::<SelectModel<<A::Entity as EntityTrait>::Model>>::from_statement(
421                insert_statement,
422            )
423            .one(db)
424            .await?
425        }
426        false => {
427            let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
428            <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
429                .one(db)
430                .await?
431        }
432    };
433    match found {
434        Some(model) => Ok(model),
435        None => Err(DbErr::RecordNotFound(
436            "Failed to find inserted item".to_owned(),
437        )),
438    }
439}
440
441async fn exec_insert_with_returning_keys<A, C>(
442    mut insert_statement: InsertStatement,
443    db: &C,
444) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
445where
446    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
447    C: ConnectionTrait,
448    A: ActiveModelTrait,
449{
450    let db_backend = db.get_database_backend();
451    match db.support_returning() {
452        true => {
453            let returning =
454                Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
455                    c.into_column()
456                        .select_as(c.into_column().into_returning_expr(db_backend))
457                }));
458            insert_statement.returning(returning);
459            let statement = db_backend.build(&insert_statement);
460            let rows = db.query_all(statement).await?;
461            let cols = PrimaryKey::<A>::iter()
462                .map(|col| col.to_string())
463                .collect::<Vec<_>>();
464            let mut keys = Vec::new();
465            for row in rows {
466                keys.push(
467                    row.try_get_many("", cols.as_ref())
468                        .map_err(|_| DbErr::UnpackInsertId)?,
469                );
470            }
471            Ok(keys)
472        }
473        false => unimplemented!("Database backend doesn't support RETURNING"),
474    }
475}
476
477async fn exec_insert_with_returning_many<A, C>(
478    mut insert_statement: InsertStatement,
479    db: &C,
480) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
481where
482    <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
483    C: ConnectionTrait,
484    A: ActiveModelTrait,
485{
486    let db_backend = db.get_database_backend();
487    match db.support_returning() {
488        true => {
489            let returning = Query::returning().exprs(
490                <A::Entity as EntityTrait>::Column::iter()
491                    .map(|c| c.select_as(c.into_returning_expr(db_backend))),
492            );
493            insert_statement.returning(returning);
494            let insert_statement = db_backend.build(&insert_statement);
495            SelectorRaw::<SelectModel<<A::Entity as EntityTrait>::Model>>::from_statement(
496                insert_statement,
497            )
498            .all(db)
499            .await
500        }
501        false => unimplemented!("Database backend doesn't support RETURNING"),
502    }
503}