1use crate::{
2 api::{err::Error, Response as QueryResponse, Result},
3 method::{self, Stats, Stream},
4 value::Notification,
5 Value,
6};
7use futures::future::Either;
8use futures::stream::select_all;
9use serde::de::DeserializeOwned;
10use std::marker::PhantomData;
11use std::mem;
12use surrealdb_core::{
13 sql::{
14 self, from_value as from_core_value, statements::*, Statement, Statements,
15 Value as CoreValue,
16 },
17 syn,
18};
19
20pub trait IntoQuery {
22 fn into_query(self) -> Result<Vec<Statement>>;
24}
25
26impl IntoQuery for sql::Query {
27 fn into_query(self) -> Result<Vec<Statement>> {
28 Ok(self.0 .0)
29 }
30}
31
32impl IntoQuery for Statements {
33 fn into_query(self) -> Result<Vec<Statement>> {
34 Ok(self.0)
35 }
36}
37
38impl IntoQuery for Vec<Statement> {
39 fn into_query(self) -> Result<Vec<Statement>> {
40 Ok(self)
41 }
42}
43
44impl IntoQuery for Statement {
45 fn into_query(self) -> Result<Vec<Statement>> {
46 Ok(vec![self])
47 }
48}
49
50impl IntoQuery for UseStatement {
51 fn into_query(self) -> Result<Vec<Statement>> {
52 Ok(vec![Statement::Use(self)])
53 }
54}
55
56impl IntoQuery for SetStatement {
57 fn into_query(self) -> Result<Vec<Statement>> {
58 Ok(vec![Statement::Set(self)])
59 }
60}
61
62impl IntoQuery for InfoStatement {
63 fn into_query(self) -> Result<Vec<Statement>> {
64 Ok(vec![Statement::Info(self)])
65 }
66}
67
68impl IntoQuery for LiveStatement {
69 fn into_query(self) -> Result<Vec<Statement>> {
70 Ok(vec![Statement::Live(self)])
71 }
72}
73
74impl IntoQuery for KillStatement {
75 fn into_query(self) -> Result<Vec<Statement>> {
76 Ok(vec![Statement::Kill(self)])
77 }
78}
79
80impl IntoQuery for BeginStatement {
81 fn into_query(self) -> Result<Vec<Statement>> {
82 Ok(vec![Statement::Begin(self)])
83 }
84}
85
86impl IntoQuery for CancelStatement {
87 fn into_query(self) -> Result<Vec<Statement>> {
88 Ok(vec![Statement::Cancel(self)])
89 }
90}
91
92impl IntoQuery for CommitStatement {
93 fn into_query(self) -> Result<Vec<Statement>> {
94 Ok(vec![Statement::Commit(self)])
95 }
96}
97
98impl IntoQuery for OutputStatement {
99 fn into_query(self) -> Result<Vec<Statement>> {
100 Ok(vec![Statement::Output(self)])
101 }
102}
103
104impl IntoQuery for IfelseStatement {
105 fn into_query(self) -> Result<Vec<Statement>> {
106 Ok(vec![Statement::Ifelse(self)])
107 }
108}
109
110impl IntoQuery for SelectStatement {
111 fn into_query(self) -> Result<Vec<Statement>> {
112 Ok(vec![Statement::Select(self)])
113 }
114}
115
116impl IntoQuery for CreateStatement {
117 fn into_query(self) -> Result<Vec<Statement>> {
118 Ok(vec![Statement::Create(self)])
119 }
120}
121
122impl IntoQuery for UpdateStatement {
123 fn into_query(self) -> Result<Vec<Statement>> {
124 Ok(vec![Statement::Update(self)])
125 }
126}
127
128impl IntoQuery for RelateStatement {
129 fn into_query(self) -> Result<Vec<Statement>> {
130 Ok(vec![Statement::Relate(self)])
131 }
132}
133
134impl IntoQuery for DeleteStatement {
135 fn into_query(self) -> Result<Vec<Statement>> {
136 Ok(vec![Statement::Delete(self)])
137 }
138}
139
140impl IntoQuery for InsertStatement {
141 fn into_query(self) -> Result<Vec<Statement>> {
142 Ok(vec![Statement::Insert(self)])
143 }
144}
145
146impl IntoQuery for DefineStatement {
147 fn into_query(self) -> Result<Vec<Statement>> {
148 Ok(vec![Statement::Define(self)])
149 }
150}
151
152impl IntoQuery for AlterStatement {
153 fn into_query(self) -> Result<Vec<Statement>> {
154 Ok(vec![Statement::Alter(self)])
155 }
156}
157
158impl IntoQuery for RemoveStatement {
159 fn into_query(self) -> Result<Vec<Statement>> {
160 Ok(vec![Statement::Remove(self)])
161 }
162}
163
164impl IntoQuery for OptionStatement {
165 fn into_query(self) -> Result<Vec<Statement>> {
166 Ok(vec![Statement::Option(self)])
167 }
168}
169
170impl IntoQuery for &str {
171 fn into_query(self) -> Result<Vec<Statement>> {
172 syn::parse(self)?.into_query()
173 }
174}
175
176impl IntoQuery for &String {
177 fn into_query(self) -> Result<Vec<Statement>> {
178 syn::parse(self)?.into_query()
179 }
180}
181
182impl IntoQuery for String {
183 fn into_query(self) -> Result<Vec<Statement>> {
184 syn::parse(&self)?.into_query()
185 }
186}
187
188pub trait QueryResult<Response>
190where
191 Response: DeserializeOwned,
192{
193 fn query_result(self, response: &mut QueryResponse) -> Result<Response>;
195
196 fn stats(&self, response: &QueryResponse) -> Option<Stats> {
198 response.results.get(&0).map(|x| x.0)
199 }
200}
201
202impl QueryResult<Value> for usize {
203 fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
204 match response.results.swap_remove(&self) {
205 Some((_, result)) => Ok(Value::from_inner(result?)),
206 None => Ok(Value::from_inner(CoreValue::None)),
207 }
208 }
209
210 fn stats(&self, response: &QueryResponse) -> Option<Stats> {
211 response.results.get(self).map(|x| x.0)
212 }
213}
214
215impl<T> QueryResult<Option<T>> for usize
216where
217 T: DeserializeOwned,
218{
219 fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
220 let value = match response.results.get_mut(&self) {
221 Some((_, result)) => match result {
222 Ok(val) => val,
223 Err(error) => {
224 let error = mem::replace(error, Error::ConnectionUninitialised.into());
225 response.results.swap_remove(&self);
226 return Err(error);
227 }
228 },
229 None => {
230 return Ok(None);
231 }
232 };
233 let result = match value {
234 CoreValue::Array(vec) => match &mut vec.0[..] {
235 [] => Ok(None),
236 [value] => {
237 let value = mem::take(value);
238 from_core_value(value).map_err(Into::into)
239 }
240 _ => Err(Error::LossyTake(QueryResponse {
241 results: mem::take(&mut response.results),
242 live_queries: mem::take(&mut response.live_queries),
243 ..QueryResponse::new()
244 })
245 .into()),
246 },
247 _ => {
248 let value = mem::take(value);
249 from_core_value(value).map_err(Into::into)
250 }
251 };
252 response.results.swap_remove(&self);
253 result
254 }
255
256 fn stats(&self, response: &QueryResponse) -> Option<Stats> {
257 response.results.get(self).map(|x| x.0)
258 }
259}
260
261impl QueryResult<Value> for (usize, &str) {
262 fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
263 let (index, key) = self;
264 let value = match response.results.get_mut(&index) {
265 Some((_, result)) => match result {
266 Ok(val) => val,
267 Err(error) => {
268 let error = mem::replace(error, Error::ConnectionUninitialised.into());
269 response.results.swap_remove(&index);
270 return Err(error);
271 }
272 },
273 None => {
274 return Ok(Value::from_inner(CoreValue::None));
275 }
276 };
277
278 let value = match value {
279 CoreValue::Object(object) => object.remove(key).unwrap_or_default(),
280 _ => CoreValue::None,
281 };
282
283 Ok(Value::from_inner(value))
284 }
285
286 fn stats(&self, response: &QueryResponse) -> Option<Stats> {
287 response.results.get(&self.0).map(|x| x.0)
288 }
289}
290
291impl<T> QueryResult<Option<T>> for (usize, &str)
292where
293 T: DeserializeOwned,
294{
295 fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
296 let (index, key) = self;
297 let value = match response.results.get_mut(&index) {
298 Some((_, result)) => match result {
299 Ok(val) => val,
300 Err(error) => {
301 let error = mem::replace(error, Error::ConnectionUninitialised.into());
302 response.results.swap_remove(&index);
303 return Err(error);
304 }
305 },
306 None => {
307 return Ok(None);
308 }
309 };
310 let value = match value {
311 CoreValue::Array(vec) => match &mut vec.0[..] {
312 [] => {
313 response.results.swap_remove(&index);
314 return Ok(None);
315 }
316 [value] => value,
317 _ => {
318 return Err(Error::LossyTake(QueryResponse {
319 results: mem::take(&mut response.results),
320 live_queries: mem::take(&mut response.live_queries),
321 ..QueryResponse::new()
322 })
323 .into());
324 }
325 },
326 value => value,
327 };
328 match value {
329 CoreValue::None => {
330 response.results.swap_remove(&index);
331 Ok(None)
332 }
333 CoreValue::Object(object) => {
334 if object.is_empty() {
335 response.results.swap_remove(&index);
336 return Ok(None);
337 }
338 let Some(value) = object.remove(key) else {
339 return Ok(None);
340 };
341 from_core_value(value).map_err(Into::into)
342 }
343 _ => Ok(None),
344 }
345 }
346
347 fn stats(&self, response: &QueryResponse) -> Option<Stats> {
348 response.results.get(&self.0).map(|x| x.0)
349 }
350}
351
352impl<T> QueryResult<Vec<T>> for usize
353where
354 T: DeserializeOwned,
355{
356 fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
357 let vec = match response.results.swap_remove(&self) {
358 Some((_, result)) => match result? {
359 CoreValue::Array(vec) => vec.0,
360 vec => vec![vec],
361 },
362 None => {
363 return Ok(vec![]);
364 }
365 };
366 from_core_value(vec.into()).map_err(Into::into)
367 }
368
369 fn stats(&self, response: &QueryResponse) -> Option<Stats> {
370 response.results.get(self).map(|x| x.0)
371 }
372}
373
374impl<T> QueryResult<Vec<T>> for (usize, &str)
375where
376 T: DeserializeOwned,
377{
378 fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
379 let (index, key) = self;
380 match response.results.get_mut(&index) {
381 Some((_, result)) => match result {
382 Ok(val) => match val {
383 CoreValue::Array(vec) => {
384 let mut responses = Vec::with_capacity(vec.len());
385 for value in vec.iter_mut() {
386 if let CoreValue::Object(object) = value {
387 if let Some(value) = object.remove(key) {
388 responses.push(value);
389 }
390 }
391 }
392 from_core_value(responses.into()).map_err(Into::into)
393 }
394 val => {
395 if let CoreValue::Object(object) = val {
396 if let Some(value) = object.remove(key) {
397 return from_core_value(vec![value].into()).map_err(Into::into);
398 }
399 }
400 Ok(vec![])
401 }
402 },
403 Err(error) => {
404 let error = mem::replace(error, Error::ConnectionUninitialised.into());
405 response.results.swap_remove(&index);
406 Err(error)
407 }
408 },
409 None => Ok(vec![]),
410 }
411 }
412
413 fn stats(&self, response: &QueryResponse) -> Option<Stats> {
414 response.results.get(&self.0).map(|x| x.0)
415 }
416}
417
418impl QueryResult<Value> for &str {
419 fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
420 (0, self).query_result(response)
421 }
422}
423
424impl<T> QueryResult<Option<T>> for &str
425where
426 T: DeserializeOwned,
427{
428 fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
429 (0, self).query_result(response)
430 }
431}
432
433impl<T> QueryResult<Vec<T>> for &str
434where
435 T: DeserializeOwned,
436{
437 fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
438 (0, self).query_result(response)
439 }
440}
441
442pub trait QueryStream<R> {
444 fn query_stream(self, response: &mut QueryResponse) -> Result<method::QueryStream<R>>;
446}
447
448impl QueryStream<Value> for usize {
449 fn query_stream(self, response: &mut QueryResponse) -> Result<method::QueryStream<Value>> {
450 let stream = response
451 .live_queries
452 .swap_remove(&self)
453 .and_then(|result| match result {
454 Err(crate::Error::Api(Error::NotLiveQuery(..))) => {
455 response.results.swap_remove(&self).and_then(|x| x.1.err().map(Err))
456 }
457 result => Some(result),
458 })
459 .unwrap_or_else(|| match response.results.contains_key(&self) {
460 true => Err(Error::NotLiveQuery(self).into()),
461 false => Err(Error::QueryIndexOutOfBounds(self).into()),
462 })?;
463 Ok(method::QueryStream(Either::Left(stream)))
464 }
465}
466
467impl QueryStream<Value> for () {
468 fn query_stream(self, response: &mut QueryResponse) -> Result<method::QueryStream<Value>> {
469 let mut streams = Vec::with_capacity(response.live_queries.len());
470 for (index, result) in mem::take(&mut response.live_queries) {
471 match result {
472 Ok(stream) => streams.push(stream),
473 Err(crate::Error::Api(Error::NotLiveQuery(..))) => match response.results.swap_remove(&index) {
474 Some((stats, Err(error))) => {
475 response.results.insert(index, (stats, Err(Error::ResponseAlreadyTaken.into())));
476 return Err(error);
477 }
478 Some((_, Ok(..))) => unreachable!("the internal error variant indicates that an error occurred in the `LIVE SELECT` query"),
479 None => { return Err(Error::ResponseAlreadyTaken.into()); }
480 }
481 Err(error) => { return Err(error); }
482 }
483 }
484 Ok(method::QueryStream(Either::Right(select_all(streams))))
485 }
486}
487
488impl<R> QueryStream<Notification<R>> for usize
489where
490 R: DeserializeOwned + Unpin,
491{
492 fn query_stream(
493 self,
494 response: &mut QueryResponse,
495 ) -> Result<method::QueryStream<Notification<R>>> {
496 let mut stream = response
497 .live_queries
498 .swap_remove(&self)
499 .and_then(|result| match result {
500 Err(crate::Error::Api(Error::NotLiveQuery(..))) => {
501 response.results.swap_remove(&self).and_then(|x| x.1.err().map(Err))
502 }
503 result => Some(result),
504 })
505 .unwrap_or_else(|| match response.results.contains_key(&self) {
506 true => Err(Error::NotLiveQuery(self).into()),
507 false => Err(Error::QueryIndexOutOfBounds(self).into()),
508 })?;
509 Ok(method::QueryStream(Either::Left(Stream {
510 client: stream.client.clone(),
511 id: mem::take(&mut stream.id),
512 rx: stream.rx.take(),
513 response_type: PhantomData,
514 })))
515 }
516}
517
518impl<R> QueryStream<Notification<R>> for ()
519where
520 R: DeserializeOwned + Unpin,
521{
522 fn query_stream(
523 self,
524 response: &mut QueryResponse,
525 ) -> Result<method::QueryStream<Notification<R>>> {
526 let mut streams = Vec::with_capacity(response.live_queries.len());
527 for (index, result) in mem::take(&mut response.live_queries) {
528 let mut stream = match result {
529 Ok(stream) => stream,
530 Err(crate::Error::Api(Error::NotLiveQuery(..))) => match response.results.swap_remove(&index) {
531 Some((stats, Err(error))) => {
532 response.results.insert(index, (stats, Err(Error::ResponseAlreadyTaken.into())));
533 return Err(error);
534 }
535 Some((_, Ok(..))) => unreachable!("the internal error variant indicates that an error occurred in the `LIVE SELECT` query"),
536 None => { return Err(Error::ResponseAlreadyTaken.into()); }
537 }
538 Err(error) => { return Err(error); }
539 };
540 streams.push(Stream {
541 client: stream.client.clone(),
542 id: mem::take(&mut stream.id),
543 rx: stream.rx.take(),
544 response_type: PhantomData,
545 });
546 }
547 Ok(method::QueryStream(Either::Right(select_all(streams))))
548 }
549}