1use super::ReturningSelector;
2use crate::{
3 ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, InsertMany,
4 IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, TryFromU64,
5 TryInsert, error::*,
6};
7use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ReturningClause, ValueTuple};
8use std::marker::PhantomData;
9
10type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
11
12#[derive(Debug)]
14pub struct Inserter<A>
15where
16 A: ActiveModelTrait,
17{
18 primary_key: Option<ValueTuple>,
19 query: InsertStatement,
20 model: PhantomData<A>,
21}
22
23#[derive(Debug)]
25#[non_exhaustive]
26pub struct InsertResult<A>
27where
28 A: ActiveModelTrait,
29{
30 pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
32}
33
34#[derive(Debug)]
36#[non_exhaustive]
37pub struct InsertManyResult<A>
38where
39 A: ActiveModelTrait,
40{
41 pub last_insert_id: Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>,
43}
44
45#[derive(Debug)]
50pub enum TryInsertResult<T> {
51 Empty,
55 Conflicted,
66 Inserted(T),
68}
69
70impl<A> TryInsertResult<InsertResult<A>>
71where
72 A: ActiveModelTrait,
73{
74 pub fn last_insert_id(
80 self,
81 ) -> Result<Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr> {
82 match self {
83 Self::Empty => Ok(None),
84 Self::Inserted(v) => Ok(Some(v.last_insert_id)),
85 Self::Conflicted => Err(DbErr::RecordNotInserted),
86 }
87 }
88}
89
90impl<A> TryInsert<A>
91where
92 A: ActiveModelTrait,
93{
94 pub async fn exec<C>(self, db: &C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
96 where
97 C: ConnectionTrait,
98 {
99 if self.empty {
100 return Ok(TryInsertResult::Empty);
101 }
102 let res = self.insert_struct.exec(db).await;
103 match res {
104 Ok(res) => Ok(TryInsertResult::Inserted(res)),
105 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
106 Err(err) => Err(err),
107 }
108 }
109
110 pub async fn exec_without_returning<C>(self, db: &C) -> Result<TryInsertResult<u64>, DbErr>
113 where
114 C: ConnectionTrait,
115 {
116 if self.empty {
117 return Ok(TryInsertResult::Empty);
118 }
119 let res = self.insert_struct.exec_without_returning(db).await;
120 match res {
121 Ok(res) => Ok(TryInsertResult::Inserted(res)),
122 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
123 Err(err) => Err(err),
124 }
125 }
126
127 pub async fn exec_with_returning<C>(
129 self,
130 db: &C,
131 ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
132 where
133 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
134 C: ConnectionTrait,
135 {
136 if self.empty {
137 return Ok(TryInsertResult::Empty);
138 }
139 let res = self.insert_struct.exec_with_returning(db).await;
140 match res {
141 Ok(res) => Ok(TryInsertResult::Inserted(res)),
142 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
143 Err(err) => Err(err),
144 }
145 }
146
147 pub async fn exec_with_returning_keys<C>(
149 self,
150 db: &C,
151 ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
152 where
153 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
154 C: ConnectionTrait,
155 {
156 if self.empty {
157 return Ok(TryInsertResult::Empty);
158 }
159
160 let res = self.insert_struct.exec_with_returning_keys(db).await;
161 match res {
162 Ok(res) => Ok(TryInsertResult::Inserted(res)),
163 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
164 Err(err) => Err(err),
165 }
166 }
167
168 pub async fn exec_with_returning_many<C>(
170 self,
171 db: &C,
172 ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
173 where
174 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
175 C: ConnectionTrait,
176 {
177 if self.empty {
178 return Ok(TryInsertResult::Empty);
179 }
180
181 let res = self.insert_struct.exec_with_returning_many(db).await;
182 match res {
183 Ok(res) => Ok(TryInsertResult::Inserted(res)),
184 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
185 Err(err) => Err(err),
186 }
187 }
188}
189
190impl<A> Insert<A>
191where
192 A: ActiveModelTrait,
193{
194 pub async fn exec<'a, C>(self, db: &'a C) -> Result<InsertResult<A>, DbErr>
196 where
197 C: ConnectionTrait,
198 A: 'a,
199 {
200 let mut query = self.query;
202 if db.support_returning() {
203 query.returning(returning_pk::<A>(db.get_database_backend()));
204 }
205 Inserter::<A>::new(self.primary_key, query).exec(db).await
206 }
207
208 pub async fn exec_without_returning<'a, C>(self, db: &'a C) -> Result<u64, DbErr>
211 where
212 C: ConnectionTrait,
213 A: 'a,
214 {
215 Inserter::<A>::new(self.primary_key, self.query)
216 .exec_without_returning(db)
217 .await
218 }
219
220 pub async fn exec_with_returning<'a, C>(
225 self,
226 db: &'a C,
227 ) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
228 where
229 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
230 C: ConnectionTrait,
231 A: 'a,
232 {
233 Inserter::<A>::new(self.primary_key, self.query)
234 .exec_with_returning(db)
235 .await
236 }
237
238 pub async fn exec_with_returning_keys<'a, C>(
240 self,
241 db: &'a C,
242 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
243 where
244 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
245 C: ConnectionTrait,
246 A: 'a,
247 {
248 Inserter::<A>::new(self.primary_key, self.query)
249 .exec_with_returning_keys(db)
250 .await
251 }
252
253 pub async fn exec_with_returning_many<'a, C>(
255 self,
256 db: &'a C,
257 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
258 where
259 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
260 C: ConnectionTrait,
261 A: 'a,
262 {
263 Inserter::<A>::new(self.primary_key, self.query)
264 .exec_with_returning_many(db)
265 .await
266 }
267}
268
269impl<A> InsertMany<A>
270where
271 A: ActiveModelTrait,
272{
273 pub async fn exec<C>(self, db: &C) -> Result<InsertManyResult<A>, DbErr>
275 where
276 C: ConnectionTrait,
277 {
278 if self.empty {
279 return Ok(InsertManyResult {
280 last_insert_id: None,
281 });
282 }
283 let res = self.into_one().exec(db).await;
284 match res {
285 Ok(r) => Ok(InsertManyResult {
286 last_insert_id: Some(r.last_insert_id),
287 }),
288 Err(err) => Err(err),
289 }
290 }
291
292 pub async fn exec_without_returning<C>(self, db: &C) -> Result<u64, DbErr>
295 where
296 C: ConnectionTrait,
297 {
298 if self.empty {
299 return Ok(0);
300 }
301 self.into_one().exec_without_returning(db).await
302 }
303
304 pub async fn exec_with_returning<C>(
306 self,
307 db: &C,
308 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
309 where
310 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
311 C: ConnectionTrait,
312 {
313 if self.empty {
314 return Ok(Vec::new());
315 }
316
317 self.into_one().exec_with_returning_many(db).await
318 }
319
320 #[deprecated(
322 since = "2.0.0",
323 note = "Please use [`InsertMany::exec_with_returning`]"
324 )]
325 pub async fn exec_with_returning_many<C>(
326 self,
327 db: &C,
328 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
329 where
330 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
331 C: ConnectionTrait,
332 {
333 if self.empty {
334 return Ok(Vec::new());
335 }
336
337 self.into_one().exec_with_returning_many(db).await
338 }
339
340 pub async fn exec_with_returning_keys<C>(
342 self,
343 db: &C,
344 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
345 where
346 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
347 C: ConnectionTrait,
348 {
349 if self.empty {
350 return Ok(Vec::new());
351 }
352
353 self.into_one().exec_with_returning_keys(db).await
354 }
355}
356
357impl<A> Inserter<A>
358where
359 A: ActiveModelTrait,
360{
361 pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
363 Self {
364 primary_key,
365 query,
366 model: PhantomData,
367 }
368 }
369
370 pub async fn exec<'a, C>(self, db: &'a C) -> Result<InsertResult<A>, DbErr>
372 where
373 C: ConnectionTrait,
374 A: 'a,
375 {
376 exec_insert(self.primary_key, self.query, db).await
377 }
378
379 pub async fn exec_without_returning<'a, C>(self, db: &'a C) -> Result<u64, DbErr>
381 where
382 C: ConnectionTrait,
383 A: 'a,
384 {
385 exec_insert_without_returning(self.query, db).await
386 }
387
388 pub async fn exec_with_returning<'a, C>(
390 self,
391 db: &'a C,
392 ) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
393 where
394 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
395 C: ConnectionTrait,
396 A: 'a,
397 {
398 exec_insert_with_returning::<A, _>(self.primary_key, self.query, db).await
399 }
400
401 pub async fn exec_with_returning_keys<'a, C>(
403 self,
404 db: &'a C,
405 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
406 where
407 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
408 C: ConnectionTrait,
409 A: 'a,
410 {
411 exec_insert_with_returning_keys::<A, _>(self.query, db).await
412 }
413
414 pub async fn exec_with_returning_many<'a, C>(
416 self,
417 db: &'a C,
418 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
419 where
420 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
421 C: ConnectionTrait,
422 A: 'a,
423 {
424 exec_insert_with_returning_many::<A, _>(self.query, db).await
425 }
426}
427
428async fn exec_insert<A, C>(
429 primary_key: Option<ValueTuple>,
430 statement: InsertStatement,
431 db: &C,
432) -> Result<InsertResult<A>, DbErr>
433where
434 C: ConnectionTrait,
435 A: ActiveModelTrait,
436{
437 type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
438
439 let db_backend = db.get_database_backend();
440
441 let last_insert_id = match (primary_key, db.support_returning()) {
442 (_, true) => {
443 let mut rows = db.query_all(&statement).await?;
444 let row = match rows.pop() {
445 Some(row) => row,
446 None => return Err(DbErr::RecordNotInserted),
447 };
448 let cols = PrimaryKey::<A>::iter()
449 .map(|col| col.to_string())
450 .collect::<Vec<_>>();
451 row.try_get_many("", cols.as_ref())
452 .map_err(|_| DbErr::UnpackInsertId)?
453 }
454 (Some(value_tuple), false) => {
455 let res = db.execute(&statement).await?;
456 if res.rows_affected() == 0 {
457 return Err(DbErr::RecordNotInserted);
458 }
459 FromValueTuple::from_value_tuple(value_tuple)
460 }
461 (None, false) => {
462 let res = db.execute(&statement).await?;
463 if res.rows_affected() == 0 {
464 return Err(DbErr::RecordNotInserted);
465 }
466 let last_insert_id = res.last_insert_id();
467 if db_backend == DbBackend::MySql && last_insert_id == 0 {
473 return Err(DbErr::RecordNotInserted);
474 }
475 ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
476 }
477 };
478
479 Ok(InsertResult { last_insert_id })
480}
481
482async fn exec_insert_without_returning<C>(
483 insert_statement: InsertStatement,
484 db: &C,
485) -> Result<u64, DbErr>
486where
487 C: ConnectionTrait,
488{
489 let exec_result = db.execute(&insert_statement).await?;
490 Ok(exec_result.rows_affected())
491}
492
493async fn exec_insert_with_returning<A, C>(
494 primary_key: Option<ValueTuple>,
495 mut insert_statement: InsertStatement,
496 db: &C,
497) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
498where
499 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
500 C: ConnectionTrait,
501 A: ActiveModelTrait,
502{
503 let db_backend = db.get_database_backend();
504 let found = match db.support_returning() {
505 true => {
506 let returning = Query::returning().exprs(
507 <A::Entity as EntityTrait>::Column::iter()
508 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
509 );
510 insert_statement.returning(returning);
511 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
512 insert_statement,
513 )
514 .one(db)
515 .await?
516 }
517 false => {
518 let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
519 <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
520 .one(db)
521 .await?
522 }
523 };
524 match found {
525 Some(model) => Ok(model),
526 None => Err(DbErr::RecordNotFound(
527 "Failed to find inserted item".to_owned(),
528 )),
529 }
530}
531
532async fn exec_insert_with_returning_keys<A, C>(
533 mut insert_statement: InsertStatement,
534 db: &C,
535) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
536where
537 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
538 C: ConnectionTrait,
539 A: ActiveModelTrait,
540{
541 let db_backend = db.get_database_backend();
542 match db.support_returning() {
543 true => {
544 insert_statement.returning(returning_pk::<A>(db_backend));
545 let rows = db.query_all(&insert_statement).await?;
546 let cols = PrimaryKey::<A>::iter()
547 .map(|col| col.to_string())
548 .collect::<Vec<_>>();
549 let mut keys = Vec::new();
550 for row in rows {
551 keys.push(
552 row.try_get_many("", cols.as_ref())
553 .map_err(|_| DbErr::UnpackInsertId)?,
554 );
555 }
556 Ok(keys)
557 }
558 false => Err(DbErr::BackendNotSupported {
559 db: db_backend.as_str(),
560 ctx: "INSERT RETURNING",
561 }),
562 }
563}
564
565async fn exec_insert_with_returning_many<A, C>(
566 mut insert_statement: InsertStatement,
567 db: &C,
568) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
569where
570 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
571 C: ConnectionTrait,
572 A: ActiveModelTrait,
573{
574 let db_backend = db.get_database_backend();
575 match db.support_returning() {
576 true => {
577 let returning = Query::returning().exprs(
578 <A::Entity as EntityTrait>::Column::iter()
579 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
580 );
581 insert_statement.returning(returning);
582 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
583 insert_statement,
584 )
585 .all(db)
586 .await
587 }
588 false => Err(DbErr::BackendNotSupported {
589 db: db_backend.as_str(),
590 ctx: "INSERT RETURNING",
591 }),
592 }
593}
594
595fn returning_pk<A>(db_backend: DbBackend) -> ReturningClause
596where
597 A: ActiveModelTrait,
598{
599 Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
600 c.into_column()
601 .select_as(c.into_column().into_returning_expr(db_backend))
602 }))
603}