tarantool_rs/client/
executor_ext.rs

1use async_trait::async_trait;
2use futures::{future::BoxFuture, FutureExt};
3use rmpv::Value;
4use serde::de::DeserializeOwned;
5
6use crate::{
7    codec::request::{
8        Call, Delete, EncodedRequest, Eval, Execute, Insert, Ping, Prepare, Replace, Request,
9        Select, Update, Upsert,
10    },
11    schema::{SchemaEntityKey, Space},
12    tuple::Tuple,
13    utils::extract_and_deserialize_iproto_data,
14    CallResponse, DmoResponse, Executor, IteratorType, PreparedSqlStatement, Result, SqlResponse,
15};
16
17/// Helper trait around [`Executor`] trait, which allows to send specific requests
18/// with any type, implementing `Execitor` trait.
19#[async_trait]
20pub trait ExecutorExt: Executor {
21    /// Send request, receiving raw response body.
22    ///
23    /// It is not recommended to use this method directly, since some requests
24    /// should be only sent in specific situations and might break connection.
25    fn send_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
26    where
27        R: Request;
28
29    /// Ping tarantool instance.
30    async fn ping(&self) -> Result<()> {
31        self.send_request(Ping {}).await.map(drop)
32    }
33
34    // TODO: add examples
35
36    /// Evaluate Lua expression.
37    ///
38    /// Check [docs][crate#deserializing-lua-responses-in-call-and-eval] on how to deserialize response.
39    async fn eval<A, I>(&self, expr: I, args: A) -> Result<CallResponse>
40    where
41        A: Tuple + Send,
42        I: AsRef<str> + Send + Sync,
43    {
44        Ok(CallResponse(
45            self.send_request(Eval::new(expr.as_ref(), args)).await?,
46        ))
47    }
48
49    /// Remotely call function in Tarantool.
50    ///
51    /// Check [docs][crate#deserializing-lua-responses-in-call-and-eval] on how to deserialize response.
52    async fn call<A, I>(&self, function_name: I, args: A) -> Result<CallResponse>
53    where
54        A: Tuple + Send,
55        I: AsRef<str> + Send + Sync,
56    {
57        Ok(CallResponse(
58            self.send_request(Call::new(function_name.as_ref(), args))
59                .await?,
60        ))
61    }
62
63    /// Select tuples from space.
64    async fn select<T, A>(
65        &self,
66        space_id: u32,
67        index_id: u32,
68        limit: Option<u32>,
69        offset: Option<u32>,
70        iterator: Option<IteratorType>,
71        keys: A,
72    ) -> Result<Vec<T>>
73    where
74        T: DeserializeOwned,
75        A: Tuple + Send,
76    {
77        let body = self
78            .send_request(Select::new(
79                space_id, index_id, limit, offset, iterator, keys,
80            ))
81            .await?;
82        extract_and_deserialize_iproto_data(body).map_err(Into::into)
83    }
84
85    /// Insert tuple.
86    async fn insert<T>(&self, space_id: u32, tuple: T) -> Result<DmoResponse>
87    where
88        T: Tuple + Send,
89    {
90        Ok(DmoResponse(
91            self.send_request(Insert::new(space_id, tuple)).await?,
92        ))
93    }
94
95    // TODO: docs and doctests for DmoOperation
96    /// Update tuple.
97    async fn update<K, O>(
98        &self,
99        space_id: u32,
100        index_id: u32,
101        keys: K,
102        ops: O,
103    ) -> Result<DmoResponse>
104    where
105        K: Tuple + Send,
106        O: Tuple + Send,
107    {
108        Ok(DmoResponse(
109            self.send_request(Update::new(space_id, index_id, keys, ops))
110                .await?,
111        ))
112    }
113
114    /// Update or insert tuple.
115    async fn upsert<T, O>(&self, space_id: u32, tuple: T, ops: O) -> Result<DmoResponse>
116    where
117        T: Tuple + Send,
118        O: Tuple + Send,
119    {
120        Ok(DmoResponse(
121            self.send_request(Upsert::new(space_id, ops, tuple)).await?,
122        ))
123    }
124
125    /// Insert a tuple into a space. If a tuple with the same primary key already exists,
126    /// replaces the existing tuple with a new one.
127    async fn replace<T>(&self, space_id: u32, tuple: T) -> Result<DmoResponse>
128    where
129        T: Tuple + Send,
130    {
131        Ok(DmoResponse(
132            self.send_request(Replace::new(space_id, tuple)).await?,
133        ))
134    }
135
136    /// Delete a tuple identified by the primary key.
137    async fn delete<T>(&self, space_id: u32, index_id: u32, keys: T) -> Result<DmoResponse>
138    where
139        T: Tuple + Send,
140    {
141        Ok(DmoResponse(
142            self.send_request(Delete::new(space_id, index_id, keys))
143                .await?,
144        ))
145    }
146
147    // TODO: options
148    // TODO: tests for SQL
149    /// Perform SQL query.
150    async fn execute_sql<T, I>(&self, query: I, binds: T) -> Result<SqlResponse>
151    where
152        T: Tuple + Send,
153        I: AsRef<str> + Send + Sync,
154    {
155        let query = query.as_ref();
156        let request = if let Some(stmt_id) = self.get_cached_sql_statement_id(query).await {
157            Execute::new_statement_id(stmt_id, binds)
158        } else {
159            Execute::new_query(query, binds)
160        };
161        Ok(SqlResponse(self.send_request(request).await?))
162    }
163
164    // TODO: add caching in case of user incorrectly uses prepared statements
165    /// Prepare SQL statement.
166    async fn prepare_sql<I>(&self, query: I) -> Result<PreparedSqlStatement<&Self>>
167    where
168        I: AsRef<str> + Send + Sync,
169    {
170        let response = self.send_request(Prepare::new(query.as_ref())).await?;
171        Ok(PreparedSqlStatement::from_prepare_response(response, self)?)
172    }
173
174    /// Find and load space by key.
175    ///
176    /// Can be called with space's index (if passed unsigned integer) or name (if passed `&str`).
177    ///
178    /// Returned [`Space`] object contains reference to current executor.
179    async fn space<K>(&self, key: K) -> Result<Option<Space<&Self>>>
180    where
181        Self: Sized + Send,
182        K: Into<SchemaEntityKey> + Send,
183    {
184        Space::load(self, key.into()).await
185    }
186
187    /// Find and load space by key, moving current executor into [`Space`].
188    ///
189    /// Can be called with space's index (if passed unsigned integer) or name (if passed `&str`).
190    ///
191    /// Returned [`Space`] object contains current executor.
192    async fn into_space<K>(self, key: K) -> Result<Option<Space<Self>>>
193    where
194        Self: Sized + Send,
195        K: Into<SchemaEntityKey> + Send,
196    {
197        Space::load(self, key.into()).await
198    }
199}
200
201#[async_trait]
202impl<E: Executor + ?Sized> ExecutorExt for E {
203    fn send_request<R>(&self, body: R) -> BoxFuture<Result<Value>>
204    where
205        R: Request,
206    {
207        let req = EncodedRequest::new(body, None);
208        async move { (*self).send_encoded_request(req?).await }.boxed()
209    }
210}
211
#[cfg(test)]
mod ui {
    // Compile-only checks: the nested async functions are never called, they
    // only have to type-check against the `ExecutorExt` API.
    #![allow(unused)]

