surrealdb_core/sql/statements/
upsert.rs

1use crate::ctx::Context;
2use crate::dbs::{Iterator, Options, Statement};
3use crate::doc::CursorDoc;
4use crate::err::Error;
5use crate::idx::planner::{QueryPlanner, RecordStrategy, StatementContext};
6use crate::sql::{Cond, Data, Explain, Output, Timeout, Value, Values, With};
7
8use reblessive::tree::Stk;
9use revision::revisioned;
10use serde::{Deserialize, Serialize};
11use std::fmt;
12
13#[revisioned(revision = 2)]
14#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
15#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
16#[non_exhaustive]
17pub struct UpsertStatement {
18	pub only: bool,
19	pub what: Values,
20	#[revision(start = 2)]
21	pub with: Option<With>,
22	pub data: Option<Data>,
23	pub cond: Option<Cond>,
24	pub output: Option<Output>,
25	pub timeout: Option<Timeout>,
26	pub parallel: bool,
27	#[revision(start = 2)]
28	pub explain: Option<Explain>,
29}
30
31impl UpsertStatement {
32	/// Check if we require a writeable transaction
33	pub(crate) fn writeable(&self) -> bool {
34		true
35	}
36	/// Process this type returning a computed simple Value
37	pub(crate) async fn compute(
38		&self,
39		stk: &mut Stk,
40		ctx: &Context,
41		opt: &Options,
42		doc: Option<&CursorDoc>,
43	) -> Result<Value, Error> {
44		// Valid options?
45		opt.valid_for_db()?;
46		// Create a new iterator
47		let mut i = Iterator::new();
48		// Assign the statement
49		let stm = Statement::from(self);
50		// Ensure futures are stored
51		let opt = &opt.new_with_futures(false);
52		// Check if there is a timeout
53		let ctx = stm.setup_timeout(ctx)?;
54		// Get a query planner
55		let mut planner = QueryPlanner::new();
56		let stm_ctx = StatementContext::new(&ctx, opt, &stm)?;
57		// Loop over the upsert targets
58		for w in self.what.0.iter() {
59			let v = w.compute(stk, &ctx, opt, doc).await?;
60			i.prepare(stk, &mut planner, &stm_ctx, v).await.map_err(|e| match e {
61				Error::InvalidStatementTarget {
62					value: v,
63				} => Error::UpsertStatement {
64					value: v,
65				},
66				e => e,
67			})?;
68		}
69		// Attach the query planner to the context
70		let ctx = stm.setup_query_planner(planner, ctx);
71		// Process the statement
72		let res = i.output(stk, &ctx, opt, &stm, RecordStrategy::KeysAndValues).await?;
73		// Catch statement timeout
74		if ctx.is_timedout() {
75			return Err(Error::QueryTimedout);
76		}
77		// Output the results
78		match res {
79			// This is a single record result
80			Value::Array(mut a) if self.only => match a.len() {
81				// There was exactly one result
82				1 => Ok(a.remove(0)),
83				// There were no results
84				_ => Err(Error::SingleOnlyOutput),
85			},
86			// This is standard query result
87			v => Ok(v),
88		}
89	}
90}
91
92impl fmt::Display for UpsertStatement {
93	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
94		write!(f, "UPSERT")?;
95		if self.only {
96			f.write_str(" ONLY")?
97		}
98		write!(f, " {}", self.what)?;
99		if let Some(ref v) = self.with {
100			write!(f, " {v}")?
101		}
102		if let Some(ref v) = self.data {
103			write!(f, " {v}")?
104		}
105		if let Some(ref v) = self.cond {
106			write!(f, " {v}")?
107		}
108		if let Some(ref v) = self.output {
109			write!(f, " {v}")?
110		}
111		if let Some(ref v) = self.timeout {
112			write!(f, " {v}")?
113		}
114		if self.parallel {
115			f.write_str(" PARALLEL")?
116		}
117		if let Some(ref v) = self.explain {
118			write!(f, " {v}")?
119		}
120		Ok(())
121	}
122}