1use super::ReturningSelector;
2use crate::{
3 ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, InsertMany,
4 IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, TryFromU64,
5 TryInsert, error::*,
6};
7use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ReturningClause, ValueTuple};
8use std::marker::PhantomData;
9
/// Shorthand for the primary key type of the entity that the active model `A` belongs to.
type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
11
12#[derive(Debug)]
18pub struct Inserter<A>
19where
20 A: ActiveModelTrait,
21{
22 primary_key: Option<ValueTuple>,
23 query: InsertStatement,
24 model: PhantomData<A>,
25}
26
27#[derive(Debug)]
30#[non_exhaustive]
31pub struct InsertResult<A>
32where
33 A: ActiveModelTrait,
34{
35 pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
37}
38
39#[derive(Debug)]
42#[non_exhaustive]
43pub struct InsertManyResult<A>
44where
45 A: ActiveModelTrait,
46{
47 pub last_insert_id: Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>,
49}
50
/// Outcome of a [`TryInsert`] execution, where "nothing was inserted" is a
/// regular result rather than an error.
#[derive(Debug)]
pub enum TryInsertResult<T> {
    /// There was nothing to insert, so no statement was executed.
    Empty,
    /// The statement ran but the underlying insert reported
    /// `DbErr::RecordNotInserted`, i.e. no row was written.
    Conflicted,
    /// The insert succeeded; carries the value produced by the executor.
    Inserted(T),
}
75
76impl<A> TryInsertResult<InsertResult<A>>
77where
78 A: ActiveModelTrait,
79{
80 pub fn last_insert_id(
86 self,
87 ) -> Result<Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr> {
88 match self {
89 Self::Empty => Ok(None),
90 Self::Inserted(v) => Ok(Some(v.last_insert_id)),
91 Self::Conflicted => Err(DbErr::RecordNotInserted),
92 }
93 }
94}
95
96impl<A> TryInsert<A>
97where
98 A: ActiveModelTrait,
99{
100 pub async fn exec<C>(self, db: &C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
102 where
103 C: ConnectionTrait,
104 {
105 if self.empty {
106 return Ok(TryInsertResult::Empty);
107 }
108 let res = self.insert_struct.exec(db).await;
109 match res {
110 Ok(res) => Ok(TryInsertResult::Inserted(res)),
111 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
112 Err(err) => Err(err),
113 }
114 }
115
116 pub async fn exec_without_returning<C>(self, db: &C) -> Result<TryInsertResult<u64>, DbErr>
119 where
120 C: ConnectionTrait,
121 {
122 if self.empty {
123 return Ok(TryInsertResult::Empty);
124 }
125 let res = self.insert_struct.exec_without_returning(db).await;
126 match res {
127 Ok(res) => Ok(TryInsertResult::Inserted(res)),
128 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
129 Err(err) => Err(err),
130 }
131 }
132
133 pub async fn exec_with_returning<C>(
135 self,
136 db: &C,
137 ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
138 where
139 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
140 C: ConnectionTrait,
141 {
142 if self.empty {
143 return Ok(TryInsertResult::Empty);
144 }
145 let res = self.insert_struct.exec_with_returning(db).await;
146 match res {
147 Ok(res) => Ok(TryInsertResult::Inserted(res)),
148 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
149 Err(err) => Err(err),
150 }
151 }
152
153 pub async fn exec_with_returning_keys<C>(
155 self,
156 db: &C,
157 ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
158 where
159 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
160 C: ConnectionTrait,
161 {
162 if self.empty {
163 return Ok(TryInsertResult::Empty);
164 }
165
166 let res = self.insert_struct.exec_with_returning_keys(db).await;
167 match res {
168 Ok(res) => Ok(TryInsertResult::Inserted(res)),
169 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
170 Err(err) => Err(err),
171 }
172 }
173
174 pub async fn exec_with_returning_many<C>(
176 self,
177 db: &C,
178 ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
179 where
180 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
181 C: ConnectionTrait,
182 {
183 if self.empty {
184 return Ok(TryInsertResult::Empty);
185 }
186
187 let res = self.insert_struct.exec_with_returning_many(db).await;
188 match res {
189 Ok(res) => Ok(TryInsertResult::Inserted(res)),
190 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
191 Err(err) => Err(err),
192 }
193 }
194}
195
196impl<A> Insert<A>
197where
198 A: ActiveModelTrait,
199{
200 pub async fn exec<'a, C>(self, db: &'a C) -> Result<InsertResult<A>, DbErr>
202 where
203 C: ConnectionTrait,
204 A: 'a,
205 {
206 let mut query = self.query;
208 if db.support_returning() {
209 query.returning(returning_pk::<A>(db.get_database_backend()));
210 }
211 Inserter::<A>::new(self.primary_key, query).exec(db).await
212 }
213
214 pub async fn exec_without_returning<'a, C>(self, db: &'a C) -> Result<u64, DbErr>
217 where
218 C: ConnectionTrait,
219 A: 'a,
220 {
221 Inserter::<A>::new(self.primary_key, self.query)
222 .exec_without_returning(db)
223 .await
224 }
225
226 pub async fn exec_with_returning<'a, C>(
232 self,
233 db: &'a C,
234 ) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
235 where
236 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
237 C: ConnectionTrait,
238 A: 'a,
239 {
240 Inserter::<A>::new(self.primary_key, self.query)
241 .exec_with_returning(db)
242 .await
243 }
244
245 pub async fn exec_with_returning_keys<'a, C>(
247 self,
248 db: &'a C,
249 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
250 where
251 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
252 C: ConnectionTrait,
253 A: 'a,
254 {
255 Inserter::<A>::new(self.primary_key, self.query)
256 .exec_with_returning_keys(db)
257 .await
258 }
259
260 pub async fn exec_with_returning_many<'a, C>(
262 self,
263 db: &'a C,
264 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
265 where
266 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
267 C: ConnectionTrait,
268 A: 'a,
269 {
270 Inserter::<A>::new(self.primary_key, self.query)
271 .exec_with_returning_many(db)
272 .await
273 }
274}
275
276impl<A> InsertMany<A>
277where
278 A: ActiveModelTrait,
279{
280 pub async fn exec<C>(self, db: &C) -> Result<InsertManyResult<A>, DbErr>
282 where
283 C: ConnectionTrait,
284 {
285 if self.empty {
286 return Ok(InsertManyResult {
287 last_insert_id: None,
288 });
289 }
290 let res = self.into_one().exec(db).await;
291 match res {
292 Ok(r) => Ok(InsertManyResult {
293 last_insert_id: Some(r.last_insert_id),
294 }),
295 Err(err) => Err(err),
296 }
297 }
298
299 pub async fn exec_without_returning<C>(self, db: &C) -> Result<u64, DbErr>
302 where
303 C: ConnectionTrait,
304 {
305 if self.empty {
306 return Ok(0);
307 }
308 self.into_one().exec_without_returning(db).await
309 }
310
311 pub async fn exec_with_returning<C>(
313 self,
314 db: &C,
315 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
316 where
317 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
318 C: ConnectionTrait,
319 {
320 if self.empty {
321 return Ok(Vec::new());
322 }
323
324 self.into_one().exec_with_returning_many(db).await
325 }
326
327 #[deprecated(
329 since = "2.0.0",
330 note = "Please use [`InsertMany::exec_with_returning`]"
331 )]
332 pub async fn exec_with_returning_many<C>(
333 self,
334 db: &C,
335 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
336 where
337 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
338 C: ConnectionTrait,
339 {
340 if self.empty {
341 return Ok(Vec::new());
342 }
343
344 self.into_one().exec_with_returning_many(db).await
345 }
346
347 pub async fn exec_with_returning_keys<C>(
349 self,
350 db: &C,
351 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
352 where
353 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
354 C: ConnectionTrait,
355 {
356 if self.empty {
357 return Ok(Vec::new());
358 }
359
360 self.into_one().exec_with_returning_keys(db).await
361 }
362}
363
364impl<A> Inserter<A>
365where
366 A: ActiveModelTrait,
367{
368 pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
370 Self {
371 primary_key,
372 query,
373 model: PhantomData,
374 }
375 }
376
377 pub async fn exec<'a, C>(self, db: &'a C) -> Result<InsertResult<A>, DbErr>
379 where
380 C: ConnectionTrait,
381 A: 'a,
382 {
383 exec_insert(self.primary_key, self.query, db).await
384 }
385
386 pub async fn exec_without_returning<'a, C>(self, db: &'a C) -> Result<u64, DbErr>
388 where
389 C: ConnectionTrait,
390 A: 'a,
391 {
392 exec_insert_without_returning(self.query, db).await
393 }
394
395 pub async fn exec_with_returning<'a, C>(
397 self,
398 db: &'a C,
399 ) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
400 where
401 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
402 C: ConnectionTrait,
403 A: 'a,
404 {
405 exec_insert_with_returning::<A, _>(self.primary_key, self.query, db).await
406 }
407
408 pub async fn exec_with_returning_keys<'a, C>(
410 self,
411 db: &'a C,
412 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
413 where
414 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
415 C: ConnectionTrait,
416 A: 'a,
417 {
418 exec_insert_with_returning_keys::<A, _>(self.query, db).await
419 }
420
421 pub async fn exec_with_returning_many<'a, C>(
423 self,
424 db: &'a C,
425 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
426 where
427 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
428 C: ConnectionTrait,
429 A: 'a,
430 {
431 exec_insert_with_returning_many::<A, _>(self.query, db).await
432 }
433}
434
435async fn exec_insert<A, C>(
436 primary_key: Option<ValueTuple>,
437 statement: InsertStatement,
438 db: &C,
439) -> Result<InsertResult<A>, DbErr>
440where
441 C: ConnectionTrait,
442 A: ActiveModelTrait,
443{
444 type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
445
446 let db_backend = db.get_database_backend();
447
448 let last_insert_id = match (primary_key, db.support_returning()) {
449 (_, true) => {
450 let mut rows = db.query_all(&statement).await?;
451 let row = match rows.pop() {
452 Some(row) => row,
453 None => return Err(DbErr::RecordNotInserted),
454 };
455 let cols = PrimaryKey::<A>::iter()
456 .map(|col| col.to_string())
457 .collect::<Vec<_>>();
458 row.try_get_many("", cols.as_ref())
459 .map_err(|_| DbErr::UnpackInsertId)?
460 }
461 (Some(value_tuple), false) => {
462 let res = db.execute(&statement).await?;
463 if res.rows_affected() == 0 {
464 return Err(DbErr::RecordNotInserted);
465 }
466 FromValueTuple::from_value_tuple(value_tuple)
467 }
468 (None, false) => {
469 let res = db.execute(&statement).await?;
470 if res.rows_affected() == 0 {
471 return Err(DbErr::RecordNotInserted);
472 }
473 let last_insert_id = res.last_insert_id();
474 if db_backend == DbBackend::MySql && last_insert_id == 0 {
480 return Err(DbErr::RecordNotInserted);
481 }
482 ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
483 }
484 };
485
486 Ok(InsertResult { last_insert_id })
487}
488
489async fn exec_insert_without_returning<C>(
490 insert_statement: InsertStatement,
491 db: &C,
492) -> Result<u64, DbErr>
493where
494 C: ConnectionTrait,
495{
496 let exec_result = db.execute(&insert_statement).await?;
497 Ok(exec_result.rows_affected())
498}
499
500async fn exec_insert_with_returning<A, C>(
501 primary_key: Option<ValueTuple>,
502 mut insert_statement: InsertStatement,
503 db: &C,
504) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
505where
506 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
507 C: ConnectionTrait,
508 A: ActiveModelTrait,
509{
510 let db_backend = db.get_database_backend();
511 let found = match db.support_returning() {
512 true => {
513 let returning = Query::returning().exprs(
514 <A::Entity as EntityTrait>::Column::iter()
515 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
516 );
517 insert_statement.returning(returning);
518 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
519 insert_statement,
520 )
521 .one(db)
522 .await?
523 }
524 false => {
525 let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
526 <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
527 .one(db)
528 .await?
529 }
530 };
531 match found {
532 Some(model) => Ok(model),
533 None => Err(DbErr::RecordNotFound(
534 "Failed to find inserted item".to_owned(),
535 )),
536 }
537}
538
539async fn exec_insert_with_returning_keys<A, C>(
540 mut insert_statement: InsertStatement,
541 db: &C,
542) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
543where
544 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
545 C: ConnectionTrait,
546 A: ActiveModelTrait,
547{
548 let db_backend = db.get_database_backend();
549 match db.support_returning() {
550 true => {
551 insert_statement.returning(returning_pk::<A>(db_backend));
552 let rows = db.query_all(&insert_statement).await?;
553 let cols = PrimaryKey::<A>::iter()
554 .map(|col| col.to_string())
555 .collect::<Vec<_>>();
556 let mut keys = Vec::new();
557 for row in rows {
558 keys.push(
559 row.try_get_many("", cols.as_ref())
560 .map_err(|_| DbErr::UnpackInsertId)?,
561 );
562 }
563 Ok(keys)
564 }
565 false => Err(DbErr::BackendNotSupported {
566 db: db_backend.as_str(),
567 ctx: "INSERT RETURNING",
568 }),
569 }
570}
571
572async fn exec_insert_with_returning_many<A, C>(
573 mut insert_statement: InsertStatement,
574 db: &C,
575) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
576where
577 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
578 C: ConnectionTrait,
579 A: ActiveModelTrait,
580{
581 let db_backend = db.get_database_backend();
582 match db.support_returning() {
583 true => {
584 let returning = Query::returning().exprs(
585 <A::Entity as EntityTrait>::Column::iter()
586 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
587 );
588 insert_statement.returning(returning);
589 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
590 insert_statement,
591 )
592 .all(db)
593 .await
594 }
595 false => Err(DbErr::BackendNotSupported {
596 db: db_backend.as_str(),
597 ctx: "INSERT RETURNING",
598 }),
599 }
600}
601
602fn returning_pk<A>(db_backend: DbBackend) -> ReturningClause
603where
604 A: ActiveModelTrait,
605{
606 Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
607 c.into_column()
608 .select_as(c.into_column().into_returning_expr(db_backend))
609 }))
610}