use async_trait::async_trait;
use futures::{future::BoxFuture, FutureExt};
use rmpv::Value;
use serde::de::DeserializeOwned;
use crate::{
codec::request::{
Call, Delete, EncodedRequest, Eval, Execute, Insert, Ping, Prepare, Replace, Request,
Select, Update, Upsert,
},
schema::{SchemaEntityKey, Space},
tuple::Tuple,
utils::extract_and_deserialize_iproto_data,
CallResponse, DmoResponse, Executor, IteratorType, PreparedSqlStatement, Result, SqlResponse,
};
#[async_trait]
pub trait ExecutorExt: Executor {
fn send_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
where
R: Request;
async fn ping(&self) -> Result<()> {
self.send_request(Ping {}).await.map(drop)
}
async fn eval<A, I>(&self, expr: I, args: A) -> Result<CallResponse>
where
A: Tuple + Send,
I: AsRef<str> + Send + Sync,
{
Ok(CallResponse(
self.send_request(Eval::new(expr.as_ref(), args)).await?,
))
}
async fn call<A, I>(&self, function_name: I, args: A) -> Result<CallResponse>
where
A: Tuple + Send,
I: AsRef<str> + Send + Sync,
{
Ok(CallResponse(
self.send_request(Call::new(function_name.as_ref(), args))
.await?,
))
}
async fn select<T, A>(
&self,
space_id: u32,
index_id: u32,
limit: Option<u32>,
offset: Option<u32>,
iterator: Option<IteratorType>,
keys: A,
) -> Result<Vec<T>>
where
T: DeserializeOwned,
A: Tuple + Send,
{
let body = self
.send_request(Select::new(
space_id, index_id, limit, offset, iterator, keys,
))
.await?;
extract_and_deserialize_iproto_data(body).map_err(Into::into)
}
async fn insert<T>(&self, space_id: u32, tuple: T) -> Result<DmoResponse>
where
T: Tuple + Send,
{
Ok(DmoResponse(
self.send_request(Insert::new(space_id, tuple)).await?,
))
}
async fn update<K, O>(
&self,
space_id: u32,
index_id: u32,
keys: K,
ops: O,
) -> Result<DmoResponse>
where
K: Tuple + Send,
O: Tuple + Send,
{
Ok(DmoResponse(
self.send_request(Update::new(space_id, index_id, keys, ops))
.await?,
))
}
async fn upsert<T, O>(&self, space_id: u32, tuple: T, ops: O) -> Result<DmoResponse>
where
T: Tuple + Send,
O: Tuple + Send,
{
Ok(DmoResponse(
self.send_request(Upsert::new(space_id, ops, tuple)).await?,
))
}
async fn replace<T>(&self, space_id: u32, tuple: T) -> Result<DmoResponse>
where
T: Tuple + Send,
{
Ok(DmoResponse(
self.send_request(Replace::new(space_id, tuple)).await?,
))
}
async fn delete<T>(&self, space_id: u32, index_id: u32, keys: T) -> Result<DmoResponse>
where
T: Tuple + Send,
{
Ok(DmoResponse(
self.send_request(Delete::new(space_id, index_id, keys))
.await?,
))
}
async fn execute_sql<T, I>(&self, query: I, binds: T) -> Result<SqlResponse>
where
T: Tuple + Send,
I: AsRef<str> + Send + Sync,
{
let query = query.as_ref();
let request = if let Some(stmt_id) = self.get_cached_sql_statement_id(query).await {
Execute::new_statement_id(stmt_id, binds)
} else {
Execute::new_query(query, binds)
};
Ok(SqlResponse(self.send_request(request).await?))
}
async fn prepare_sql<I>(&self, query: I) -> Result<PreparedSqlStatement<&Self>>
where
I: AsRef<str> + Send + Sync,
{
let response = self.send_request(Prepare::new(query.as_ref())).await?;
Ok(PreparedSqlStatement::from_prepare_response(response, self)?)
}
async fn space<K>(&self, key: K) -> Result<Option<Space<&Self>>>
where
Self: Sized + Send,
K: Into<SchemaEntityKey> + Send,
{
Space::load(self, key.into()).await
}
async fn into_space<K>(self, key: K) -> Result<Option<Space<Self>>>
where
Self: Sized + Send,
K: Into<SchemaEntityKey> + Send,
{
Space::load(self, key.into()).await
}
}
#[async_trait]
impl<E: Executor + ?Sized> ExecutorExt for E {
fn send_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
where
R: Request,
{
let req = EncodedRequest::new(body, None);
async move { (*self).send_encoded_request(req?).await }.boxed()
}
}
#[cfg(test)]
mod ui {
#![allow(unused)]
use crate::{Connection, Transaction};
use super::*;
fn executor_ext_on_connection_ref() {
async fn f(conn: &Connection) -> Space<&Connection> {
conn.space("space").await.unwrap().unwrap()
}
}
fn executor_ext_on_connection() {
async fn f(conn: Connection) -> Space<Connection> {
conn.into_space("space").await.unwrap().unwrap()
}
}
fn executor_ext_on_connection_cloned() {
async fn f(conn: &Connection) -> Space<Connection> {
conn.clone().into_space("space").await.unwrap().unwrap()
}
}
fn executor_ext_on_transaction_ref() {
async fn f(tx: &Transaction) -> Space<&Transaction> {
tx.space("space").await.unwrap().unwrap()
}
}
fn executor_ext_on_transaction() {
async fn f(tx: Transaction) {
let space_tx: Space<Transaction> = tx.into_space("space").await.unwrap().unwrap();
space_tx.delete((1,)).await.unwrap();
space_tx.commit().await.unwrap();
}
}
}