1use super::ReturningSelector;
2use crate::{
3 ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, InsertMany,
4 IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, TryFromU64,
5 TryInsert, error::*,
6};
7use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ReturningClause, ValueTuple};
8use std::{future::Future, marker::PhantomData};
9
10type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
11
12#[derive(Debug)]
14pub struct Inserter<A>
15where
16 A: ActiveModelTrait,
17{
18 primary_key: Option<ValueTuple>,
19 query: InsertStatement,
20 model: PhantomData<A>,
21}
22
23#[derive(Debug)]
25#[non_exhaustive]
26pub struct InsertResult<A>
27where
28 A: ActiveModelTrait,
29{
30 pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
32}
33
34#[derive(Debug)]
36#[non_exhaustive]
37pub struct InsertManyResult<A>
38where
39 A: ActiveModelTrait,
40{
41 pub last_insert_id: Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>,
43}
44
45#[derive(Debug)]
47pub enum TryInsertResult<T> {
48 Empty,
50 Conflicted,
52 Inserted(T),
54}
55
56impl<A> TryInsertResult<InsertResult<A>>
57where
58 A: ActiveModelTrait,
59{
60 pub fn last_insert_id(
62 self,
63 ) -> Result<Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr> {
64 match self {
65 Self::Empty => Ok(None),
66 Self::Inserted(v) => Ok(Some(v.last_insert_id)),
67 Self::Conflicted => Err(DbErr::RecordNotInserted),
68 }
69 }
70}
71
72impl<A> TryInsert<A>
73where
74 A: ActiveModelTrait,
75{
76 pub async fn exec<C>(self, db: &C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
78 where
79 C: ConnectionTrait,
80 {
81 if self.empty {
82 return Ok(TryInsertResult::Empty);
83 }
84 let res = self.insert_struct.exec(db).await;
85 match res {
86 Ok(res) => Ok(TryInsertResult::Inserted(res)),
87 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
88 Err(err) => Err(err),
89 }
90 }
91
92 pub async fn exec_without_returning<C>(self, db: &C) -> Result<TryInsertResult<u64>, DbErr>
95 where
96 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
97 C: ConnectionTrait,
98 {
99 if self.empty {
100 return Ok(TryInsertResult::Empty);
101 }
102 let res = self.insert_struct.exec_without_returning(db).await;
103 match res {
104 Ok(res) => Ok(TryInsertResult::Inserted(res)),
105 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
106 Err(err) => Err(err),
107 }
108 }
109
110 pub async fn exec_with_returning<C>(
112 self,
113 db: &C,
114 ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
115 where
116 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
117 C: ConnectionTrait,
118 {
119 if self.empty {
120 return Ok(TryInsertResult::Empty);
121 }
122 let res = self.insert_struct.exec_with_returning(db).await;
123 match res {
124 Ok(res) => Ok(TryInsertResult::Inserted(res)),
125 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
126 Err(err) => Err(err),
127 }
128 }
129
130 pub async fn exec_with_returning_keys<C>(
132 self,
133 db: &C,
134 ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
135 where
136 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
137 C: ConnectionTrait,
138 {
139 if self.empty {
140 return Ok(TryInsertResult::Empty);
141 }
142
143 let res = self.insert_struct.exec_with_returning_keys(db).await;
144 match res {
145 Ok(res) => Ok(TryInsertResult::Inserted(res)),
146 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
147 Err(err) => Err(err),
148 }
149 }
150
151 pub async fn exec_with_returning_many<C>(
153 self,
154 db: &C,
155 ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
156 where
157 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
158 C: ConnectionTrait,
159 {
160 if self.empty {
161 return Ok(TryInsertResult::Empty);
162 }
163
164 let res = self.insert_struct.exec_with_returning_many(db).await;
165 match res {
166 Ok(res) => Ok(TryInsertResult::Inserted(res)),
167 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
168 Err(err) => Err(err),
169 }
170 }
171}
172
173impl<A> Insert<A>
174where
175 A: ActiveModelTrait,
176{
177 pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
179 where
180 C: ConnectionTrait,
181 A: 'a,
182 {
183 let mut query = self.query;
185 if db.support_returning() {
186 query.returning(returning_pk::<A>(db.get_database_backend()));
187 }
188 Inserter::<A>::new(self.primary_key, query).exec(db)
189 }
190
191 pub fn exec_without_returning<'a, C>(
194 self,
195 db: &'a C,
196 ) -> impl Future<Output = Result<u64, DbErr>> + 'a
197 where
198 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
199 C: ConnectionTrait,
200 A: 'a,
201 {
202 Inserter::<A>::new(self.primary_key, self.query).exec_without_returning(db)
203 }
204
205 pub fn exec_with_returning<'a, C>(
210 self,
211 db: &'a C,
212 ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
213 where
214 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
215 C: ConnectionTrait,
216 A: 'a,
217 {
218 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning(db)
219 }
220
221 pub fn exec_with_returning_keys<'a, C>(
223 self,
224 db: &'a C,
225 ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
226 where
227 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
228 C: ConnectionTrait,
229 A: 'a,
230 {
231 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_keys(db)
232 }
233
234 pub fn exec_with_returning_many<'a, C>(
236 self,
237 db: &'a C,
238 ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
239 where
240 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
241 C: ConnectionTrait,
242 A: 'a,
243 {
244 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_many(db)
245 }
246}
247
248impl<A> InsertMany<A>
249where
250 A: ActiveModelTrait,
251{
252 pub async fn exec<C>(self, db: &C) -> Result<InsertManyResult<A>, DbErr>
254 where
255 C: ConnectionTrait,
256 {
257 if self.empty {
258 return Ok(InsertManyResult {
259 last_insert_id: None,
260 });
261 }
262 let res = self.into_one().exec(db).await;
263 match res {
264 Ok(r) => Ok(InsertManyResult {
265 last_insert_id: Some(r.last_insert_id),
266 }),
267 Err(err) => Err(err),
268 }
269 }
270
271 pub async fn exec_without_returning<C>(self, db: &C) -> Result<u64, DbErr>
274 where
275 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
276 C: ConnectionTrait,
277 {
278 if self.empty {
279 return Ok(0);
280 }
281 self.into_one().exec_without_returning(db).await
282 }
283
284 pub async fn exec_with_returning<C>(
286 self,
287 db: &C,
288 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
289 where
290 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
291 C: ConnectionTrait,
292 {
293 if self.empty {
294 return Ok(Vec::new());
295 }
296
297 self.into_one().exec_with_returning_many(db).await
298 }
299
300 #[deprecated(
302 since = "1.2.0",
303 note = "Please use [`InsertMany::exec_with_returning`]"
304 )]
305 pub async fn exec_with_returning_many<C>(
306 self,
307 db: &C,
308 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
309 where
310 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
311 C: ConnectionTrait,
312 {
313 if self.empty {
314 return Ok(Vec::new());
315 }
316
317 self.into_one().exec_with_returning_many(db).await
318 }
319
320 pub async fn exec_with_returning_keys<C>(
322 self,
323 db: &C,
324 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
325 where
326 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
327 C: ConnectionTrait,
328 {
329 if self.empty {
330 return Ok(Vec::new());
331 }
332
333 self.into_one().exec_with_returning_keys(db).await
334 }
335}
336
337impl<A> Inserter<A>
338where
339 A: ActiveModelTrait,
340{
341 pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
343 Self {
344 primary_key,
345 query,
346 model: PhantomData,
347 }
348 }
349
350 pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
352 where
353 C: ConnectionTrait,
354 A: 'a,
355 {
356 exec_insert(self.primary_key, self.query, db)
357 }
358
359 pub fn exec_without_returning<'a, C>(
361 self,
362 db: &'a C,
363 ) -> impl Future<Output = Result<u64, DbErr>> + 'a
364 where
365 C: ConnectionTrait,
366 A: 'a,
367 {
368 exec_insert_without_returning(self.query, db)
369 }
370
371 pub fn exec_with_returning<'a, C>(
373 self,
374 db: &'a C,
375 ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
376 where
377 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
378 C: ConnectionTrait,
379 A: 'a,
380 {
381 exec_insert_with_returning::<A, _>(self.primary_key, self.query, db)
382 }
383
384 pub fn exec_with_returning_keys<'a, C>(
386 self,
387 db: &'a C,
388 ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
389 where
390 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
391 C: ConnectionTrait,
392 A: 'a,
393 {
394 exec_insert_with_returning_keys::<A, _>(self.query, db)
395 }
396
397 pub fn exec_with_returning_many<'a, C>(
399 self,
400 db: &'a C,
401 ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
402 where
403 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
404 C: ConnectionTrait,
405 A: 'a,
406 {
407 exec_insert_with_returning_many::<A, _>(self.query, db)
408 }
409}
410
411async fn exec_insert<A, C>(
412 primary_key: Option<ValueTuple>,
413 statement: InsertStatement,
414 db: &C,
415) -> Result<InsertResult<A>, DbErr>
416where
417 C: ConnectionTrait,
418 A: ActiveModelTrait,
419{
420 type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
421
422 let db_backend = db.get_database_backend();
423
424 let last_insert_id = match (primary_key, db.support_returning()) {
425 (Some(value_tuple), _) => {
426 let res = db.execute(&statement).await?;
427 if res.rows_affected() == 0 {
428 return Err(DbErr::RecordNotInserted);
429 }
430 FromValueTuple::from_value_tuple(value_tuple)
431 }
432 (None, true) => {
433 let mut rows = db.query_all(&statement).await?;
434 let row = match rows.pop() {
435 Some(row) => row,
436 None => return Err(DbErr::RecordNotInserted),
437 };
438 let cols = PrimaryKey::<A>::iter()
439 .map(|col| col.to_string())
440 .collect::<Vec<_>>();
441 row.try_get_many("", cols.as_ref())
442 .map_err(|_| DbErr::UnpackInsertId)?
443 }
444 (None, false) => {
445 let res = db.execute(&statement).await?;
446 if res.rows_affected() == 0 {
447 return Err(DbErr::RecordNotInserted);
448 }
449 let last_insert_id = res.last_insert_id();
450 if db_backend == DbBackend::MySql && last_insert_id == 0 {
456 return Err(DbErr::RecordNotInserted);
457 }
458 ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
459 }
460 };
461
462 Ok(InsertResult { last_insert_id })
463}
464
465async fn exec_insert_without_returning<C>(
466 insert_statement: InsertStatement,
467 db: &C,
468) -> Result<u64, DbErr>
469where
470 C: ConnectionTrait,
471{
472 let exec_result = db.execute(&insert_statement).await?;
473 Ok(exec_result.rows_affected())
474}
475
476async fn exec_insert_with_returning<A, C>(
477 primary_key: Option<ValueTuple>,
478 mut insert_statement: InsertStatement,
479 db: &C,
480) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
481where
482 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
483 C: ConnectionTrait,
484 A: ActiveModelTrait,
485{
486 let db_backend = db.get_database_backend();
487 let found = match db.support_returning() {
488 true => {
489 let returning = Query::returning().exprs(
490 <A::Entity as EntityTrait>::Column::iter()
491 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
492 );
493 insert_statement.returning(returning);
494 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
495 insert_statement,
496 )
497 .one(db)
498 .await?
499 }
500 false => {
501 let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
502 <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
503 .one(db)
504 .await?
505 }
506 };
507 match found {
508 Some(model) => Ok(model),
509 None => Err(DbErr::RecordNotFound(
510 "Failed to find inserted item".to_owned(),
511 )),
512 }
513}
514
515async fn exec_insert_with_returning_keys<A, C>(
516 mut insert_statement: InsertStatement,
517 db: &C,
518) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
519where
520 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
521 C: ConnectionTrait,
522 A: ActiveModelTrait,
523{
524 let db_backend = db.get_database_backend();
525 match db.support_returning() {
526 true => {
527 insert_statement.returning(returning_pk::<A>(db_backend));
528 let rows = db.query_all(&insert_statement).await?;
529 let cols = PrimaryKey::<A>::iter()
530 .map(|col| col.to_string())
531 .collect::<Vec<_>>();
532 let mut keys = Vec::new();
533 for row in rows {
534 keys.push(
535 row.try_get_many("", cols.as_ref())
536 .map_err(|_| DbErr::UnpackInsertId)?,
537 );
538 }
539 Ok(keys)
540 }
541 false => Err(DbErr::BackendNotSupported {
542 db: db_backend.as_str(),
543 ctx: "INSERT RETURNING",
544 }),
545 }
546}
547
548async fn exec_insert_with_returning_many<A, C>(
549 mut insert_statement: InsertStatement,
550 db: &C,
551) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
552where
553 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
554 C: ConnectionTrait,
555 A: ActiveModelTrait,
556{
557 let db_backend = db.get_database_backend();
558 match db.support_returning() {
559 true => {
560 let returning = Query::returning().exprs(
561 <A::Entity as EntityTrait>::Column::iter()
562 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
563 );
564 insert_statement.returning(returning);
565 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
566 insert_statement,
567 )
568 .all(db)
569 .await
570 }
571 false => Err(DbErr::BackendNotSupported {
572 db: db_backend.as_str(),
573 ctx: "INSERT RETURNING",
574 }),
575 }
576}
577
578fn returning_pk<A>(db_backend: DbBackend) -> ReturningClause
579where
580 A: ActiveModelTrait,
581{
582 Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
583 c.into_column()
584 .select_as(c.into_column().into_returning_expr(db_backend))
585 }))
586}