1use crate::{
2 error::*, ActiveModelTrait, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, Insert,
3 IntoActiveModel, Iterable, PrimaryKeyToColumn, PrimaryKeyTrait, SelectModel, SelectorRaw,
4 TryFromU64, TryInsert,
5};
6use sea_query::{FromValueTuple, Iden, InsertStatement, Query, ValueTuple};
7use std::{future::Future, marker::PhantomData};
8
9type PrimaryKey<A> = <<A as ActiveModelTrait>::Entity as EntityTrait>::PrimaryKey;
10
11#[derive(Debug)]
13pub struct Inserter<A>
14where
15 A: ActiveModelTrait,
16{
17 primary_key: Option<ValueTuple>,
18 query: InsertStatement,
19 model: PhantomData<A>,
20}
21
22#[derive(Debug)]
24pub struct InsertResult<A>
25where
26 A: ActiveModelTrait,
27{
28 pub last_insert_id: <PrimaryKey<A> as PrimaryKeyTrait>::ValueType,
30}
31
32#[derive(Debug)]
34pub enum TryInsertResult<T> {
35 Empty,
37 Conflicted,
39 Inserted(T),
41}
42
43impl<A> TryInsert<A>
44where
45 A: ActiveModelTrait,
46{
47 #[allow(unused_mut)]
49 pub async fn exec<'a, C>(self, db: &'a C) -> Result<TryInsertResult<InsertResult<A>>, DbErr>
50 where
51 C: ConnectionTrait,
52 A: 'a,
53 {
54 if self.insert_struct.columns.is_empty() {
55 return Ok(TryInsertResult::Empty);
56 }
57 let res = self.insert_struct.exec(db).await;
58 match res {
59 Ok(res) => Ok(TryInsertResult::Inserted(res)),
60 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
61 Err(err) => Err(err),
62 }
63 }
64
65 pub async fn exec_without_returning<'a, C>(
68 self,
69 db: &'a C,
70 ) -> Result<TryInsertResult<u64>, DbErr>
71 where
72 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
73 C: ConnectionTrait,
74 A: 'a,
75 {
76 if self.insert_struct.columns.is_empty() {
77 return Ok(TryInsertResult::Empty);
78 }
79 let res = self.insert_struct.exec_without_returning(db).await;
80 match res {
81 Ok(res) => Ok(TryInsertResult::Inserted(res)),
82 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
83 Err(err) => Err(err),
84 }
85 }
86
87 pub async fn exec_with_returning<'a, C>(
89 self,
90 db: &'a C,
91 ) -> Result<TryInsertResult<<A::Entity as EntityTrait>::Model>, DbErr>
92 where
93 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
94 C: ConnectionTrait,
95 A: 'a,
96 {
97 if self.insert_struct.columns.is_empty() {
98 return Ok(TryInsertResult::Empty);
99 }
100 let res = self.insert_struct.exec_with_returning(db).await;
101 match res {
102 Ok(res) => Ok(TryInsertResult::Inserted(res)),
103 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
104 Err(err) => Err(err),
105 }
106 }
107
108 pub async fn exec_with_returning_keys<'a, C>(
114 self,
115 db: &'a C,
116 ) -> Result<TryInsertResult<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>>, DbErr>
117 where
118 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
119 C: ConnectionTrait,
120 A: 'a,
121 {
122 if self.insert_struct.columns.is_empty() {
123 return Ok(TryInsertResult::Empty);
124 }
125
126 let res = self.insert_struct.exec_with_returning_keys(db).await;
127 match res {
128 Ok(res) => Ok(TryInsertResult::Inserted(res)),
129 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
130 Err(err) => Err(err),
131 }
132 }
133
134 pub async fn exec_with_returning_many<'a, C>(
140 self,
141 db: &'a C,
142 ) -> Result<TryInsertResult<Vec<<A::Entity as EntityTrait>::Model>>, DbErr>
143 where
144 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
145 C: ConnectionTrait,
146 A: 'a,
147 {
148 if self.insert_struct.columns.is_empty() {
149 return Ok(TryInsertResult::Empty);
150 }
151
152 let res = self.insert_struct.exec_with_returning_many(db).await;
153 match res {
154 Ok(res) => Ok(TryInsertResult::Inserted(res)),
155 Err(DbErr::RecordNotInserted) => Ok(TryInsertResult::Conflicted),
156 Err(err) => Err(err),
157 }
158 }
159}
160
161impl<A> Insert<A>
162where
163 A: ActiveModelTrait,
164{
165 #[allow(unused_mut)]
167 pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
168 where
169 C: ConnectionTrait,
170 A: 'a,
171 {
172 let mut query = self.query;
174 if db.support_returning() {
175 let db_backend = db.get_database_backend();
176 let returning =
177 Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
178 c.into_column()
179 .select_as(c.into_column().into_returning_expr(db_backend))
180 }));
181 query.returning(returning);
182 }
183 Inserter::<A>::new(self.primary_key, query).exec(db)
184 }
185
186 pub fn exec_without_returning<'a, C>(
189 self,
190 db: &'a C,
191 ) -> impl Future<Output = Result<u64, DbErr>> + 'a
192 where
193 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
194 C: ConnectionTrait,
195 A: 'a,
196 {
197 Inserter::<A>::new(self.primary_key, self.query).exec_without_returning(db)
198 }
199
200 pub fn exec_with_returning<'a, C>(
205 self,
206 db: &'a C,
207 ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
208 where
209 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
210 C: ConnectionTrait,
211 A: 'a,
212 {
213 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning(db)
214 }
215
216 pub fn exec_with_returning_keys<'a, C>(
222 self,
223 db: &'a C,
224 ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
225 where
226 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
227 C: ConnectionTrait,
228 A: 'a,
229 {
230 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_keys(db)
231 }
232
233 pub fn exec_with_returning_many<'a, C>(
239 self,
240 db: &'a C,
241 ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
242 where
243 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
244 C: ConnectionTrait,
245 A: 'a,
246 {
247 Inserter::<A>::new(self.primary_key, self.query).exec_with_returning_many(db)
248 }
249}
250
251impl<A> Inserter<A>
252where
253 A: ActiveModelTrait,
254{
255 pub fn new(primary_key: Option<ValueTuple>, query: InsertStatement) -> Self {
257 Self {
258 primary_key,
259 query,
260 model: PhantomData,
261 }
262 }
263
264 pub fn exec<'a, C>(self, db: &'a C) -> impl Future<Output = Result<InsertResult<A>, DbErr>> + 'a
266 where
267 C: ConnectionTrait,
268 A: 'a,
269 {
270 exec_insert(self.primary_key, self.query, db)
271 }
272
273 pub fn exec_without_returning<'a, C>(
275 self,
276 db: &'a C,
277 ) -> impl Future<Output = Result<u64, DbErr>> + 'a
278 where
279 C: ConnectionTrait,
280 A: 'a,
281 {
282 exec_insert_without_returning(self.query, db)
283 }
284
285 pub fn exec_with_returning<'a, C>(
287 self,
288 db: &'a C,
289 ) -> impl Future<Output = Result<<A::Entity as EntityTrait>::Model, DbErr>> + 'a
290 where
291 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
292 C: ConnectionTrait,
293 A: 'a,
294 {
295 exec_insert_with_returning::<A, _>(self.primary_key, self.query, db)
296 }
297
298 pub fn exec_with_returning_keys<'a, C>(
304 self,
305 db: &'a C,
306 ) -> impl Future<Output = Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>> + 'a
307 where
308 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
309 C: ConnectionTrait,
310 A: 'a,
311 {
312 exec_insert_with_returning_keys::<A, _>(self.query, db)
313 }
314
315 pub fn exec_with_returning_many<'a, C>(
321 self,
322 db: &'a C,
323 ) -> impl Future<Output = Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>> + 'a
324 where
325 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
326 C: ConnectionTrait,
327 A: 'a,
328 {
329 exec_insert_with_returning_many::<A, _>(self.query, db)
330 }
331}
332
333async fn exec_insert<A, C>(
334 primary_key: Option<ValueTuple>,
335 statement: InsertStatement,
336 db: &C,
337) -> Result<InsertResult<A>, DbErr>
338where
339 C: ConnectionTrait,
340 A: ActiveModelTrait,
341{
342 type ValueTypeOf<A> = <PrimaryKey<A> as PrimaryKeyTrait>::ValueType;
343
344 let db_backend = db.get_database_backend();
345 let statement = db_backend.build(&statement);
346
347 let last_insert_id = match (primary_key, db.support_returning()) {
348 (Some(value_tuple), _) => {
349 let res = db.execute(statement).await?;
350 if res.rows_affected() == 0 {
351 return Err(DbErr::RecordNotInserted);
352 }
353 FromValueTuple::from_value_tuple(value_tuple)
354 }
355 (None, true) => {
356 let mut rows = db.query_all(statement).await?;
357 let row = match rows.pop() {
358 Some(row) => row,
359 None => return Err(DbErr::RecordNotInserted),
360 };
361 let cols = PrimaryKey::<A>::iter()
362 .map(|col| col.to_string())
363 .collect::<Vec<_>>();
364 row.try_get_many("", cols.as_ref())
365 .map_err(|_| DbErr::UnpackInsertId)?
366 }
367 (None, false) => {
368 let res = db.execute(statement).await?;
369 if res.rows_affected() == 0 {
370 return Err(DbErr::RecordNotInserted);
371 }
372 let last_insert_id = res.last_insert_id();
373 if db_backend == DbBackend::MySql && last_insert_id == 0 {
379 return Err(DbErr::RecordNotInserted);
380 }
381 ValueTypeOf::<A>::try_from_u64(last_insert_id).map_err(|_| DbErr::UnpackInsertId)?
382 }
383 };
384
385 Ok(InsertResult { last_insert_id })
386}
387
388async fn exec_insert_without_returning<C>(
389 insert_statement: InsertStatement,
390 db: &C,
391) -> Result<u64, DbErr>
392where
393 C: ConnectionTrait,
394{
395 let db_backend = db.get_database_backend();
396 let insert_statement = db_backend.build(&insert_statement);
397 let exec_result = db.execute(insert_statement).await?;
398 Ok(exec_result.rows_affected())
399}
400
401async fn exec_insert_with_returning<A, C>(
402 primary_key: Option<ValueTuple>,
403 mut insert_statement: InsertStatement,
404 db: &C,
405) -> Result<<A::Entity as EntityTrait>::Model, DbErr>
406where
407 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
408 C: ConnectionTrait,
409 A: ActiveModelTrait,
410{
411 let db_backend = db.get_database_backend();
412 let found = match db.support_returning() {
413 true => {
414 let returning = Query::returning().exprs(
415 <A::Entity as EntityTrait>::Column::iter()
416 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
417 );
418 insert_statement.returning(returning);
419 let insert_statement = db_backend.build(&insert_statement);
420 SelectorRaw::<SelectModel<<A::Entity as EntityTrait>::Model>>::from_statement(
421 insert_statement,
422 )
423 .one(db)
424 .await?
425 }
426 false => {
427 let insert_res = exec_insert::<A, _>(primary_key, insert_statement, db).await?;
428 <A::Entity as EntityTrait>::find_by_id(insert_res.last_insert_id)
429 .one(db)
430 .await?
431 }
432 };
433 match found {
434 Some(model) => Ok(model),
435 None => Err(DbErr::RecordNotFound(
436 "Failed to find inserted item".to_owned(),
437 )),
438 }
439}
440
441async fn exec_insert_with_returning_keys<A, C>(
442 mut insert_statement: InsertStatement,
443 db: &C,
444) -> Result<Vec<<PrimaryKey<A> as PrimaryKeyTrait>::ValueType>, DbErr>
445where
446 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
447 C: ConnectionTrait,
448 A: ActiveModelTrait,
449{
450 let db_backend = db.get_database_backend();
451 match db.support_returning() {
452 true => {
453 let returning =
454 Query::returning().exprs(<A::Entity as EntityTrait>::PrimaryKey::iter().map(|c| {
455 c.into_column()
456 .select_as(c.into_column().into_returning_expr(db_backend))
457 }));
458 insert_statement.returning(returning);
459 let statement = db_backend.build(&insert_statement);
460 let rows = db.query_all(statement).await?;
461 let cols = PrimaryKey::<A>::iter()
462 .map(|col| col.to_string())
463 .collect::<Vec<_>>();
464 let mut keys = Vec::new();
465 for row in rows {
466 keys.push(
467 row.try_get_many("", cols.as_ref())
468 .map_err(|_| DbErr::UnpackInsertId)?,
469 );
470 }
471 Ok(keys)
472 }
473 false => unimplemented!("Database backend doesn't support RETURNING"),
474 }
475}
476
477async fn exec_insert_with_returning_many<A, C>(
478 mut insert_statement: InsertStatement,
479 db: &C,
480) -> Result<Vec<<A::Entity as EntityTrait>::Model>, DbErr>
481where
482 <A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
483 C: ConnectionTrait,
484 A: ActiveModelTrait,
485{
486 let db_backend = db.get_database_backend();
487 match db.support_returning() {
488 true => {
489 let returning = Query::returning().exprs(
490 <A::Entity as EntityTrait>::Column::iter()
491 .map(|c| c.select_as(c.into_returning_expr(db_backend))),
492 );
493 insert_statement.returning(returning);
494 let insert_statement = db_backend.build(&insert_statement);
495 SelectorRaw::<SelectModel<<A::Entity as EntityTrait>::Model>>::from_statement(
496 insert_statement,
497 )
498 .all(db)
499 .await
500 }
501 false => unimplemented!("Database backend doesn't support RETURNING"),
502 }
503}