surrealdb/api/opt/
query.rs

1use crate::{
2	api::{err::Error, Response as QueryResponse, Result},
3	method::{self, Stats, Stream},
4	value::Notification,
5	Value,
6};
7use futures::future::Either;
8use futures::stream::select_all;
9use serde::de::DeserializeOwned;
10use std::marker::PhantomData;
11use std::mem;
12use surrealdb_core::{
13	sql::{
14		self, from_value as from_core_value, statements::*, Statement, Statements,
15		Value as CoreValue,
16	},
17	syn,
18};
19
20/// A trait for converting inputs into SQL statements
21pub trait IntoQuery {
22	/// Converts an input into SQL statements
23	fn into_query(self) -> Result<Vec<Statement>>;
24}
25
26impl IntoQuery for sql::Query {
27	fn into_query(self) -> Result<Vec<Statement>> {
28		Ok(self.0 .0)
29	}
30}
31
32impl IntoQuery for Statements {
33	fn into_query(self) -> Result<Vec<Statement>> {
34		Ok(self.0)
35	}
36}
37
38impl IntoQuery for Vec<Statement> {
39	fn into_query(self) -> Result<Vec<Statement>> {
40		Ok(self)
41	}
42}
43
44impl IntoQuery for Statement {
45	fn into_query(self) -> Result<Vec<Statement>> {
46		Ok(vec![self])
47	}
48}
49
50impl IntoQuery for UseStatement {
51	fn into_query(self) -> Result<Vec<Statement>> {
52		Ok(vec![Statement::Use(self)])
53	}
54}
55
56impl IntoQuery for SetStatement {
57	fn into_query(self) -> Result<Vec<Statement>> {
58		Ok(vec![Statement::Set(self)])
59	}
60}
61
62impl IntoQuery for InfoStatement {
63	fn into_query(self) -> Result<Vec<Statement>> {
64		Ok(vec![Statement::Info(self)])
65	}
66}
67
68impl IntoQuery for LiveStatement {
69	fn into_query(self) -> Result<Vec<Statement>> {
70		Ok(vec![Statement::Live(self)])
71	}
72}
73
74impl IntoQuery for KillStatement {
75	fn into_query(self) -> Result<Vec<Statement>> {
76		Ok(vec![Statement::Kill(self)])
77	}
78}
79
80impl IntoQuery for BeginStatement {
81	fn into_query(self) -> Result<Vec<Statement>> {
82		Ok(vec![Statement::Begin(self)])
83	}
84}
85
86impl IntoQuery for CancelStatement {
87	fn into_query(self) -> Result<Vec<Statement>> {
88		Ok(vec![Statement::Cancel(self)])
89	}
90}
91
92impl IntoQuery for CommitStatement {
93	fn into_query(self) -> Result<Vec<Statement>> {
94		Ok(vec![Statement::Commit(self)])
95	}
96}
97
98impl IntoQuery for OutputStatement {
99	fn into_query(self) -> Result<Vec<Statement>> {
100		Ok(vec![Statement::Output(self)])
101	}
102}
103
104impl IntoQuery for IfelseStatement {
105	fn into_query(self) -> Result<Vec<Statement>> {
106		Ok(vec![Statement::Ifelse(self)])
107	}
108}
109
110impl IntoQuery for SelectStatement {
111	fn into_query(self) -> Result<Vec<Statement>> {
112		Ok(vec![Statement::Select(self)])
113	}
114}
115
116impl IntoQuery for CreateStatement {
117	fn into_query(self) -> Result<Vec<Statement>> {
118		Ok(vec![Statement::Create(self)])
119	}
120}
121
122impl IntoQuery for UpdateStatement {
123	fn into_query(self) -> Result<Vec<Statement>> {
124		Ok(vec![Statement::Update(self)])
125	}
126}
127
128impl IntoQuery for RelateStatement {
129	fn into_query(self) -> Result<Vec<Statement>> {
130		Ok(vec![Statement::Relate(self)])
131	}
132}
133
134impl IntoQuery for DeleteStatement {
135	fn into_query(self) -> Result<Vec<Statement>> {
136		Ok(vec![Statement::Delete(self)])
137	}
138}
139
140impl IntoQuery for InsertStatement {
141	fn into_query(self) -> Result<Vec<Statement>> {
142		Ok(vec![Statement::Insert(self)])
143	}
144}
145
146impl IntoQuery for DefineStatement {
147	fn into_query(self) -> Result<Vec<Statement>> {
148		Ok(vec![Statement::Define(self)])
149	}
150}
151
152impl IntoQuery for AlterStatement {
153	fn into_query(self) -> Result<Vec<Statement>> {
154		Ok(vec![Statement::Alter(self)])
155	}
156}
157
158impl IntoQuery for RemoveStatement {
159	fn into_query(self) -> Result<Vec<Statement>> {
160		Ok(vec![Statement::Remove(self)])
161	}
162}
163
164impl IntoQuery for OptionStatement {
165	fn into_query(self) -> Result<Vec<Statement>> {
166		Ok(vec![Statement::Option(self)])
167	}
168}
169
170impl IntoQuery for &str {
171	fn into_query(self) -> Result<Vec<Statement>> {
172		syn::parse(self)?.into_query()
173	}
174}
175
176impl IntoQuery for &String {
177	fn into_query(self) -> Result<Vec<Statement>> {
178		syn::parse(self)?.into_query()
179	}
180}
181
182impl IntoQuery for String {
183	fn into_query(self) -> Result<Vec<Statement>> {
184		syn::parse(&self)?.into_query()
185	}
186}
187
188/// Represents a way to take a single query result from a list of responses
189pub trait QueryResult<Response>
190where
191	Response: DeserializeOwned,
192{
193	/// Extracts and deserializes a query result from a query response
194	fn query_result(self, response: &mut QueryResponse) -> Result<Response>;
195
196	/// Extracts the statistics from a query response
197	fn stats(&self, response: &QueryResponse) -> Option<Stats> {
198		response.results.get(&0).map(|x| x.0)
199	}
200}
201
202impl QueryResult<Value> for usize {
203	fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
204		match response.results.swap_remove(&self) {
205			Some((_, result)) => Ok(Value::from_inner(result?)),
206			None => Ok(Value::from_inner(CoreValue::None)),
207		}
208	}
209
210	fn stats(&self, response: &QueryResponse) -> Option<Stats> {
211		response.results.get(self).map(|x| x.0)
212	}
213}
214
215impl<T> QueryResult<Option<T>> for usize
216where
217	T: DeserializeOwned,
218{
219	fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
220		let value = match response.results.get_mut(&self) {
221			Some((_, result)) => match result {
222				Ok(val) => val,
223				Err(error) => {
224					let error = mem::replace(error, Error::ConnectionUninitialised.into());
225					response.results.swap_remove(&self);
226					return Err(error);
227				}
228			},
229			None => {
230				return Ok(None);
231			}
232		};
233		let result = match value {
234			CoreValue::Array(vec) => match &mut vec.0[..] {
235				[] => Ok(None),
236				[value] => {
237					let value = mem::take(value);
238					from_core_value(value).map_err(Into::into)
239				}
240				_ => Err(Error::LossyTake(QueryResponse {
241					results: mem::take(&mut response.results),
242					live_queries: mem::take(&mut response.live_queries),
243					..QueryResponse::new()
244				})
245				.into()),
246			},
247			_ => {
248				let value = mem::take(value);
249				from_core_value(value).map_err(Into::into)
250			}
251		};
252		response.results.swap_remove(&self);
253		result
254	}
255
256	fn stats(&self, response: &QueryResponse) -> Option<Stats> {
257		response.results.get(self).map(|x| x.0)
258	}
259}
260
261impl QueryResult<Value> for (usize, &str) {
262	fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
263		let (index, key) = self;
264		let value = match response.results.get_mut(&index) {
265			Some((_, result)) => match result {
266				Ok(val) => val,
267				Err(error) => {
268					let error = mem::replace(error, Error::ConnectionUninitialised.into());
269					response.results.swap_remove(&index);
270					return Err(error);
271				}
272			},
273			None => {
274				return Ok(Value::from_inner(CoreValue::None));
275			}
276		};
277
278		let value = match value {
279			CoreValue::Object(object) => object.remove(key).unwrap_or_default(),
280			_ => CoreValue::None,
281		};
282
283		Ok(Value::from_inner(value))
284	}
285
286	fn stats(&self, response: &QueryResponse) -> Option<Stats> {
287		response.results.get(&self.0).map(|x| x.0)
288	}
289}
290
291impl<T> QueryResult<Option<T>> for (usize, &str)
292where
293	T: DeserializeOwned,
294{
295	fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
296		let (index, key) = self;
297		let value = match response.results.get_mut(&index) {
298			Some((_, result)) => match result {
299				Ok(val) => val,
300				Err(error) => {
301					let error = mem::replace(error, Error::ConnectionUninitialised.into());
302					response.results.swap_remove(&index);
303					return Err(error);
304				}
305			},
306			None => {
307				return Ok(None);
308			}
309		};
310		let value = match value {
311			CoreValue::Array(vec) => match &mut vec.0[..] {
312				[] => {
313					response.results.swap_remove(&index);
314					return Ok(None);
315				}
316				[value] => value,
317				_ => {
318					return Err(Error::LossyTake(QueryResponse {
319						results: mem::take(&mut response.results),
320						live_queries: mem::take(&mut response.live_queries),
321						..QueryResponse::new()
322					})
323					.into());
324				}
325			},
326			value => value,
327		};
328		match value {
329			CoreValue::None => {
330				response.results.swap_remove(&index);
331				Ok(None)
332			}
333			CoreValue::Object(object) => {
334				if object.is_empty() {
335					response.results.swap_remove(&index);
336					return Ok(None);
337				}
338				let Some(value) = object.remove(key) else {
339					return Ok(None);
340				};
341				from_core_value(value).map_err(Into::into)
342			}
343			_ => Ok(None),
344		}
345	}
346
347	fn stats(&self, response: &QueryResponse) -> Option<Stats> {
348		response.results.get(&self.0).map(|x| x.0)
349	}
350}
351
352impl<T> QueryResult<Vec<T>> for usize
353where
354	T: DeserializeOwned,
355{
356	fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
357		let vec = match response.results.swap_remove(&self) {
358			Some((_, result)) => match result? {
359				CoreValue::Array(vec) => vec.0,
360				vec => vec![vec],
361			},
362			None => {
363				return Ok(vec![]);
364			}
365		};
366		from_core_value(vec.into()).map_err(Into::into)
367	}
368
369	fn stats(&self, response: &QueryResponse) -> Option<Stats> {
370		response.results.get(self).map(|x| x.0)
371	}
372}
373
374impl<T> QueryResult<Vec<T>> for (usize, &str)
375where
376	T: DeserializeOwned,
377{
378	fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
379		let (index, key) = self;
380		match response.results.get_mut(&index) {
381			Some((_, result)) => match result {
382				Ok(val) => match val {
383					CoreValue::Array(vec) => {
384						let mut responses = Vec::with_capacity(vec.len());
385						for value in vec.iter_mut() {
386							if let CoreValue::Object(object) = value {
387								if let Some(value) = object.remove(key) {
388									responses.push(value);
389								}
390							}
391						}
392						from_core_value(responses.into()).map_err(Into::into)
393					}
394					val => {
395						if let CoreValue::Object(object) = val {
396							if let Some(value) = object.remove(key) {
397								return from_core_value(vec![value].into()).map_err(Into::into);
398							}
399						}
400						Ok(vec![])
401					}
402				},
403				Err(error) => {
404					let error = mem::replace(error, Error::ConnectionUninitialised.into());
405					response.results.swap_remove(&index);
406					Err(error)
407				}
408			},
409			None => Ok(vec![]),
410		}
411	}
412
413	fn stats(&self, response: &QueryResponse) -> Option<Stats> {
414		response.results.get(&self.0).map(|x| x.0)
415	}
416}
417
418impl QueryResult<Value> for &str {
419	fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
420		(0, self).query_result(response)
421	}
422}
423
424impl<T> QueryResult<Option<T>> for &str
425where
426	T: DeserializeOwned,
427{
428	fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
429		(0, self).query_result(response)
430	}
431}
432
433impl<T> QueryResult<Vec<T>> for &str
434where
435	T: DeserializeOwned,
436{
437	fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
438		(0, self).query_result(response)
439	}
440}
441
442/// A way to take a query stream future from a query response
443pub trait QueryStream<R> {
444	/// Retrieves the query stream future
445	fn query_stream(self, response: &mut QueryResponse) -> Result<method::QueryStream<R>>;
446}
447
448impl QueryStream<Value> for usize {
449	fn query_stream(self, response: &mut QueryResponse) -> Result<method::QueryStream<Value>> {
450		let stream = response
451			.live_queries
452			.swap_remove(&self)
453			.and_then(|result| match result {
454				Err(crate::Error::Api(Error::NotLiveQuery(..))) => {
455					response.results.swap_remove(&self).and_then(|x| x.1.err().map(Err))
456				}
457				result => Some(result),
458			})
459			.unwrap_or_else(|| match response.results.contains_key(&self) {
460				true => Err(Error::NotLiveQuery(self).into()),
461				false => Err(Error::QueryIndexOutOfBounds(self).into()),
462			})?;
463		Ok(method::QueryStream(Either::Left(stream)))
464	}
465}
466
467impl QueryStream<Value> for () {
468	fn query_stream(self, response: &mut QueryResponse) -> Result<method::QueryStream<Value>> {
469		let mut streams = Vec::with_capacity(response.live_queries.len());
470		for (index, result) in mem::take(&mut response.live_queries) {
471			match result {
472				Ok(stream) => streams.push(stream),
473				Err(crate::Error::Api(Error::NotLiveQuery(..))) => match response.results.swap_remove(&index) {
474					Some((stats, Err(error))) => {
475						response.results.insert(index, (stats, Err(Error::ResponseAlreadyTaken.into())));
476						return Err(error);
477					}
478					Some((_, Ok(..))) => unreachable!("the internal error variant indicates that an error occurred in the `LIVE SELECT` query"),
479					None => { return Err(Error::ResponseAlreadyTaken.into()); }
480				}
481				Err(error) => { return Err(error); }
482			}
483		}
484		Ok(method::QueryStream(Either::Right(select_all(streams))))
485	}
486}
487
488impl<R> QueryStream<Notification<R>> for usize
489where
490	R: DeserializeOwned + Unpin,
491{
492	fn query_stream(
493		self,
494		response: &mut QueryResponse,
495	) -> Result<method::QueryStream<Notification<R>>> {
496		let mut stream = response
497			.live_queries
498			.swap_remove(&self)
499			.and_then(|result| match result {
500				Err(crate::Error::Api(Error::NotLiveQuery(..))) => {
501					response.results.swap_remove(&self).and_then(|x| x.1.err().map(Err))
502				}
503				result => Some(result),
504			})
505			.unwrap_or_else(|| match response.results.contains_key(&self) {
506				true => Err(Error::NotLiveQuery(self).into()),
507				false => Err(Error::QueryIndexOutOfBounds(self).into()),
508			})?;
509		Ok(method::QueryStream(Either::Left(Stream {
510			client: stream.client.clone(),
511			id: mem::take(&mut stream.id),
512			rx: stream.rx.take(),
513			response_type: PhantomData,
514		})))
515	}
516}
517
518impl<R> QueryStream<Notification<R>> for ()
519where
520	R: DeserializeOwned + Unpin,
521{
522	fn query_stream(
523		self,
524		response: &mut QueryResponse,
525	) -> Result<method::QueryStream<Notification<R>>> {
526		let mut streams = Vec::with_capacity(response.live_queries.len());
527		for (index, result) in mem::take(&mut response.live_queries) {
528			let mut stream = match result {
529				Ok(stream) => stream,
530				Err(crate::Error::Api(Error::NotLiveQuery(..))) => match response.results.swap_remove(&index) {
531					Some((stats, Err(error))) => {
532						response.results.insert(index, (stats, Err(Error::ResponseAlreadyTaken.into())));
533						return Err(error);
534					}
535					Some((_, Ok(..))) => unreachable!("the internal error variant indicates that an error occurred in the `LIVE SELECT` query"),
536					None => { return Err(Error::ResponseAlreadyTaken.into()); }
537				}
538				Err(error) => { return Err(error); }
539			};
540			streams.push(Stream {
541				client: stream.client.clone(),
542				id: mem::take(&mut stream.id),
543				rx: stream.rx.take(),
544				response_type: PhantomData,
545			});
546		}
547		Ok(method::QueryStream(Either::Right(select_all(streams))))
548	}
549}