1use async_trait::async_trait;
2use futures::{future::BoxFuture, FutureExt};
3use rmpv::Value;
4use serde::de::DeserializeOwned;
5
6use crate::{
7 codec::request::{
8 Call, Delete, EncodedRequest, Eval, Execute, Insert, Ping, Prepare, Replace, Request,
9 Select, Update, Upsert,
10 },
11 schema::{SchemaEntityKey, Space},
12 tuple::Tuple,
13 utils::extract_and_deserialize_iproto_data,
14 CallResponse, DmoResponse, Executor, IteratorType, PreparedSqlStatement, Result, SqlResponse,
15};
16
17#[async_trait]
20pub trait ExecutorExt: Executor {
21 fn send_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
26 where
27 R: Request;
28
29 async fn ping(&self) -> Result<()> {
31 self.send_request(Ping {}).await.map(drop)
32 }
33
34 async fn eval<A, I>(&self, expr: I, args: A) -> Result<CallResponse>
40 where
41 A: Tuple + Send,
42 I: AsRef<str> + Send + Sync,
43 {
44 Ok(CallResponse(
45 self.send_request(Eval::new(expr.as_ref(), args)).await?,
46 ))
47 }
48
49 async fn call<A, I>(&self, function_name: I, args: A) -> Result<CallResponse>
53 where
54 A: Tuple + Send,
55 I: AsRef<str> + Send + Sync,
56 {
57 Ok(CallResponse(
58 self.send_request(Call::new(function_name.as_ref(), args))
59 .await?,
60 ))
61 }
62
63 async fn select<T, A>(
65 &self,
66 space_id: u32,
67 index_id: u32,
68 limit: Option<u32>,
69 offset: Option<u32>,
70 iterator: Option<IteratorType>,
71 keys: A,
72 ) -> Result<Vec<T>>
73 where
74 T: DeserializeOwned,
75 A: Tuple + Send,
76 {
77 let body = self
78 .send_request(Select::new(
79 space_id, index_id, limit, offset, iterator, keys,
80 ))
81 .await?;
82 extract_and_deserialize_iproto_data(body).map_err(Into::into)
83 }
84
85 async fn insert<T>(&self, space_id: u32, tuple: T) -> Result<DmoResponse>
87 where
88 T: Tuple + Send,
89 {
90 Ok(DmoResponse(
91 self.send_request(Insert::new(space_id, tuple)).await?,
92 ))
93 }
94
95 async fn update<K, O>(
98 &self,
99 space_id: u32,
100 index_id: u32,
101 keys: K,
102 ops: O,
103 ) -> Result<DmoResponse>
104 where
105 K: Tuple + Send,
106 O: Tuple + Send,
107 {
108 Ok(DmoResponse(
109 self.send_request(Update::new(space_id, index_id, keys, ops))
110 .await?,
111 ))
112 }
113
114 async fn upsert<T, O>(&self, space_id: u32, tuple: T, ops: O) -> Result<DmoResponse>
116 where
117 T: Tuple + Send,
118 O: Tuple + Send,
119 {
120 Ok(DmoResponse(
121 self.send_request(Upsert::new(space_id, ops, tuple)).await?,
122 ))
123 }
124
125 async fn replace<T>(&self, space_id: u32, tuple: T) -> Result<DmoResponse>
128 where
129 T: Tuple + Send,
130 {
131 Ok(DmoResponse(
132 self.send_request(Replace::new(space_id, tuple)).await?,
133 ))
134 }
135
136 async fn delete<T>(&self, space_id: u32, index_id: u32, keys: T) -> Result<DmoResponse>
138 where
139 T: Tuple + Send,
140 {
141 Ok(DmoResponse(
142 self.send_request(Delete::new(space_id, index_id, keys))
143 .await?,
144 ))
145 }
146
147 async fn execute_sql<T, I>(&self, query: I, binds: T) -> Result<SqlResponse>
151 where
152 T: Tuple + Send,
153 I: AsRef<str> + Send + Sync,
154 {
155 let query = query.as_ref();
156 let request = if let Some(stmt_id) = self.get_cached_sql_statement_id(query).await {
157 Execute::new_statement_id(stmt_id, binds)
158 } else {
159 Execute::new_query(query, binds)
160 };
161 Ok(SqlResponse(self.send_request(request).await?))
162 }
163
164 async fn prepare_sql<I>(&self, query: I) -> Result<PreparedSqlStatement<&Self>>
167 where
168 I: AsRef<str> + Send + Sync,
169 {
170 let response = self.send_request(Prepare::new(query.as_ref())).await?;
171 Ok(PreparedSqlStatement::from_prepare_response(response, self)?)
172 }
173
174 async fn space<K>(&self, key: K) -> Result<Option<Space<&Self>>>
180 where
181 Self: Sized + Send,
182 K: Into<SchemaEntityKey> + Send,
183 {
184 Space::load(self, key.into()).await
185 }
186
187 async fn into_space<K>(self, key: K) -> Result<Option<Space<Self>>>
193 where
194 Self: Sized + Send,
195 K: Into<SchemaEntityKey> + Send,
196 {
197 Space::load(self, key.into()).await
198 }
199}
200
201#[async_trait]
202impl<E: Executor + ?Sized> ExecutorExt for E {
203 fn send_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
204 where
205 R: Request,
206 {
207 let req = EncodedRequest::new(body, None);
208 async move { (*self).send_encoded_request(req?).await }.boxed()
209 }
210}
211
// Compile-only checks: the inner async functions are never called, they only
// need to type-check to show which executor types `ExecutorExt` is usable on.
#[cfg(test)]
mod ui {
    // Nothing in this module is ever invoked, so everything counts as unused.
    #![allow(unused)]

    use crate::{Connection, Transaction};

    use super::*;

    // `space()` on a borrowed connection yields a space borrowing it.
    fn executor_ext_on_connection_ref() {
        async fn check(conn: &Connection) -> Space<&Connection> {
            let found = conn.space("space").await.unwrap();
            found.unwrap()
        }
    }

    // `into_space()` on an owned connection yields a space owning it.
    fn executor_ext_on_connection() {
        async fn check(conn: Connection) -> Space<Connection> {
            let found = conn.into_space("space").await.unwrap();
            found.unwrap()
        }
    }

    // A cloned connection can be turned into an owning space.
    fn executor_ext_on_connection_cloned() {
        async fn check(conn: &Connection) -> Space<Connection> {
            let owned = conn.clone();
            let found = owned.into_space("space").await.unwrap();
            found.unwrap()
        }
    }

    // `space()` on a borrowed transaction yields a space borrowing it.
    fn executor_ext_on_transaction_ref() {
        async fn check(tx: &Transaction) -> Space<&Transaction> {
            let found = tx.space("space").await.unwrap();
            found.unwrap()
        }
    }

    // A space owning a transaction can run a request and then commit.
    fn executor_ext_on_transaction() {
        async fn check(tx: Transaction) {
            let found = tx.into_space("space").await.unwrap();
            let tx_space: Space<Transaction> = found.unwrap();
            tx_space.delete((1,)).await.unwrap();
            tx_space.commit().await.unwrap();
        }
    }
}