1use super::ReturningSelector;
2use crate::{
3 ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, InsertMany,
4 IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, TryFromU64,
5 TryInsert, error::*,
6};
7use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ReturningClause, ValueTuple};
8use std::marker::PhantomData;
9
/// Shorthand for the primary-key type of the entity that active model `A` belongs to.
type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
11
/// Executor for an `INSERT` statement targeting the entity of active model `A`.
///
/// Pairs the statement with the primary-key value (if any) that is handed to the
/// insert helpers when the statement is executed.
#[derive(Debug)]
pub struct Inserter<A>
where
    A: ActiveModelTrait,
{
    // Primary-key value forwarded to the insert helpers; `None` when no value was supplied.
    primary_key: Option<ValueTuple>,
    // The `INSERT` statement to run.
    query: InsertStatement,
    // Zero-sized marker tying this executor to the active model type `A`.
    model: PhantomData<A>,
}
22
/// Outcome of a successful insert executed for active model `A`.
#[derive(Debug)]
#[non_exhaustive]
pub struct InsertResult<A>
where
    A: ActiveModelTrait,
{
    /// Primary-key value reported for the insert, typed as the entity's primary-key value type.
    pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
}
33
/// Outcome of a multi-row insert executed for active model `A`.
#[derive(Debug)]
#[non_exhaustive]
pub struct InsertManyResult<A>
where
    A: ActiveModelTrait,
{
    /// Primary-key value reported for the insert; `None` when the insert was empty and
    /// therefore never sent to the database.
    pub last_insert_id: Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>,
}
44
/// Three-way outcome of a [`TryInsert`] execution.
#[derive(Debug)]
pub enum TryInsertResult<T> {
    /// The insert was flagged as empty, so nothing was executed.
    Empty,
    /// The underlying insert reported `DbErr::RecordNotInserted`.
    Conflicted,
    /// The insert succeeded; carries the value produced by the underlying insert.
    Inserted(T),
}
55
56impl<A> TryInsertResult<InsertResult<A>>
57where
58 A: ActiveModelTrait,
59{
60 pub fn last_insert_id(
62 self,
63 ) -> Result<Option<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr> {
64 match self {
65 Self::Empty => Ok(None),
66 Self::Inserted(v) => Ok(Some(v.last_insert_id)),
67 Self::Conflicted => Err(DbErr::RecordNotInserted),
68 }
69 }
70}
71
72impl<A> TryInsert<A>
73where
74 A: ActiveModelTrait,
75{
76 pub async fn exec<C>(self, db: &C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
78 where
79 C: ConnectionTrait,
80 {
81 if self.empty {
82 return Ok(TryInsertResult::Empty);
83 }
84 let res = self.insert_struct.exec(db).await;
85 match res {
86 Ok(res) => Ok(TryInsertResult::Inserted(res)),
87 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
88 Err(err) => Err(err),
89 }
90 }
91
92 pub async fn exec_without_returning<C>(self, db: &C) -> Result<TryInsertResult<u64>, DbErr>
95 where
96 C: ConnectionTrait,
97 {
98 if self.empty {
99 return Ok(TryInsertResult::Empty);
100 }
101 let res = self.insert_struct.exec_without_returning(db).await;
102 match res {
103 Ok(res) => Ok(TryInsertResult::Inserted(res)),
104 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
105 Err(err) => Err(err),
106 }
107 }
108
109 pub async fn exec_with_returning<C>(
111 self,
112 db: &C,
113 ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
114 where
115 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
116 C: ConnectionTrait,
117 {
118 if self.empty {
119 return Ok(TryInsertResult::Empty);
120 }
121 let res = self.insert_struct.exec_with_returning(db).await;
122 match res {
123 Ok(res) => Ok(TryInsertResult::Inserted(res)),
124 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
125 Err(err) => Err(err),
126 }
127 }
128
129 pub async fn exec_with_returning_keys<C>(
131 self,
132 db: &C,
133 ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
134 where
135 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
136 C: ConnectionTrait,
137 {
138 if self.empty {
139 return Ok(TryInsertResult::Empty);
140 }
141
142 let res = self.insert_struct.exec_with_returning_keys(db).await;
143 match res {
144 Ok(res) => Ok(TryInsertResult::Inserted(res)),
145 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
146 Err(err) => Err(err),
147 }
148 }
149
150 pub async fn exec_with_returning_many<C>(
152 self,
153 db: &C,
154 ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
155 where
156 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
157 C: ConnectionTrait,
158 {
159 if self.empty {
160 return Ok(TryInsertResult::Empty);
161 }
162
163 let res = self.insert_struct.exec_with_returning_many(db).await;
164 match res {
165 Ok(res) => Ok(TryInsertResult::Inserted(res)),
166 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
167 Err(err) => Err(err),
168 }
169 }
170}
171
172impl<A> Insert<A>
173where
174 A: ActiveModelTrait,
175{
176 pub async fn exec<'a, C>(self, db: &'a C) -> Result<InsertResult<A>, DbErr>
178 where
179 C: ConnectionTrait,
180 A: 'a,
181 {
182 let mut query = self.query;
184 if db.support_returning() {
185 query.returning(returning_pk::<A>(db.get_database_backend()));
186 }
187 Inserter::<A>::new(self.primary_key, query).exec(db).await
188 }
189
190 pub async fn exec_without_returning<'a, C>(self, db: &'a C) -> Result<u64, DbErr>
193 where
194 C: ConnectionTrait,
195 A: 'a,
196 {
197 Inserter::<A>::new(self.primary_key, self.query)
198 .exec_without_returning(db)
199 .await
200 }
201
202 pub async fn exec_with_returning<'a, C>(
207 self,
208 db: &'a C,
209 ) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
210 where
211 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
212 C: ConnectionTrait,
213 A: 'a,
214 {
215 Inserter::<A>::new(self.primary_key, self.query)
216 .exec_with_returning(db)
217 .await
218 }
219
220 pub async fn exec_with_returning_keys<'a, C>(
222 self,
223 db: &'a C,
224 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
225 where
226 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
227 C: ConnectionTrait,
228 A: 'a,
229 {
230 Inserter::<A>::new(self.primary_key, self.query)
231 .exec_with_returning_keys(db)
232 .await
233 }
234
235 pub async fn exec_with_returning_many<'a, C>(
237 self,
238 db: &'a C,
239 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
240 where
241 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
242 C: ConnectionTrait,
243 A: 'a,
244 {
245 Inserter::<A>::new(self.primary_key, self.query)
246 .exec_with_returning_many(db)
247 .await
248 }
249}
250
251impl<A> InsertMany<A>
252where
253 A: ActiveModelTrait,
254{
255 pub async fn exec<C>(self, db: &C) -> Result<InsertManyResult<A>, DbErr>
257 where
258 C: ConnectionTrait,
259 {
260 if self.empty {
261 return Ok(InsertManyResult {
262 last_insert_id: None,
263 });
264 }
265 let res = self.into_one().exec(db).await;
266 match res {
267 Ok(r) => Ok(InsertManyResult {
268 last_insert_id: Some(r.last_insert_id),
269 }),
270 Err(err) => Err(err),
271 }
272 }
273
274 pub async fn exec_without_returning<C>(self, db: &C) -> Result<u64, DbErr>
277 where
278 C: ConnectionTrait,
279 {
280 if self.empty {
281 return Ok(0);
282 }
283 self.into_one().exec_without_returning(db).await
284 }
285
286 pub async fn exec_with_returning<C>(
288 self,
289 db: &C,
290 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
291 where
292 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
293 C: ConnectionTrait,
294 {
295 if self.empty {
296 return Ok(Vec::new());
297 }
298
299 self.into_one().exec_with_returning_many(db).await
300 }
301
302 #[deprecated(
304 since = "1.2.0",
305 note = "Please use [`InsertMany::exec_with_returning`]"
306 )]
307 pub async fn exec_with_returning_many<C>(
308 self,
309 db: &C,
310 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
311 where
312 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
313 C: ConnectionTrait,
314 {
315 if self.empty {
316 return Ok(Vec::new());
317 }
318
319 self.into_one().exec_with_returning_many(db).await
320 }
321
322 pub async fn exec_with_returning_keys<C>(
324 self,
325 db: &C,
326 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
327 where
328 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
329 C: ConnectionTrait,
330 {
331 if self.empty {
332 return Ok(Vec::new());
333 }
334
335 self.into_one().exec_with_returning_keys(db).await
336 }
337}
338
339impl<A> Inserter<A>
340where
341 A: ActiveModelTrait,
342{
343 pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
345 Self {
346 primary_key,
347 query,
348 model: PhantomData,
349 }
350 }
351
352 pub async fn exec<'a, C>(self, db: &'a C) -> Result<InsertResult<A>, DbErr>
354 where
355 C: ConnectionTrait,
356 A: 'a,
357 {
358 exec_insert(self.primary_key, self.query, db).await
359 }
360
361 pub async fn exec_without_returning<'a, C>(self, db: &'a C) -> Result<u64, DbErr>
363 where
364 C: ConnectionTrait,
365 A: 'a,
366 {
367 exec_insert_without_returning(self.query, db).await
368 }
369
370 pub async fn exec_with_returning<'a, C>(
372 self,
373 db: &'a C,
374 ) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
375 where
376 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
377 C: ConnectionTrait,
378 A: 'a,
379 {
380 exec_insert_with_returning::<A, _>(self.primary_key, self.query, db).await
381 }
382
383 pub async fn exec_with_returning_keys<'a, C>(
385 self,
386 db: &'a C,
387 ) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
388 where
389 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
390 C: ConnectionTrait,
391 A: 'a,
392 {
393 exec_insert_with_returning_keys::<A, _>(self.query, db).await
394 }
395
396 pub async fn exec_with_returning_many<'a, C>(
398 self,
399 db: &'a C,
400 ) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
401 where
402 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
403 C: ConnectionTrait,
404 A: 'a,
405 {
406 exec_insert_with_returning_many::<A, _>(self.query, db).await
407 }
408}
409
410async fn exec_insert<A, C>(
411 primary_key: Option<ValueTuple>,
412 statement: InsertStatement,
413 db: &C,
414) -> Result<InsertResult<A>, DbErr>
415where
416 C: ConnectionTrait,
417 A: ActiveModelTrait,
418{
419 type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
420
421 let db_backend = db.get_database_backend();
422
423 let last_insert_id = match (primary_key, db.support_returning()) {
424 (_, true) => {
425 let mut rows = db.query_all(&statement).await?;
426 let row = match rows.pop() {
427 Some(row) => row,
428 None => return Err(DbErr::RecordNotInserted),
429 };
430 let cols = PrimaryKey::<A>::iter()
431 .map(|col| col.to_string())
432 .collect::<Vec<_>>();
433 row.try_get_many("", cols.as_ref())
434 .map_err(|_| DbErr::UnpackInsertId)?
435 }
436 (Some(value_tuple), false) => {
437 let res = db.execute(&statement).await?;
438 if res.rows_affected() == 0 {
439 return Err(DbErr::RecordNotInserted);
440 }
441 FromValueTuple::from_value_tuple(value_tuple)
442 }
443 (None, false) => {
444 let res = db.execute(&statement).await?;
445 if res.rows_affected() == 0 {
446 return Err(DbErr::RecordNotInserted);
447 }
448 let last_insert_id = res.last_insert_id();
449 if db_backend == DbBackend::MySql && last_insert_id == 0 {
455 return Err(DbErr::RecordNotInserted);
456 }
457 ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
458 }
459 };
460
461 Ok(InsertResult { last_insert_id })
462}
463
464async fn exec_insert_without_returning<C>(
465 insert_statement: InsertStatement,
466 db: &C,
467) -> Result<u64, DbErr>
468where
469 C: ConnectionTrait,
470{
471 let exec_result = db.execute(&insert_statement).await?;
472 Ok(exec_result.rows_affected())
473}
474
475async fn exec_insert_with_returning<A, C>(
476 primary_key: Option<ValueTuple>,
477 mut insert_statement: InsertStatement,
478 db: &C,
479) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
480where
481 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
482 C: ConnectionTrait,
483 A: ActiveModelTrait,
484{
485 let db_backend = db.get_database_backend();
486 let found = match db.support_returning() {
487 true => {
488 let returning = Query::returning().exprs(
489 <A::Entity as EntityTrait>::Column::iter()
490 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
491 );
492 insert_statement.returning(returning);
493 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
494 insert_statement,
495 )
496 .one(db)
497 .await?
498 }
499 false => {
500 let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
501 <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
502 .one(db)
503 .await?
504 }
505 };
506 match found {
507 Some(model) => Ok(model),
508 None => Err(DbErr::RecordNotFound(
509 "Failed to find inserted item".to_owned(),
510 )),
511 }
512}
513
514async fn exec_insert_with_returning_keys<A, C>(
515 mut insert_statement: InsertStatement,
516 db: &C,
517) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
518where
519 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
520 C: ConnectionTrait,
521 A: ActiveModelTrait,
522{
523 let db_backend = db.get_database_backend();
524 match db.support_returning() {
525 true => {
526 insert_statement.returning(returning_pk::<A>(db_backend));
527 let rows = db.query_all(&insert_statement).await?;
528 let cols = PrimaryKey::<A>::iter()
529 .map(|col| col.to_string())
530 .collect::<Vec<_>>();
531 let mut keys = Vec::new();
532 for row in rows {
533 keys.push(
534 row.try_get_many("", cols.as_ref())
535 .map_err(|_| DbErr::UnpackInsertId)?,
536 );
537 }
538 Ok(keys)
539 }
540 false => Err(DbErr::BackendNotSupported {
541 db: db_backend.as_str(),
542 ctx: "INSERT RETURNING",
543 }),
544 }
545}
546
547async fn exec_insert_with_returning_many<A, C>(
548 mut insert_statement: InsertStatement,
549 db: &C,
550) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
551where
552 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
553 C: ConnectionTrait,
554 A: ActiveModelTrait,
555{
556 let db_backend = db.get_database_backend();
557 match db.support_returning() {
558 true => {
559 let returning = Query::returning().exprs(
560 <A::Entity as EntityTrait>::Column::iter()
561 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
562 );
563 insert_statement.returning(returning);
564 ReturningSelector::<SelectModel<<A::Entity as EntityTrait>::Model>, _>::from_query(
565 insert_statement,
566 )
567 .all(db)
568 .await
569 }
570 false => Err(DbErr::BackendNotSupported {
571 db: db_backend.as_str(),
572 ctx: "INSERT RETURNING",
573 }),
574 }
575}
576
577fn returning_pk<A>(db_backend: DbBackend) -> ReturningClause
578where
579 A: ActiveModelTrait,
580{
581 Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
582 c.into_column()
583 .select_as(c.into_column().into_returning_expr(db_backend))
584 }))
585}