use std::collections::{HashSet, LinkedList};
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem;
use futures::future::Either;
use futures::stream::select_all;
use surrealdb_core::rpc::DbResultStats;
use crate::method::live::Stream;
use crate::notification::Notification;
use crate::types::{SurrealValue, Value};
use crate::{Error, IndexedResults as QueryResponse, Result};
pub trait QueryResult<Response>: query_result::Sealed<Response>
where
Response: SurrealValue,
{
}
mod query_result {
use surrealdb_core::rpc::DbResultStats;
pub trait Sealed<Response>
where
Response: super::SurrealValue,
{
fn query_result(self, response: &mut super::QueryResponse) -> super::Result<Response>;
fn stats(&self, response: &super::QueryResponse) -> Option<DbResultStats> {
response.results.get(&0).map(|x| x.0)
}
}
}
impl QueryResult<Value> for usize {}
impl query_result::Sealed<Value> for usize {
fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
match response.results.swap_remove(&self) {
Some((_, result)) => Ok(result?),
None => Ok(Value::None),
}
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(self).map(|x| x.0)
}
}
impl<T> QueryResult<Option<T>> for usize where T: SurrealValue {}
impl<T> query_result::Sealed<Option<T>> for usize
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
let value = match response.results.swap_remove(&self) {
Some((_, Err(err))) => return Err(err),
Some((_, Ok(value))) => value,
None => return Ok(None),
};
match value {
Value::Array(mut vec) => match vec.len() {
0 => Ok(None),
1 => {
let value = vec.swap_remove(0);
match value {
Value::None => Ok(None),
v => {
Ok(Some(T::from_value(v).map_err(|e| Error::internal(e.to_string()))?))
}
}
}
_ => Err(Error::internal(
"Tried to take only a single result from a query that contains multiple"
.to_string(),
)),
},
Value::None => Ok(None),
v => Ok(Some(T::from_value(v).map_err(|e| Error::internal(e.to_string()))?)),
}
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(self).map(|x| x.0)
}
}
impl QueryResult<Value> for (usize, &str) {}
impl query_result::Sealed<Value> for (usize, &str) {
fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
let (index, key) = self;
let Some(value) = response.try_get_value_mut(&index)? else {
return Ok(Value::None);
};
let value = match value {
Value::Object(object) => object.remove(key).unwrap_or_default(),
_ => Value::None,
};
Ok(value)
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(&self.0).map(|x| x.0)
}
}
impl<T> QueryResult<Option<T>> for (usize, &str) where T: SurrealValue {}
impl<T> query_result::Sealed<Option<T>> for (usize, &str)
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
let (index, key) = self;
let Some(value) = response.try_get_value_mut(&index)? else {
return Ok(None);
};
let value = match value {
Value::Array(vec) => match &mut vec[..] {
[] => {
response.results.swap_remove(&index);
return Ok(None);
}
[value] => value,
_ => {
return Err(Error::internal(
"Tried to take only a single result from a query that contains multiple"
.to_string(),
));
}
},
value => value,
};
match value {
Value::None => {
response.results.swap_remove(&index);
Ok(None)
}
Value::Object(object) => {
if object.is_empty() {
response.results.swap_remove(&index);
return Ok(None);
}
let Some(value) = object.remove(key) else {
return Ok(None);
};
Ok(Some(T::from_value(value).map_err(|e| Error::internal(e.to_string()))?))
}
_ => Ok(None),
}
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(&self.0).map(|x| x.0)
}
}
impl<T> QueryResult<Vec<T>> for usize where T: SurrealValue {}
impl<T> query_result::Sealed<Vec<T>> for usize
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
let vec = match response.results.swap_remove(&self) {
Some((_, result)) => match result? {
Value::Array(arr) => arr.into_vec(),
vec => vec![vec],
},
None => {
return Ok(vec![]);
}
};
vec.into_iter()
.map(|v| T::from_value(v).map_err(|e| Error::internal(e.to_string())))
.collect::<Result<Vec<T>>>()
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(self).map(|x| x.0)
}
}
impl<T> QueryResult<Vec<T>> for (usize, &str) where T: SurrealValue {}
impl<T> query_result::Sealed<Vec<T>> for (usize, &str)
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
let (index, key) = self;
match response.try_get_value_mut(&index)? {
Some(val) => match val {
Value::Array(vec) => {
let mut responses = Vec::with_capacity(vec.len());
for value in vec.iter_mut() {
if let Value::Object(object) = value
&& let Some(value) = object.remove(key)
{
responses.push(value);
}
}
responses
.into_iter()
.map(|v| T::from_value(v).map_err(|e| Error::internal(e.to_string())))
.collect::<Result<Vec<T>>>()
}
val => {
if let Value::Object(object) = val
&& let Some(value) = object.remove(key)
{
return Ok(vec![
T::from_value(value).map_err(|e| Error::internal(e.to_string()))?,
]);
}
Ok(vec![])
}
},
None => Ok(vec![]),
}
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(&self.0).map(|x| x.0)
}
}
impl<T> QueryResult<LinkedList<T>> for usize where T: SurrealValue {}
impl<T> query_result::Sealed<LinkedList<T>> for usize
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<LinkedList<T>> {
let vec = match response.results.swap_remove(&self) {
Some((_, result)) => match result? {
Value::Array(arr) => arr.into_vec(),
vec => vec![vec],
},
None => {
return Ok(LinkedList::new());
}
};
vec.into_iter()
.map(|v| T::from_value(v).map_err(|e| Error::internal(e.to_string())))
.collect::<Result<LinkedList<T>>>()
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(self).map(|x| x.0)
}
}
impl<T> QueryResult<LinkedList<T>> for (usize, &str) where T: SurrealValue {}
impl<T> query_result::Sealed<LinkedList<T>> for (usize, &str)
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<LinkedList<T>> {
let (index, key) = self;
match response.try_get_value_mut(&index)? {
Some(val) => match val {
Value::Array(vec) => {
let mut responses = Vec::with_capacity(vec.len());
for value in vec.iter_mut() {
if let Value::Object(object) = value
&& let Some(value) = object.remove(key)
{
responses.push(value);
}
}
responses
.into_iter()
.map(|v| T::from_value(v).map_err(|e| Error::internal(e.to_string())))
.collect::<Result<LinkedList<T>>>()
}
val => {
if let Value::Object(object) = val
&& let Some(value) = object.remove(key)
{
return Ok(LinkedList::from([
T::from_value(value).map_err(|e| Error::internal(e.to_string()))?
]));
}
Ok(LinkedList::new())
}
},
None => Ok(LinkedList::new()),
}
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(&self.0).map(|x| x.0)
}
}
impl<T> QueryResult<HashSet<T>> for usize where T: SurrealValue + Hash + Eq {}
impl<T> query_result::Sealed<HashSet<T>> for usize
where
T: SurrealValue + Hash + Eq,
{
fn query_result(self, response: &mut QueryResponse) -> Result<HashSet<T>> {
let vec = match response.results.swap_remove(&self) {
Some((_, result)) => match result? {
Value::Array(arr) => arr.into_vec(),
vec => vec![vec],
},
None => {
return Ok(HashSet::new());
}
};
vec.into_iter()
.map(|v| T::from_value(v).map_err(|e| Error::internal(e.to_string())))
.collect::<Result<HashSet<T>>>()
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(self).map(|x| x.0)
}
}
impl<T> QueryResult<HashSet<T>> for (usize, &str) where T: SurrealValue + Hash + Eq {}
impl<T> query_result::Sealed<HashSet<T>> for (usize, &str)
where
T: SurrealValue + Hash + Eq,
{
fn query_result(self, response: &mut QueryResponse) -> Result<HashSet<T>> {
let (index, key) = self;
match response.try_get_value_mut(&index)? {
Some(val) => match val {
Value::Array(vec) => {
let mut responses = Vec::with_capacity(vec.len());
for value in vec.iter_mut() {
if let Value::Object(object) = value
&& let Some(value) = object.remove(key)
{
responses.push(value);
}
}
responses
.into_iter()
.map(|v| T::from_value(v).map_err(|e| Error::internal(e.to_string())))
.collect::<Result<HashSet<T>>>()
}
val => {
if let Value::Object(object) = val
&& let Some(value) = object.remove(key)
{
return Ok(HashSet::from([
T::from_value(value).map_err(|e| Error::internal(e.to_string()))?
]));
}
Ok(HashSet::new())
}
},
None => Ok(HashSet::new()),
}
}
fn stats(&self, response: &QueryResponse) -> Option<DbResultStats> {
response.results.get(&self.0).map(|x| x.0)
}
}
impl QueryResult<Value> for &str {}
impl query_result::Sealed<Value> for &str {
fn query_result(self, response: &mut QueryResponse) -> Result<Value> {
(0, self).query_result(response)
}
}
impl<T> QueryResult<Option<T>> for &str where T: SurrealValue {}
impl<T> query_result::Sealed<Option<T>> for &str
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<Option<T>> {
(0, self).query_result(response)
}
}
impl<T> QueryResult<Vec<T>> for &str where T: SurrealValue {}
impl<T> query_result::Sealed<Vec<T>> for &str
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<Vec<T>> {
(0, self).query_result(response)
}
}
impl<T> QueryResult<LinkedList<T>> for &str where T: SurrealValue {}
impl<T> query_result::Sealed<LinkedList<T>> for &str
where
T: SurrealValue,
{
fn query_result(self, response: &mut QueryResponse) -> Result<LinkedList<T>> {
(0, self).query_result(response)
}
}
impl<T> QueryResult<HashSet<T>> for &str where T: SurrealValue + Hash + Eq {}
impl<T> query_result::Sealed<HashSet<T>> for &str
where
T: SurrealValue + Hash + Eq,
{
fn query_result(self, response: &mut QueryResponse) -> Result<HashSet<T>> {
(0, self).query_result(response)
}
}
pub trait QueryStream<R>: query_stream::Sealed<R> {}
mod query_stream {
pub trait Sealed<R> {
fn query_stream(
self,
response: &mut super::QueryResponse,
) -> super::Result<crate::method::QueryStream<R>>;
}
}
impl QueryStream<Value> for usize {}
impl query_stream::Sealed<Value> for usize {
fn query_stream(
self,
response: &mut QueryResponse,
) -> Result<crate::method::QueryStream<Value>> {
let stream = response
.live_queries
.swap_remove(&self)
.and_then(|result| match result {
Err(e) => {
if e.message().contains("is not a live query") {
response.results.swap_remove(&self);
None
} else {
Some(Err(e))
}
}
result => Some(result),
})
.unwrap_or_else(|| match response.results.contains_key(&self) {
true => {
Err(Error::internal(format!("Query statement {} is not a live query", self)))
}
false => Err(Error::internal(format!("Query statement {} is out of bounds", self))),
})?;
Ok(crate::method::QueryStream(Either::Left(stream)))
}
}
impl QueryStream<Value> for () {}
impl query_stream::Sealed<Value> for () {
fn query_stream(
self,
response: &mut QueryResponse,
) -> Result<crate::method::QueryStream<Value>> {
let mut streams = Vec::with_capacity(response.live_queries.len());
for (index, result) in mem::take(&mut response.live_queries) {
match result {
Ok(stream) => streams.push(stream),
Err(e) => {
if e.message().contains("is not a live query") {
match response.results.swap_remove(&index) {
Some((_, Err(err))) => {
return Err(err);
}
Some((_, Ok(..))) => unreachable!(
"the internal error variant indicates that an error occurred in the `LIVE SELECT` query"
),
None => {
return Err(Error::internal(
"Tried to take a query response that has already been taken"
.to_string(),
));
}
}
} else {
return Err(e);
}
}
}
}
Ok(crate::method::QueryStream(Either::Right(select_all(streams))))
}
}
impl<R> QueryStream<Notification<R>> for usize where R: SurrealValue + Unpin {}
impl<R> query_stream::Sealed<Notification<R>> for usize
where
R: SurrealValue + Unpin,
{
fn query_stream(
self,
response: &mut QueryResponse,
) -> Result<crate::method::QueryStream<Notification<R>>> {
let mut stream = response
.live_queries
.swap_remove(&self)
.and_then(|result| match result {
Err(e) => {
if e.message().contains("is not a live query") {
response.results.swap_remove(&self);
None
} else {
Some(Err(e))
}
}
result => Some(result),
})
.unwrap_or_else(|| match response.results.contains_key(&self) {
true => {
Err(Error::internal(format!("Query statement {} is not a live query", self)))
}
false => Err(Error::internal(format!("Query statement {} is out of bounds", self))),
})?;
Ok(crate::method::QueryStream(Either::Left(Stream {
client: stream.client.clone(),
id: mem::take(&mut stream.id),
rx: stream.rx.take(),
response_type: PhantomData,
})))
}
}
impl<R> QueryStream<Notification<R>> for () where R: SurrealValue + Unpin {}
impl<R> query_stream::Sealed<Notification<R>> for ()
where
R: SurrealValue + Unpin,
{
fn query_stream(
self,
response: &mut QueryResponse,
) -> Result<crate::method::QueryStream<Notification<R>>> {
let mut streams = Vec::with_capacity(response.live_queries.len());
for (index, result) in mem::take(&mut response.live_queries) {
let mut stream = match result {
Ok(stream) => stream,
Err(e) => {
if e.message().contains("is not a live query") {
match response.results.swap_remove(&index) {
Some((_, Err(err))) => {
return Err(err);
}
Some((_, Ok(..))) => unreachable!(
"the internal error variant indicates that an error occurred in the `LIVE SELECT` query"
),
None => {
return Err(Error::internal(
"Tried to take a query response that has already been taken"
.to_string(),
));
}
}
} else {
return Err(e);
}
}
};
streams.push(Stream {
client: stream.client.clone(),
id: mem::take(&mut stream.id),
rx: stream.rx.take(),
response_type: PhantomData,
});
}
Ok(crate::method::QueryStream(Either::Right(select_all(streams))))
}
}