surrealdb_core/expr/statements/
insert.rs1use std::fmt;
2
3use anyhow::{Result, bail, ensure};
4use reblessive::tree::Stk;
5use revision::revisioned;
6use serde::{Deserialize, Serialize};
7
8use crate::ctx::{Context, MutableContext};
9use crate::dbs::{Iterable, Iterator, Options, Statement};
10use crate::doc::CursorDoc;
11use crate::err::Error;
12use crate::expr::paths::{IN, OUT};
13use crate::expr::{Data, Expr, FlowResultExt as _, Output, Timeout, Value};
14use crate::idx::planner::RecordStrategy;
15use crate::val::{Datetime, RecordId, Table};
16
17#[revisioned(revision = 1)]
18#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Hash)]
19pub struct InsertStatement {
20 pub into: Option<Expr>,
21 pub data: Data,
22 pub ignore: bool,
24 pub update: Option<Data>,
25 pub output: Option<Output>,
26 pub timeout: Option<Timeout>,
27 pub parallel: bool,
28 pub relation: bool,
29 pub version: Option<Expr>,
30}
31
32impl InsertStatement {
33 pub(crate) async fn compute(
35 &self,
36 stk: &mut Stk,
37 ctx: &Context,
38 opt: &Options,
39 doc: Option<&CursorDoc>,
40 ) -> Result<Value> {
41 opt.valid_for_db()?;
43 let mut i = Iterator::new();
45 let version = match &self.version {
47 Some(v) => Some(
48 stk.run(|stk| v.compute(stk, ctx, opt, doc))
49 .await
50 .catch_return()?
51 .cast_to::<Datetime>()?
52 .to_version_stamp()?,
53 ),
54 _ => None,
55 };
56 let opt = &opt.clone().with_version(version);
57 let ctx = match self.timeout.as_ref() {
59 Some(timeout) => {
60 let mut ctx = MutableContext::new(ctx);
61 ctx.add_timeout(*timeout.0)?;
62 ctx.freeze()
63 }
64 None => ctx.clone(),
65 };
66 let into = match &self.into {
68 None => None,
69 Some(into) => {
70 match stk.run(|stk| into.compute(stk, &ctx, opt, doc)).await.catch_return()? {
71 Value::Table(into) => Some(into),
72 v => {
73 bail!(Error::InsertStatement {
74 value: v.to_string(),
75 })
76 }
77 }
78 }
79 };
80
81 match &self.data {
83 Data::ValuesExpression(v) => {
85 for v in v {
86 let mut o = Value::empty_object();
88 for (k, v) in v.iter() {
90 let v =
91 stk.run(|stk| v.compute(stk, &ctx, opt, None)).await.catch_return()?;
92 o.set(stk, &ctx, opt, k, v).await?;
93 }
94 let id = gen_id(&o, &into)?;
96 i.ingest(iterable(id, o, self.relation)?)
98 }
99 }
100 Data::SingleExpression(v) => {
102 let v = stk.run(|stk| v.compute(stk, &ctx, opt, doc)).await.catch_return()?;
103 match v {
104 Value::Array(v) => {
105 for v in v {
106 let id = gen_id(&v, &into)?;
108 i.ingest(iterable(id, v, self.relation)?)
110 }
111 }
112 Value::Object(_) => {
113 let id = gen_id(&v, &into)?;
115 i.ingest(iterable(id, v, self.relation)?)
117 }
118 v => {
119 bail!(Error::InsertStatement {
120 value: v.to_string(),
121 })
122 }
123 }
124 }
125 v => fail!("Unknown data clause type in INSERT statement: {v:?}"),
126 }
127 let stm = Statement::from(self);
129
130 ctx.get_db(opt).await?;
132
133 let res = i.output(stk, &ctx, opt, &stm, RecordStrategy::KeysAndValues).await?;
135 ensure!(!ctx.is_timedout().await?, Error::QueryTimedout);
137 Ok(res)
139 }
140}
141
142impl fmt::Display for InsertStatement {
143 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144 f.write_str("INSERT")?;
145 if self.relation {
146 f.write_str(" RELATION")?
147 }
148 if self.ignore {
149 f.write_str(" IGNORE")?
150 }
151 if let Some(into) = &self.into {
152 write!(f, " INTO {}", into)?;
153 }
154 write!(f, " {}", self.data)?;
155 if let Some(ref v) = self.update {
156 write!(f, " {v}")?
157 }
158 if let Some(ref v) = self.output {
159 write!(f, " {v}")?
160 }
161 if let Some(ref v) = self.version {
162 write!(f, "VERSION {v}")?
163 }
164 if let Some(ref v) = self.timeout {
165 write!(f, " {v}")?
166 }
167 if self.parallel {
168 f.write_str(" PARALLEL")?
169 }
170 Ok(())
171 }
172}
173
174fn iterable(id: RecordId, v: Value, relation: bool) -> Result<Iterable> {
175 if relation {
176 let f = match v.pick(&*IN) {
177 Value::RecordId(v) => v,
178 v => {
179 bail!(Error::InsertStatementIn {
180 value: v.to_string(),
181 })
182 }
183 };
184 let w = match v.pick(&*OUT) {
185 Value::RecordId(v) => v,
186 v => {
187 bail!(Error::InsertStatementOut {
188 value: v.to_string(),
189 })
190 }
191 };
192 Ok(Iterable::Relatable(f, id, w, Some(v)))
193 } else {
194 Ok(Iterable::Mergeable(id, v))
195 }
196}
197
198fn gen_id(v: &Value, into: &Option<Table>) -> Result<RecordId> {
199 match into {
200 Some(into) => v.rid().generate(into.clone().into_strand(), true),
201 None => match v.rid() {
202 Value::RecordId(v) => Ok(v),
203 v => Err(anyhow::Error::new(Error::InsertStatementId {
204 value: v.to_string(),
205 })),
206 },
207 }
208}