surrealdb_core/expr/statements/
insert.rs

1use std::fmt;
2
3use anyhow::{Result, bail, ensure};
4use reblessive::tree::Stk;
5use revision::revisioned;
6use serde::{Deserialize, Serialize};
7
8use crate::ctx::{Context, MutableContext};
9use crate::dbs::{Iterable, Iterator, Options, Statement};
10use crate::doc::CursorDoc;
11use crate::err::Error;
12use crate::expr::paths::{IN, OUT};
13use crate::expr::{Data, Expr, FlowResultExt as _, Output, Timeout, Value};
14use crate::idx::planner::RecordStrategy;
15use crate::val::{Datetime, RecordId, Table};
16
17#[revisioned(revision = 1)]
18#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Hash)]
19pub struct InsertStatement {
20	pub into: Option<Expr>,
21	pub data: Data,
22	/// Does the statement have the ignore clause.
23	pub ignore: bool,
24	pub update: Option<Data>,
25	pub output: Option<Output>,
26	pub timeout: Option<Timeout>,
27	pub parallel: bool,
28	pub relation: bool,
29	pub version: Option<Expr>,
30}
31
32impl InsertStatement {
33	/// Process this type returning a computed simple Value
34	pub(crate) async fn compute(
35		&self,
36		stk: &mut Stk,
37		ctx: &Context,
38		opt: &Options,
39		doc: Option<&CursorDoc>,
40	) -> Result<Value> {
41		// Valid options?
42		opt.valid_for_db()?;
43		// Create a new iterator
44		let mut i = Iterator::new();
45		// Propagate the version to the underlying datastore
46		let version = match &self.version {
47			Some(v) => Some(
48				stk.run(|stk| v.compute(stk, ctx, opt, doc))
49					.await
50					.catch_return()?
51					.cast_to::<Datetime>()?
52					.to_version_stamp()?,
53			),
54			_ => None,
55		};
56		let opt = &opt.clone().with_version(version);
57		// Check if there is a timeout
58		let ctx = match self.timeout.as_ref() {
59			Some(timeout) => {
60				let mut ctx = MutableContext::new(ctx);
61				ctx.add_timeout(*timeout.0)?;
62				ctx.freeze()
63			}
64			None => ctx.clone(),
65		};
66		// Parse the INTO expression
67		let into = match &self.into {
68			None => None,
69			Some(into) => {
70				match stk.run(|stk| into.compute(stk, &ctx, opt, doc)).await.catch_return()? {
71					Value::Table(into) => Some(into),
72					v => {
73						bail!(Error::InsertStatement {
74							value: v.to_string(),
75						})
76					}
77				}
78			}
79		};
80
81		// Parse the data expression
82		match &self.data {
83			// Check if this is a traditional statement
84			Data::ValuesExpression(v) => {
85				for v in v {
86					// Create a new empty base object
87					let mut o = Value::empty_object();
88					// Set each field from the expression
89					for (k, v) in v.iter() {
90						let v =
91							stk.run(|stk| v.compute(stk, &ctx, opt, None)).await.catch_return()?;
92						o.set(stk, &ctx, opt, k, v).await?;
93					}
94					// Specify the new table record id
95					let id = gen_id(&o, &into)?;
96					// Pass the value to the iterator
97					i.ingest(iterable(id, o, self.relation)?)
98				}
99			}
100			// Check if this is a modern statement
101			Data::SingleExpression(v) => {
102				let v = stk.run(|stk| v.compute(stk, &ctx, opt, doc)).await.catch_return()?;
103				match v {
104					Value::Array(v) => {
105						for v in v {
106							// Specify the new table record id
107							let id = gen_id(&v, &into)?;
108							// Pass the value to the iterator
109							i.ingest(iterable(id, v, self.relation)?)
110						}
111					}
112					Value::Object(_) => {
113						// Specify the new table record id
114						let id = gen_id(&v, &into)?;
115						// Pass the value to the iterator
116						i.ingest(iterable(id, v, self.relation)?)
117					}
118					v => {
119						bail!(Error::InsertStatement {
120							value: v.to_string(),
121						})
122					}
123				}
124			}
125			v => fail!("Unknown data clause type in INSERT statement: {v:?}"),
126		}
127		// Assign the statement
128		let stm = Statement::from(self);
129
130		// Ensure the database exists.
131		ctx.get_db(opt).await?;
132
133		// Process the statement
134		let res = i.output(stk, &ctx, opt, &stm, RecordStrategy::KeysAndValues).await?;
135		// Catch statement timeout
136		ensure!(!ctx.is_timedout().await?, Error::QueryTimedout);
137		// Output the results
138		Ok(res)
139	}
140}
141
142impl fmt::Display for InsertStatement {
143	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144		f.write_str("INSERT")?;
145		if self.relation {
146			f.write_str(" RELATION")?
147		}
148		if self.ignore {
149			f.write_str(" IGNORE")?
150		}
151		if let Some(into) = &self.into {
152			write!(f, " INTO {}", into)?;
153		}
154		write!(f, " {}", self.data)?;
155		if let Some(ref v) = self.update {
156			write!(f, " {v}")?
157		}
158		if let Some(ref v) = self.output {
159			write!(f, " {v}")?
160		}
161		if let Some(ref v) = self.version {
162			write!(f, "VERSION {v}")?
163		}
164		if let Some(ref v) = self.timeout {
165			write!(f, " {v}")?
166		}
167		if self.parallel {
168			f.write_str(" PARALLEL")?
169		}
170		Ok(())
171	}
172}
173
174fn iterable(id: RecordId, v: Value, relation: bool) -> Result<Iterable> {
175	if relation {
176		let f = match v.pick(&*IN) {
177			Value::RecordId(v) => v,
178			v => {
179				bail!(Error::InsertStatementIn {
180					value: v.to_string(),
181				})
182			}
183		};
184		let w = match v.pick(&*OUT) {
185			Value::RecordId(v) => v,
186			v => {
187				bail!(Error::InsertStatementOut {
188					value: v.to_string(),
189				})
190			}
191		};
192		Ok(Iterable::Relatable(f, id, w, Some(v)))
193	} else {
194		Ok(Iterable::Mergeable(id, v))
195	}
196}
197
198fn gen_id(v: &Value, into: &Option<Table>) -> Result<RecordId> {
199	match into {
200		Some(into) => v.rid().generate(into.clone().into_strand(), true),
201		None => match v.rid() {
202			Value::RecordId(v) => Ok(v),
203			v => Err(anyhow::Error::new(Error::InsertStatementId {
204				value: v.to_string(),
205			})),
206		},
207	}
208}