    use super::*;
    use crate::{Connection, Transaction};

    // Borrowed connection yields a space borrowing that connection.
    fn executor_ext_on_connection_ref() {
        async fn check(conn: &Connection) -> Space<&Connection> {
            let loaded = conn.space("space").await.unwrap();
            loaded.unwrap()
        }
    }

    // Owned connection can be moved into the space.
    fn executor_ext_on_connection() {
        async fn check(conn: Connection) -> Space<Connection> {
            let loaded = conn.into_space("space").await.unwrap();
            loaded.unwrap()
        }
    }

    // A clone of a borrowed connection can be moved into the space.
    fn executor_ext_on_connection_cloned() {
        async fn check(conn: &Connection) -> Space<Connection> {
            let owned = conn.clone();
            let loaded = owned.into_space("space").await.unwrap();
            loaded.unwrap()
        }
    }

    // Borrowed transaction yields a space borrowing that transaction.
    fn executor_ext_on_transaction_ref() {
        async fn check(tx: &Transaction) -> Space<&Transaction> {
            let loaded = tx.space("space").await.unwrap();
            loaded.unwrap()
        }
    }

    // Owned transaction can be moved into the space and committed through it.
    fn executor_ext_on_transaction() {
        async fn check(tx: Transaction) {
            let loaded = tx.into_space("space").await.unwrap();
            let space_tx: Space<Transaction> = loaded.unwrap();
            space_tx.delete((1,)).await.unwrap();
            space_tx.commit().await.unwrap();
        }
    }
}