1use anyhow::Result;
2use reblessive::tree::Stk;
3
4use super::IgnoreError;
5use crate::ctx::Context;
6use crate::dbs::{Options, Statement};
7use crate::doc::Document;
8use crate::err;
9use crate::err::Error;
10use crate::expr::statements::InsertStatement;
11use crate::val::Value;
12
13impl Document {
14 pub(super) async fn insert(
15 &mut self,
16 stk: &mut Stk,
17 ctx: &Context,
18 opt: &Options,
19 stm: &InsertStatement,
20 ) -> Result<Value, IgnoreError> {
21 if !self.is_iteration_initial() {
27 return self.insert_update(stk, ctx, opt, &Statement::Insert(stm)).await;
28 }
29
30 let retryable = stm.update.is_some();
33 if retryable {
34 ctx.tx().lock().await.new_save_point();
36 }
37
38 let retry = match self.insert_create(stk, ctx, opt, &Statement::Insert(stm)).await {
44 Err(IgnoreError::Error(e)) => match e.downcast_ref::<err::Error>() {
49 Some(Error::IndexExists {
50 ..
51 }) => {
52 if !retryable || self.is_specific_record_id() {
57 if retryable {
58 ctx.tx().lock().await.rollback_to_save_point().await?;
59 }
60
61 if stm.ignore {
64 return Err(IgnoreError::Ignore);
65 }
66
67 return Err(IgnoreError::Error(e));
68 }
69 let Ok(Error::IndexExists {
70 thing,
71 ..
72 }) = e.downcast()
73 else {
74 unreachable!()
76 };
77 thing
78 }
79 Some(Error::RecordExists {
84 ..
85 }) => {
86 if !retryable {
88 if stm.ignore {
91 return Err(IgnoreError::Ignore);
92 }
93 return Err(IgnoreError::Error(e));
94 }
95 let Ok(Error::RecordExists {
96 thing,
97 ..
98 }) = e.downcast()
99 else {
100 unreachable!()
102 };
103 thing
104 }
105 _ => {
106 if retryable {
108 ctx.tx().lock().await.rollback_to_save_point().await?;
109 }
110 return Err(IgnoreError::Error(e));
111 }
112 },
113 Err(IgnoreError::Ignore) => {
114 if retryable {
115 ctx.tx().lock().await.release_last_save_point()?;
116 }
117 return Err(IgnoreError::Ignore);
118 }
119 Ok(x) => {
120 if retryable {
121 ctx.tx().lock().await.release_last_save_point()?;
122 }
123 return Ok(x);
124 }
125 };
126
127 ctx.tx().lock().await.rollback_to_save_point().await?;
129
130 if ctx.is_done(true).await? {
131 return Err(IgnoreError::Ignore);
133 }
134
135 let (ns, db) = ctx.expect_ns_db_ids(opt).await?;
136 let val = ctx.tx().get_record(ns, db, &retry.table, &retry.key, opt.version).await?;
137
138 self.modify_for_update_retry(retry, val);
139
140 self.generate_record_id(stk, ctx, opt, &Statement::Insert(stm)).await?;
142
143 self.insert_update(stk, ctx, opt, &Statement::Insert(stm)).await
144 }
145
146 async fn insert_create(
149 &mut self,
150 stk: &mut Stk,
151 ctx: &Context,
152 opt: &Options,
153 stm: &Statement<'_>,
154 ) -> Result<Value, IgnoreError> {
155 self.check_permissions_quick(stk, ctx, opt, stm).await?;
156 self.check_table_type(ctx, opt, stm).await?;
157 self.check_data_fields(stk, ctx, opt, stm).await?;
158 self.process_merge_data().await?;
159 self.store_edges_data(ctx, opt, stm).await?;
160 self.process_table_fields(stk, ctx, opt, stm).await?;
161 self.cleanup_table_fields(ctx, opt, stm).await?;
162 self.default_record_data(ctx, opt, stm).await?;
163 self.check_permissions_table(stk, ctx, opt, stm).await?;
164 self.store_record_data(ctx, opt, stm).await?;
165 self.store_index_data(stk, ctx, opt, stm).await?;
166 self.process_table_views(stk, ctx, opt, stm).await?;
167 self.process_table_lives(stk, ctx, opt, stm).await?;
168 self.process_table_events(stk, ctx, opt, stm).await?;
169 self.process_changefeeds(ctx, opt, stm).await?;
170 self.pluck(stk, ctx, opt, stm).await
171 }
172 async fn insert_update(
175 &mut self,
176 stk: &mut Stk,
177 ctx: &Context,
178 opt: &Options,
179 stm: &Statement<'_>,
180 ) -> Result<Value, IgnoreError> {
181 self.check_permissions_quick(stk, ctx, opt, stm).await?;
182 self.check_table_type(ctx, opt, stm).await?;
183 self.check_data_fields(stk, ctx, opt, stm).await?;
184 self.check_permissions_table(stk, ctx, opt, stm).await?;
185 self.process_record_data(stk, ctx, opt, stm).await?;
186 self.process_table_fields(stk, ctx, opt, stm).await?;
187 self.cleanup_table_fields(ctx, opt, stm).await?;
188 self.default_record_data(ctx, opt, stm).await?;
189 self.check_permissions_table(stk, ctx, opt, stm).await?;
190 self.store_record_data(ctx, opt, stm).await?;
191 self.store_index_data(stk, ctx, opt, stm).await?;
192 self.process_table_views(stk, ctx, opt, stm).await?;
193 self.process_table_lives(stk, ctx, opt, stm).await?;
194 self.process_table_events(stk, ctx, opt, stm).await?;
195 self.process_changefeeds(ctx, opt, stm).await?;
196 self.pluck(stk, ctx, opt, stm).await
197 }
198}