use std::collections::HashSet;
use async_channel::{Receiver, Sender};
use surrealdb_core::dbs::QueryResult;
use uuid::Uuid;
use crate::method::BoxFuture;
use crate::opt::Endpoint;
use crate::types::{SurrealValue, Value};
use crate::{Error, ExtraFeatures, Result, Surreal};
pub(crate) mod cmd;
pub(crate) use cmd::Command;
use super::opt::Config;
/// A command together with the id of the session it is issued for.
///
/// Built by `Router::send_command` and carried inside a `Route`.
#[derive(Debug)]
#[allow(dead_code, reason = "Used by the embedded and remote connections.")]
pub struct RequestData {
/// The command to run.
pub(crate) command: Command,
/// Identifier of the session the command belongs to.
pub(crate) session_id: Uuid,
}
/// A request paired with the sending half of the channel on which its
/// outcome is expected.
///
/// `Router::send_command` creates one `Route` per command and keeps the
/// matching receiver for the caller.
#[derive(Debug)]
#[allow(dead_code, reason = "Used by the embedded and remote connections.")]
pub(crate) struct Route {
/// The command and session to process.
#[allow(dead_code, reason = "Used in http and local non-wasm with ml features.")]
pub(crate) request: RequestData,
/// Sending half of the per-request response channel; it carries either the
/// query results or an error.
// NOTE(review): presumably written to by the connection backend that
// consumes the route — the sending side is not visible in this file.
#[allow(dead_code, reason = "Used in http and local non-wasm with ml features.")]
pub(crate) response: Sender<std::result::Result<Vec<QueryResult>, surrealdb_types::Error>>,
}
/// Handle used to submit commands as `Route`s over a channel and to decode
/// the responses that come back.
#[derive(Debug, Clone)]
pub struct Router {
/// Channel on which routes are submitted (see `send_command`).
pub(crate) sender: Sender<Route>,
/// Connection configuration.
// NOTE(review): not read anywhere in this file, hence the `dead_code`
// allowance — confirm which backends consume it.
#[allow(dead_code)]
pub(crate) config: Config,
/// Set of extra features associated with this router.
// NOTE(review): presumably the features supported by the underlying
// connection — confirm against the code that builds the router.
pub(crate) features: HashSet<ExtraFeatures>,
}
impl Router {
#[allow(clippy::type_complexity)]
pub(crate) fn send_command(
&self,
session_id: Uuid,
command: Command,
) -> BoxFuture<
'_,
Result<Receiver<std::result::Result<Vec<QueryResult>, surrealdb_types::Error>>>,
> {
Box::pin(async move {
let (sender, receiver) = async_channel::bounded(1);
let route = Route {
request: RequestData {
command,
session_id,
},
response: sender,
};
self.sender
.send(route)
.await
.map_err(|e| crate::Error::internal(format!("Failed to send command: {e}")))?;
Ok(receiver)
})
}
pub(crate) fn recv_value(
&self,
receiver: Receiver<std::result::Result<Vec<QueryResult>, surrealdb_types::Error>>,
) -> BoxFuture<'_, std::result::Result<Value, Error>> {
Box::pin(async move {
let response = receiver.recv().await.map_err(|_| {
crate::Error::connection(
"Connection uninitialised".to_string(),
Some(crate::types::ConnectionError::Uninitialised),
)
})?;
let mut results = response?;
match results.len() {
0 => Ok(Value::None),
1 => {
let result = results.remove(0);
result.result
}
_ => Err(crate::Error::internal(
"expected the database to return one or no results".to_string(),
)),
}
})
}
pub(crate) fn recv_results(
&self,
receiver: Receiver<std::result::Result<Vec<QueryResult>, surrealdb_types::Error>>,
) -> BoxFuture<'_, Result<Vec<QueryResult>>> {
Box::pin(async move {
receiver.recv().await.map_err(|_| {
crate::Error::connection(
"Connection uninitialised".to_string(),
Some(crate::types::ConnectionError::Uninitialised),
)
})?
})
}
pub(crate) fn execute<R>(&self, session_id: Uuid, command: Command) -> BoxFuture<'_, Result<R>>
where
R: SurrealValue,
{
Box::pin(async move {
let rx = self.send_command(session_id, command).await?;
let value = self.recv_value(rx).await?;
let result = match value {
Value::Array(array) if array.len() == 1 => {
R::from_value(array.into_iter().next().expect("array has exactly one element"))
}
v => R::from_value(v),
};
result.map_err(|e| crate::Error::internal(e.to_string()))
})
}
pub(crate) fn execute_opt<R>(
&self,
session_id: Uuid,
command: Command,
) -> BoxFuture<'_, Result<Option<R>>>
where
R: SurrealValue,
{
Box::pin(async move {
let rx = self.send_command(session_id, command).await?;
match self.recv_value(rx).await? {
Value::None | Value::Null => Ok(None),
Value::Array(array) => match array.len() {
0 => Ok(None),
1 => Ok(Some(
R::from_value(
array.into_iter().next().expect("array has exactly one element"),
)
.map_err(|e| crate::Error::internal(e.to_string()))?,
)),
_ => Ok(Some(
R::from_value(Value::Array(array))
.map_err(|e| crate::Error::internal(e.to_string()))?,
)),
},
value => Ok(Some(
R::from_value(value).map_err(|e| crate::Error::internal(e.to_string()))?,
)),
}
})
}
pub(crate) fn execute_vec<R>(
&self,
session_id: Uuid,
command: Command,
) -> BoxFuture<'_, Result<Vec<R>>>
where
R: SurrealValue,
{
Box::pin(async move {
let rx = self.send_command(session_id, command).await?;
match self.recv_value(rx).await? {
Value::None | Value::Null => Ok(Vec::new()),
Value::Array(array) => array
.into_iter()
.map(|v| R::from_value(v).map_err(|e| crate::Error::internal(e.to_string())))
.collect::<Result<Vec<R>>>(),
value => Ok(vec![
R::from_value(value).map_err(|e| crate::Error::internal(e.to_string()))?,
]),
}
})
}
pub(crate) fn execute_unit(
&self,
session_id: Uuid,
command: Command,
) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
let rx = self.send_command(session_id, command).await?;
match self.recv_value(rx).await? {
Value::None | Value::Null => Ok(()),
Value::Array(array) if array.is_empty() => Ok(()),
_value => Err(crate::Error::internal(
"expected the database to return nothing".to_string(),
)),
}
})
}
/// Runs `command` for `session_id` and returns the single resulting value
/// without converting it (see `recv_value` for how the response is reduced).
///
/// # Errors
/// Propagates errors from sending the command and receiving the value.
pub(crate) fn execute_value(
    &self,
    session_id: Uuid,
    command: Command,
) -> BoxFuture<'_, Result<Value>> {
    Box::pin(async move {
        let receiver = self.send_command(session_id, command).await?;
        // The tail call's result is returned as-is.
        self.recv_value(receiver).await
    })
}
/// Runs `command` for `session_id` and returns every query result from the
/// response, unreduced (see `recv_results`).
///
/// # Errors
/// Propagates errors from sending the command and receiving the results.
pub(crate) fn execute_query(
    &self,
    session_id: Uuid,
    command: Command,
) -> BoxFuture<'_, Result<Vec<QueryResult>>> {
    Box::pin(async move {
        let receiver = self.send_command(session_id, command).await?;
        // The tail call's result is returned as-is.
        self.recv_results(receiver).await
    })
}
}
/// Name/version pair describing an ML export.
// NOTE(review): presumably identifies the model to export — the consumer is
// not visible in this file; confirm.
#[derive(Debug, Clone)]
pub(crate) struct MlExportConfig {
/// Name component of the export.
#[allow(dead_code, reason = "Used in http and local non-wasm with ml features.")]
pub(crate) name: String,
/// Version component of the export.
#[allow(dead_code, reason = "Used in http and local non-wasm with ml features.")]
pub(crate) version: String,
}
/// Trait providing the `connect` constructor for connection types.
///
/// Implementors must be `Sized + Send + Sync + 'static`.
// NOTE(review): the name suggests the sealed-trait pattern (implementable
// only inside this crate) — confirm that the module path is not exported.
pub trait Sealed: Sized + Send + Sync + 'static {
/// Starts connecting to `address` and returns a future resolving to a
/// `Surreal<Self>` client.
///
/// Only callable when `Self` also implements `crate::Connection`.
///
/// * `address` - the endpoint to connect to.
/// * `capacity` - NOTE(review): presumably the capacity of the internal
///   command channel — confirm against the implementations.
/// * `session_clone` - optional `crate::SessionClone`; its use is defined
///   by the implementations and is not visible here.
// NOTE(review): `private_interfaces` is allowed, presumably because
// `crate::SessionClone` is less visible than this public trait — confirm.
#[allow(private_interfaces)]
fn connect(
address: Endpoint,
capacity: usize,
session_clone: Option<crate::SessionClone>,
) -> BoxFuture<'static, Result<Surreal<Self>>>
where
Self: crate::Connection;
}