surrealdb_core/doc/
insert.rs

1use anyhow::Result;
2use reblessive::tree::Stk;
3
4use super::IgnoreError;
5use crate::ctx::Context;
6use crate::dbs::{Options, Statement};
7use crate::doc::Document;
8use crate::err;
9use crate::err::Error;
10use crate::expr::statements::InsertStatement;
11use crate::val::Value;
12
13impl Document {
14	pub(super) async fn insert(
15		&mut self,
16		stk: &mut Stk,
17		ctx: &Context,
18		opt: &Options,
19		stm: &InsertStatement,
20	) -> Result<Value, IgnoreError> {
21		// Even though we haven't tried to create first this can still not be the
22		// 'initial iteration' if the initial doc is not set.
23		//
24		// If this is not the initial iteration we immediatly skip trying to create and
25		// go straight to updating.
26		if !self.is_iteration_initial() {
27			return self.insert_update(stk, ctx, opt, &Statement::Insert(stm)).await;
28		}
29
30		// is this retryable?
31		// it is retryable when some data is present on the insert statement to update.
32		let retryable = stm.update.is_some();
33		if retryable {
34			// it is retryable so generate a save point we can roll back to.
35			ctx.tx().lock().await.new_save_point();
36		}
37
38		// First try to create the value and if that is not possible due to an existing
39		// value fall back to update instead.
40		//
41		// This is done this way to make the create path fast and take priority over the
42		// update path.
43		let retry = match self.insert_create(stk, ctx, opt, &Statement::Insert(stm)).await {
44			// We received an index exists error, so we
45			// ignore the error, and attempt to update the
46			// record using the ON DUPLICATE KEY UPDATE
47			// clause with the ID received in the error
48			Err(IgnoreError::Error(e)) => match e.downcast_ref::<err::Error>() {
49				Some(Error::IndexExists {
50					..
51				}) => {
52					// if not retryable return the error.
53					//
54					// or if the statement contained a specific record id, we
55					// don't retry to
56					if !retryable || self.is_specific_record_id() {
57						if retryable {
58							ctx.tx().lock().await.rollback_to_save_point().await?;
59						}
60
61						// Ignore flag; disables error.
62						// Error::Ignore is never raised to the user.
63						if stm.ignore {
64							return Err(IgnoreError::Ignore);
65						}
66
67						return Err(IgnoreError::Error(e));
68					}
69					let Ok(Error::IndexExists {
70						thing,
71						..
72					}) = e.downcast()
73					else {
74						// Checked above
75						unreachable!()
76					};
77					thing
78				}
79				// We attempted to INSERT a document with an ID,
80				// and this ID already exists in the database,
81				// so we need to update the record instead using
82				// the ON DUPLICATE KEY UPDATE statement clause
83				Some(Error::RecordExists {
84					..
85				}) => {
86					// if not retryable return the error.
87					if !retryable {
88						// Ignore flag; disables error.
89						// Error::Ignore is never raised to the user.
90						if stm.ignore {
91							return Err(IgnoreError::Ignore);
92						}
93						return Err(IgnoreError::Error(e));
94					}
95					let Ok(Error::RecordExists {
96						thing,
97						..
98					}) = e.downcast()
99					else {
100						// Checked above
101						unreachable!()
102					};
103					thing
104				}
105				_ => {
106					// if retryable we need to do something with the savepoint.
107					if retryable {
108						ctx.tx().lock().await.rollback_to_save_point().await?;
109					}
110					return Err(IgnoreError::Error(e));
111				}
112			},
113			Err(IgnoreError::Ignore) => {
114				if retryable {
115					ctx.tx().lock().await.release_last_save_point()?;
116				}
117				return Err(IgnoreError::Ignore);
118			}
119			Ok(x) => {
120				if retryable {
121					ctx.tx().lock().await.release_last_save_point()?;
122				}
123				return Ok(x);
124			}
125		};
126
127		// Insertion failed so instead do an update.
128		ctx.tx().lock().await.rollback_to_save_point().await?;
129
130		if ctx.is_done(true).await? {
131			// Don't process the document
132			return Err(IgnoreError::Ignore);
133		}
134
135		let (ns, db) = ctx.expect_ns_db_ids(opt).await?;
136		let val = ctx.tx().get_record(ns, db, &retry.table, &retry.key, opt.version).await?;
137
138		self.modify_for_update_retry(retry, val);
139
140		// we restarted, so we might need to generate a record id again?
141		self.generate_record_id(stk, ctx, opt, &Statement::Insert(stm)).await?;
142
143		self.insert_update(stk, ctx, opt, &Statement::Insert(stm)).await
144	}
145
146	/// Attempt to run an INSERT statement to
147	/// create a record which does not exist
148	async fn insert_create(
149		&mut self,
150		stk: &mut Stk,
151		ctx: &Context,
152		opt: &Options,
153		stm: &Statement<'_>,
154	) -> Result<Value, IgnoreError> {
155		self.check_permissions_quick(stk, ctx, opt, stm).await?;
156		self.check_table_type(ctx, opt, stm).await?;
157		self.check_data_fields(stk, ctx, opt, stm).await?;
158		self.process_merge_data().await?;
159		self.store_edges_data(ctx, opt, stm).await?;
160		self.process_table_fields(stk, ctx, opt, stm).await?;
161		self.cleanup_table_fields(ctx, opt, stm).await?;
162		self.default_record_data(ctx, opt, stm).await?;
163		self.check_permissions_table(stk, ctx, opt, stm).await?;
164		self.store_record_data(ctx, opt, stm).await?;
165		self.store_index_data(stk, ctx, opt, stm).await?;
166		self.process_table_views(stk, ctx, opt, stm).await?;
167		self.process_table_lives(stk, ctx, opt, stm).await?;
168		self.process_table_events(stk, ctx, opt, stm).await?;
169		self.process_changefeeds(ctx, opt, stm).await?;
170		self.pluck(stk, ctx, opt, stm).await
171	}
172	/// Attempt to run an INSERT statement to
173	/// update a record which already exists
174	async fn insert_update(
175		&mut self,
176		stk: &mut Stk,
177		ctx: &Context,
178		opt: &Options,
179		stm: &Statement<'_>,
180	) -> Result<Value, IgnoreError> {
181		self.check_permissions_quick(stk, ctx, opt, stm).await?;
182		self.check_table_type(ctx, opt, stm).await?;
183		self.check_data_fields(stk, ctx, opt, stm).await?;
184		self.check_permissions_table(stk, ctx, opt, stm).await?;
185		self.process_record_data(stk, ctx, opt, stm).await?;
186		self.process_table_fields(stk, ctx, opt, stm).await?;
187		self.cleanup_table_fields(ctx, opt, stm).await?;
188		self.default_record_data(ctx, opt, stm).await?;
189		self.check_permissions_table(stk, ctx, opt, stm).await?;
190		self.store_record_data(ctx, opt, stm).await?;
191		self.store_index_data(stk, ctx, opt, stm).await?;
192		self.process_table_views(stk, ctx, opt, stm).await?;
193		self.process_table_lives(stk, ctx, opt, stm).await?;
194		self.process_table_events(stk, ctx, opt, stm).await?;
195		self.process_changefeeds(ctx, opt, stm).await?;
196		self.pluck(stk, ctx, opt, stm).await
197	}
198}