use crate::api;
use crate::api::err::Error;
use crate::api::method::query::Response;
use crate::api::method::BoxFuture;
use crate::api::opt::Endpoint;
use crate::api::ExtraFeatures;
use crate::api::Result;
use crate::api::Surreal;
use crate::Value;
use channel::Receiver;
use channel::Sender;
use serde::de::DeserializeOwned;
use std::collections::HashSet;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use surrealdb_core::sql::{from_value as from_core_value, Value as CoreValue};
// Submodule that defines `Command` and `RouterRequest`.
mod cmd;
// Re-exported crate-wide so other modules can name `Command` through this module.
pub(crate) use cmd::Command;
// `RouterRequest` is only re-exported when the `protocol-http` feature is enabled.
#[cfg(feature = "protocol-http")]
pub(crate) use cmd::RouterRequest;
/// A single request submitted through the router: an id paired with the
/// command to run.
#[derive(Debug)]
// NOTE(review): `dead_code` is presumably allowed because some feature
// combinations never read these fields — confirm and record the reason here.
#[allow(dead_code)] pub struct RequestData {
/// Request id; `Router::send` fills this in from `Router::next_id`.
pub(crate) id: i64,
/// The command to execute.
pub(crate) command: Command,
}
/// A request bundled with the channel on which its result is sent back.
///
/// `Router::send` builds one of these and pushes it onto `Router::sender`.
#[derive(Debug)]
// NOTE(review): `dead_code` is presumably allowed because some feature
// combinations never read these fields — confirm and record the reason here.
#[allow(dead_code)] pub(crate) struct Route {
/// The request to process.
pub(crate) request: RequestData,
/// Sending half of the reply channel; `Router::send` hands the matching
/// receiver back to its caller.
pub(crate) response: Sender<Result<DbResponse>>,
}
/// Message router: submits `Route`s on `sender` and assigns each request an id.
#[derive(Debug)]
pub struct Router {
/// Channel on which `Route`s are submitted.
pub(crate) sender: Sender<Route>,
/// Counter backing `next_id`. Because `next_id` returns the value held
/// *before* incrementing, this holds the id the next request will receive.
pub(crate) last_id: AtomicI64,
/// Set of `ExtraFeatures` values.
/// NOTE(review): presumably the optional features supported by the
/// connected engine — confirm.
pub(crate) features: HashSet<ExtraFeatures>,
}
impl Router {
pub(crate) fn next_id(&self) -> i64 {
self.last_id.fetch_add(1, Ordering::SeqCst)
}
pub(crate) fn send(
&self,
command: Command,
) -> BoxFuture<'_, Result<Receiver<Result<DbResponse>>>> {
Box::pin(async move {
let id = self.next_id();
let (sender, receiver) = channel::bounded(1);
let route = Route {
request: RequestData {
id,
command,
},
response: sender,
};
self.sender.send(route).await?;
Ok(receiver)
})
}
pub(crate) fn recv(
&self,
receiver: Receiver<Result<DbResponse>>,
) -> BoxFuture<'_, Result<CoreValue>> {
Box::pin(async move {
let response = receiver.recv().await?;
match response? {
DbResponse::Other(value) => Ok(value),
DbResponse::Query(..) => unreachable!(),
}
})
}
pub(crate) fn recv_query(
&self,
receiver: Receiver<Result<DbResponse>>,
) -> BoxFuture<'_, Result<Response>> {
Box::pin(async move {
let response = receiver.recv().await?;
match response? {
DbResponse::Query(results) => Ok(results),
DbResponse::Other(..) => unreachable!(),
}
})
}
pub(crate) fn execute<R>(&self, command: Command) -> BoxFuture<'_, Result<R>>
where
R: DeserializeOwned,
{
Box::pin(async move {
let rx = self.send(command).await?;
let value = self.recv(rx).await?;
from_core_value(value).map_err(Into::into)
})
}
pub(crate) fn execute_opt<R>(&self, command: Command) -> BoxFuture<'_, Result<Option<R>>>
where
R: DeserializeOwned,
{
Box::pin(async move {
let rx = self.send(command).await?;
match self.recv(rx).await? {
CoreValue::None | CoreValue::Null => Ok(None),
value => from_core_value(value).map_err(Into::into),
}
})
}
pub(crate) fn execute_vec<R>(&self, command: Command) -> BoxFuture<'_, Result<Vec<R>>>
where
R: DeserializeOwned,
{
Box::pin(async move {
let rx = self.send(command).await?;
let value = match self.recv(rx).await? {
CoreValue::None | CoreValue::Null => return Ok(Vec::new()),
CoreValue::Array(array) => CoreValue::Array(array),
value => vec![value].into(),
};
from_core_value(value).map_err(Into::into)
})
}
pub(crate) fn execute_unit(&self, command: Command) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
let rx = self.send(command).await?;
match self.recv(rx).await? {
CoreValue::None | CoreValue::Null => Ok(()),
CoreValue::Array(array) if array.is_empty() => Ok(()),
value => Err(Error::FromValue {
value: Value::from_inner(value),
error: "expected the database to return nothing".to_owned(),
}
.into()),
}
})
}
pub(crate) fn execute_value(&self, command: Command) -> BoxFuture<'_, Result<Value>> {
Box::pin(async move {
let rx = self.send(command).await?;
Ok(Value::from_inner(self.recv(rx).await?))
})
}
pub(crate) fn execute_query(&self, command: Command) -> BoxFuture<'_, Result<Response>> {
Box::pin(async move {
let rx = self.send(command).await?;
self.recv_query(rx).await
})
}
}
/// Reply delivered over a `Route`'s response channel.
#[derive(Debug)]
pub enum DbResponse {
/// Query results; the only variant `Router::recv_query` accepts.
Query(Response),
/// A plain value; the only variant `Router::recv` accepts.
Other(CoreValue),
}
/// A name/version pair.
/// NOTE(review): presumably identifies the ML model to export — confirm
/// against the code that consumes this type.
#[derive(Debug, Clone)]
pub(crate) struct MlExportConfig {
// NOTE(review): `dead_code` allowed without a stated reason; presumably the
// field is unused under some feature combinations — confirm.
#[allow(dead_code)]
pub(crate) name: String,
// NOTE(review): same unexplained `dead_code` allowance as `name` — confirm.
#[allow(dead_code)]
pub(crate) version: String,
}
/// Connection trait. Implementors must be `Sized + Send + Sync + 'static`.
/// NOTE(review): presumably implemented once per supported protocol/engine —
/// confirm.
pub trait Connection: Sized + Send + Sync + 'static {
/// Connects to `address`, resolving to a `Surreal<Self>` on success.
///
/// Only callable when `Self` also implements `api::Connection`.
/// NOTE(review): `capacity` is presumably the capacity of the channel(s) the
/// implementation creates — confirm against the implementors.
fn connect(address: Endpoint, capacity: usize) -> BoxFuture<'static, Result<Surreal<Self>>>
where
Self: api::Connection;
}