1use super::ReturningSelector;
2use crate::{
3 ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, InsertMany,
4 IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, TryFromU64,
5 TryInsert, error::*,
6};
7use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ReturningClause, ValueTuple};
8use std::{future::Future, marker::PhantomData};
9
10type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
11
12#[derive(Debug)]
14pub struct Inserter<A>
15where
16 A: ActiveModelTrait,
17{
18 primary_key: Option<ValueTuple>,
19 query: InsertStatement,
20 model: PhantomData<A>,
21}
22
23#[derive(Debug)]
25#[non_exhaustive]
26pub struct InsertResult<A>
27where
28 A: ActiveModelTrait,
29{
30 pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
32}
33
34#[derive(Debug)]
36#[non_exhaustive]
37pub struct InsertManyResult<A>
38where
39 A: ActiveModelTrait,
40{
41 pub last_insert_id: Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>,
43}
44
45#[derive(Debug)]
47pub enum TryInsertResult<T> {
48 Empty,
50 Conflicted,
52 Inserted(T),
54}
55
56impl<A> TryInsertResult<InsertResult<A>>
57where
58 A: ActiveModelTrait,
59{
60 pub fn last_insert_id(
62 self,
63 ) -> Result<Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr> {
64 match self {
65 Self::Empty => Ok(None),
66 Self::Inserted(v) => Ok(Some(v.last_insert_id)),
67 Self::Conflicted => Err(DbErr::RecordNotInserted),
68 }
69 }
70}
71
72impl<A> TryInsert<A>
73where
74 A: ActiveModelTrait,
75{
76 pub async fn exec<C>(self, db: &C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
78 where
79 C: ConnectionTrait,
80 {
81 if self.empty {
82 return Ok(TryInsertResult::Empty);
83 }
84 let res = self.insert_struct.exec(db).await;
85 match res {
86 Ok(res) => Ok(TryInsertResult::Inserted(res)),
87 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
88 Err(err) => Err(err),
89 }
90 }
91
92 pub async fn exec_without_returning<C>(self, db: &C) -> Result<TryInsertResult<u64>, DbErr>
95 where
96 C: ConnectionTrait,
97 {
98 if self.empty {
99 return Ok(TryInsertResult::Empty);
100 }
101 let res = self.insert_struct.exec_without_returning(db).await;
102 match res {
103 Ok(res) => Ok(TryInsertResult::Inserted(res)),
104 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
105 Err(err) => Err(err),
106 }
107 }
108
109 pub async fn exec_with_returning<C>(
111 self,
112 db: &C,
113 ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
114 where
115 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
116 C: ConnectionTrait,
117 {
118 if self.empty {
119 return Ok(TryInsertResult::Empty);
120 }
121 let res = self.insert_struct.exec_with_returning(db).await;
122 match res {
123 Ok(res) => Ok(TryInsertResult::Inserted(res)),
124 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
125 Err(err) => Err(err),
126 }
127 }
128
129 pub async fn exec_with_returning_keys<C>(
131 self,
132 db: &C,
133 ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
134 where
135 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
136 C: ConnectionTrait,
137 {
138 if self.empty {
139 return Ok(TryInsertResult::Empty);
140 }
141
142 let res = self.insert_struct.exec_with_returning_keys(db).await;
143 match res {
144 Ok(res) => Ok(TryInsertResult::Inserted(res)),
145 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
146 Err(err) => Err(err),
147 }
148 }
149
150 pub async fn exec_with_returning_many<C>(
152 self,
153 db: &C,
154 ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
155 where
156 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
157 C: ConnectionTrait,
158 {
159 if self.empty {
160 return Ok(TryInsertResult::Empty);
161 }
162
163 let res = self.insert_struct.exec_with_returning_many(db).await;
164 match res {
165 Ok(res) => Ok(TryInsertResult::Inserted(res)),
166 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
167 Err(err) => Err(err),
168 }
169 }
170}
171
172impl<A> Insert<A>
173where
174 A: ActiveModelTrait,
175{
176 pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
178 where
179 C: ConnectionTrait,
180 A: 'a,
181 {
182 let mut query = self.query;
184 if db.support_returning() {
185 query.returning(returning_pk::<A>(db.get_database_backend()));
186 }
187 Inserter::<A>::new(self.primary_key, query).exec(db)
188 }
189
190 pub fn exec_without_returning<'a, C>(
193 self,
194 db: &'a C,
195 ) -> impl Future<Output = Result<u64, DbErr>> + 'a
196 where
197 C: ConnectionTrait,
198 A: 'a,
199 {
200 Inserter::<A>::new(self.primary_key, self.query).exec_without_returning(db)
201 }
202
203 pub fn exec_with_returning<'a, C>(
208 self,
209 db: &'a C,
210 ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
211 where
212 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
213 C: ConnectionTrait,
214 A: 'a,
215 {
216 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning(db)
217 }
218
219 pub fn exec_with_returning_keys<'a, C>(
221 self,
222 db: &'a C,
223 ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
224 where
225 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
226 C: ConnectionTrait,
227 A: 'a,
228 {
229 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_keys(db)
230 }
231
232 pub fn exec_with_returning_many<'a, C>(
234 self,
235 db: &'a C,
236 ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
237 where
238 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
239 C: ConnectionTrait,
240 A: 'a,
241 {
242 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_many(db)
243 }
244}
245
246impl<A> InsertMany<A>
247where
248 A: ActiveModelTrait,
249{
250 pub async fn exec<C>(self, db: &C) -> Result<InsertManyResult<A>, DbErr>
252 where
253 C: ConnectionTrait,
254 {
255 if self.empty {
256 return Ok(InsertManyResult {
257 last_insert_id: None,
258 });
259 }
260 let res = self.into_one().exec(db).await;
261 match res {
262 Ok(r) => Ok(InsertManyResult {
263 last_insert_id: Some(r.last_insert_id),
264 }),
265 Err(err) => Err(err),
266 }
267 }
268
269 pub async fn exec_without_returning<C>(self, db: &C) -> Result<u64, DbErr>
272 where
273 C: ConnectionTrait,
274 {
275 if self.empty {
276 return Ok(0);
277 }
278 self.into_one().exec_without_returning(db).await
279 }
280
281 pub async fn exec_with_returning<C>(
283 self,
284 db: &C,
285 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
286 where
287 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
288 C: ConnectionTrait,
289 {
290 if self.empty {
291 return Ok(Vec::new());
292 }
293
294 self.into_one().exec_with_returning_many(db).await
295 }
296
297 #[deprecated(
299 since = "1.2.0",
300 note = "Please use [`InsertMany::exec_with_returning`]"
301 )]
302 pub async fn exec_with_returning_many<C>(
303 self,
304 db: &C,
305 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
306 where
307 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
308 C: ConnectionTrait,
309 {
310 if self.empty {
311 return Ok(Vec::new());
312 }
313
314 self.into_one().exec_with_returning_many(db).await
315 }
316
317 pub async fn exec_with_returning_keys<C>(
319 self,
320 db: &C,
321 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
322 where
323 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
324 C: ConnectionTrait,
325 {
326 if self.empty {
327 return Ok(Vec::new());
328 }
329
330 self.into_one().exec_with_returning_keys(db).await
331 }
332}
333
334impl<A> Inserter<A>
335where
336 A: ActiveModelTrait,
337{
338 pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
340 Self {
341 primary_key,
342 query,
343 model: PhantomData,
344 }
345 }
346
347 pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
349 where
350 C: ConnectionTrait,
351 A: 'a,
352 {
353 exec_insert(self.primary_key, self.query, db)
354 }
355
356 pub fn exec_without_returning<'a, C>(
358 self,
359 db: &'a C,
360 ) -> impl Future<Output = Result<u64, DbErr>> + 'a
361 where
362 C: ConnectionTrait,
363 A: 'a,
364 {
365 exec_insert_without_returning(self.query, db)
366 }
367
368 pub fn exec_with_returning<'a, C>(
370 self,
371 db: &'a C,
372 ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
373 where
374 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
375 C: ConnectionTrait,
376 A: 'a,
377 {
378 exec_insert_with_returning::<A, _>(self.primary_key, self.query, db)
379 }
380
381 pub fn exec_with_returning_keys<'a, C>(
383 self,
384 db: &'a C,
385 ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
386 where
387 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
388 C: ConnectionTrait,
389 A: 'a,
390 {
391 exec_insert_with_returning_keys::<A, _>(self.query, db)
392 }
393
394 pub fn exec_with_returning_many<'a, C>(
396 self,
397 db: &'a C,
398 ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
399 where
400 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
401 C: ConnectionTrait,
402 A: 'a,
403 {
404 exec_insert_with_returning_many::<A, _>(self.query, db)
405 }
406}
407
408async fn exec_insert<A, C>(
409 primary_key: Option<ValueTuple>,
410 statement: InsertStatement,
411 db: &C,
412) -> Result<InsertResult<A>, DbErr>
413where
414 C: ConnectionTrait,
415 A: ActiveModelTrait,
416{
417 type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
418
419 let db_backend = db.get_database_backend();
420
421 let last_insert_id = match (primary_key, db.support_returning()) {
422 (Some(value_tuple), _) => {
423 let res = db.execute(&statement).await?;
424 if res.rows_affected() == 0 {
425 return Err(DbErr::RecordNotInserted);
426 }
427 FromValueTuple::from_value_tuple(value_tuple)
428 }
429 (None, true) => {
430 let mut rows = db.query_all(&statement).await?;
431 let row = match rows.pop() {
432 Some(row) => row,
433 None => return Err(DbErr::RecordNotInserted),
434 };
435 let cols = PrimaryKey::<A>::iter()
436 .map(|col| col.to_string())
437 .collect::<Vec<_>>();
438 row.try_get_many("", cols.as_ref())
439 .map_err(|_| DbErr::UnpackInsertId)?
440 }
441 (None, false) => {
442 let res = db.execute(&statement).await?;
443 if res.rows_affected() == 0 {
444 return Err(DbErr::RecordNotInserted);
445 }
446 let last_insert_id = res.last_insert_id();
447 if db_backend == DbBackend::MySql && last_insert_id == 0 {
453 return Err(DbErr::RecordNotInserted);
454 }
455 ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
456 }
457 };
458
459 Ok(InsertResult { last_insert_id })
460}
461
462async fn exec_insert_without_returning<C>(
463 insert_statement: InsertStatement,
464 db: &C,
465) -> Result<u64, DbErr>
466where
467 C: ConnectionTrait,
468{
469 let exec_result = db.execute(&insert_statement).await?;
470 Ok(exec_result.rows_affected())
471}
472
473async fn exec_insert_with_returning<A, C>(
474 primary_key: Option<ValueTuple>,
475 mut insert_statement: InsertStatement,
476 db: &C,
477) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
478where
479 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
480 C: ConnectionTrait,
481 A: ActiveModelTrait,
482{
483 let db_backend = db.get_database_backend();
484 let found = match db.support_returning() {
485 true => {
486 let returning = Query::returning().exprs(
487 <A::Entity as EntityTrait>::Column::iter()
488 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
489 );
490 insert_statement.returning(returning);
491 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
492 insert_statement,
493 )
494 .one(db)
495 .await?
496 }
497 false => {
498 let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
499 <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
500 .one(db)
501 .await?
502 }
503 };
504 match found {
505 Some(model) => Ok(model),
506 None => Err(DbErr::RecordNotFound(
507 "Failed to find inserted item".to_owned(),
508 )),
509 }
510}
511
512async fn exec_insert_with_returning_keys<A, C>(
513 mut insert_statement: InsertStatement,
514 db: &C,
515) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
516where
517 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
518 C: ConnectionTrait,
519 A: ActiveModelTrait,
520{
521 let db_backend = db.get_database_backend();
522 match db.support_returning() {
523 true => {
524 insert_statement.returning(returning_pk::<A>(db_backend));
525 let rows = db.query_all(&insert_statement).await?;
526 let cols = PrimaryKey::<A>::iter()
527 .map(|col| col.to_string())
528 .collect::<Vec<_>>();
529 let mut keys = Vec::new();
530 for row in rows {
531 keys.push(
532 row.try_get_many("", cols.as_ref())
533 .map_err(|_| DbErr::UnpackInsertId)?,
534 );
535 }
536 Ok(keys)
537 }
538 false => Err(DbErr::BackendNotSupported {
539 db: db_backend.as_str(),
540 ctx: "INSERT RETURNING",
541 }),
542 }
543}
544
545async fn exec_insert_with_returning_many<A, C>(
546 mut insert_statement: InsertStatement,
547 db: &C,
548) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
549where
550 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
551 C: ConnectionTrait,
552 A: ActiveModelTrait,
553{
554 let db_backend = db.get_database_backend();
555 match db.support_returning() {
556 true => {
557 let returning = Query::returning().exprs(
558 <A::Entity as EntityTrait>::Column::iter()
559 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
560 );
561 insert_statement.returning(returning);
562 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
563 insert_statement,
564 )
565 .all(db)
566 .await
567 }
568 false => Err(DbErr::BackendNotSupported {
569 db: db_backend.as_str(),
570 ctx: "INSERT RETURNING",
571 }),
572 }
573}
574
575fn returning_pk<A>(db_backend: DbBackend) -> ReturningClause
576where
577 A: ActiveModelTrait,
578{
579 Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
580 c.into_column()
581 .select_as(c.into_column().into_returning_expr(db_backend))
582 }))
583}