Skip to main content

rusty_tarantool/tarantool/
mod.rs

1pub use crate::tarantool::packets::{CommandPacket, TarantoolRequest, TarantoolResponse, TarantoolSqlResponse};
2pub use crate::tarantool::tools::{serialize_to_vec_u8, serialize_array};
3use futures_channel::mpsc;
4use futures_channel::oneshot;
5use serde::Serialize;
6use std::io;
7use std::sync::{Arc, Mutex, RwLock};
8
9pub mod codec;
10mod dispatch;
11pub mod packets;
12mod tools;
13
14use crate::tarantool::dispatch::{
15    CallbackSender, Dispatch, ERROR_CLIENT_DISCONNECTED, ERROR_DISPATCH_THREAD_IS_DEAD,
16};
17pub use crate::tarantool::dispatch::{ClientConfig, ClientStatus};
18use rmpv::Value;
19
20impl ClientConfig {
21    pub fn build(self) -> Client {
22        Client::new(self)
23    }
24}
25
26///iterator types
27pub enum IteratorType {
28    EQ = 0 , // key == x ASC order
29    REQ = 1, // key == x DESC order
30    ALL = 2, // all tuples
31    LT = 3, // key <  x
32    LE = 4, // key <= x
33    GE = 5, // key >= x
34    GT = 6, // key >  x
35    BitsAllSet = 7, // all bits from x are set in key
36    BitsAnySet = 8, // at least one x's bit is set
37    BitsAllNotSet = 9, // all bits are not set
38    OVERLAPS = 10, // key overlaps x
39    NEIGHBOR = 11,
40}
41
42///
43/// API to tarantool.
44///
45/// Create client by call build() on ClientConfig.
46///
47/// # Examples
48///
49/// let client = ClientConfig::new(addr, "rust", "rust").set_timeout_time_ms(1000).set_reconnect_time_ms(10000).build();
50///
51#[derive(Clone)]
52pub struct Client {
53    command_sender: mpsc::UnboundedSender<(CommandPacket, CallbackSender)>,
54    dispatch: Arc<Mutex<Option<Dispatch>>>,
55    status: Arc<RwLock<ClientStatus>>,
56    notify_callbacks: Arc<Mutex<Vec<dispatch::ReconnectNotifySender>>>,
57}
58
59pub trait ExecWithParamaters {
60    fn bind_raw(self, param: Vec<u8>) -> io::Result<Self>
61        where Self: std::marker::Sized;
62
63    ///bind named parameter
64    fn bind_named_ref<T : Serialize, T1:Into<String>>(self, name: T1, param: &T) -> io::Result<Self>
65        where Self: std::marker::Sized,
66
67    {
68        let mut name_s = name.into();
69        name_s.insert(0,':');
70
71        self.bind_raw(tools::serialize_one_element_map(
72            name_s,
73            tools::serialize_to_vec_u8(param)?)?)
74    }
75
76    ///bind parameter
77    fn bind_ref<T : Serialize>(self, param: &T) -> io::Result<Self>
78        where Self: std::marker::Sized
79    {
80        self.bind_raw(tools::serialize_to_vec_u8(param)?)
81    }
82
83    ///bind named parameter
84    fn bind_named<T, T1>(self, name: T1,param: T) -> io::Result<Self>
85        where T: Serialize,
86              Self: std::marker::Sized,
87              T1:Into<String>
88    {
89        self.bind_named_ref(name, &param)
90    }
91
92
93    ///bind parameter
94    fn bind<T>(self, param: T) -> io::Result<Self>
95        where T: Serialize,
96              Self: std::marker::Sized
97    {
98        self.bind_ref(&param)
99    }
100
101    ///bind null
102    fn bind_null(self) -> io::Result<Self>
103        where  Self: std::marker::Sized
104    {
105        self.bind_ref(&Value::Nil)
106    }
107
108    ///bind named null as parameter
109    fn bind_named_null<T1>(self, name: T1) -> io::Result<Self>
110        where  Self: std::marker::Sized,
111               T1:Into<String>
112    {
113        self.bind_named_ref(name, &Value::Nil)
114    }
115}
116
117pub struct PreparedSql {
118    client: Client,
119    sql: String,
120    params: Vec<Vec<u8>>
121}
122
123pub struct PreparedFunctionCall {
124    client: Client,
125    function: String,
126    params: Vec<Vec<u8>>
127}
128
129impl Client {
130    /// manually create client by consume Client Config
131    pub fn new(config: ClientConfig) -> Client {
132        let (command_sender, command_receiver) = mpsc::unbounded();
133
134        let status = Arc::new(RwLock::new(ClientStatus::Init));
135        let notify_callbacks = Arc::new(Mutex::new(Vec::new()));
136
137        Client {
138            command_sender,
139            dispatch: Arc::new(Mutex::new(Some(Dispatch::new(
140                config,
141                command_receiver,
142                status.clone(),
143                notify_callbacks.clone(),
144            )))),
145            status,
146            notify_callbacks,
147        }
148    }
149
150    /// prepare sql call
151    ///
152    ///  # Examples
153    ///
154    /// rust code
155    /// ```text
156    ///     let response : TarantoolSqlResponse<(u32, String)> = client
157    ///         .prepare_sql("select * from TABLE1 where COLUMN1=?")
158    ///         .bind(&1)?
159    ///         .execute().await?;
160    ///     let rows = response.decode_result_set()?;
161    pub fn prepare_sql<T>(&self, sql: T) -> PreparedSql
162        where   T:Into<String>
163    {
164        PreparedSql {
165            client : self.clone(),
166            sql: sql.into(),
167            params: vec![]
168        }
169    }
170
171    /// prepare call to stored procedure
172    ///
173    ///  # Examples
174    ///
175    /// rust code
176    /// ```text
177    /// let response = client
178    ///         .prepare_fn_call("test")
179    ///         .bind(&("aa", "aa"))?
180    ///         .bind(&1)?
181    ///         .execute().await?;
182    ///     let s: (Vec<String>, u64) = response.decode_pair()?;
183    pub  fn prepare_fn_call<T>(&self, function_name: T) -> PreparedFunctionCall
184     where T:Into<String>
185    {
186        PreparedFunctionCall {
187            client : self.clone(),
188            function: function_name.into(),
189            params: vec![]
190        }
191    }
192
193    /// return client status
194    pub fn get_status(&self) -> ClientStatus {
195        self.status.read().unwrap().clone()
196    }
197
198    /// return notify stream with connection statuses
199    pub fn subscribe_to_notify_stream(&self) -> mpsc::UnboundedReceiver<ClientStatus> {
200        let (callback_sender, callback_receiver) = mpsc::unbounded();
201        self.notify_callbacks.lock().unwrap().push(callback_sender);
202        callback_receiver
203    }
204
205    /// send any command you manually create, this method is low level and not intended to be used
206    pub async fn send_command(&self, req: CommandPacket) -> io::Result<TarantoolResponse> {
207        //        let dispatch = self.dispatch.clone();
208
209        if let Some(mut extracted_dispatch) = self.dispatch.clone().lock().unwrap().take() {
210            debug!("spawn coroutine!");
211            //lazy spawning main coroutine in first tarantool call
212            tokio::spawn(async move {
213                extracted_dispatch.run().await;
214            });
215        }
216
217        let (callback_sender, callback_receiver) = oneshot::channel();
218        let send_res = self.command_sender.unbounded_send((req, callback_sender));
219        if send_res.is_err() {
220            return Err(io::Error::new(
221                io::ErrorKind::Other,
222                ERROR_DISPATCH_THREAD_IS_DEAD,
223            ));
224        }
225        match callback_receiver.await {
226            Err(_) => Err(io::Error::new(
227                io::ErrorKind::Other,
228                ERROR_CLIENT_DISCONNECTED,
229            )),
230            Ok(res) => res,
231        }
232    }
233
234    /// call tarantool stored procedure
235    ///
236    /// params mast be serializable to MsgPack tuple by Serde - rust tuple or vector or struct (by default structs serialized as tuple)
237    ///
238    /// order of fields in serializes tuple is order of parameters of procedure
239    ///
240    ///  # Examples
241    ///
242    /// lua function on tarantool
243    /// ```lua
244    /// function test(a,b)
245    ///   return a,b,11
246    ///  end
247    /// ```
248    ///
249    /// rust code
250    /// ```text
251    ///  let response = client.call_fn("test", &(("aa", "aa"), 1)).await?;
252    ///  let s: (Vec<String>, Vec<u64>) = response.decode_pair()?;
253    ///  println!("resp value={:?}", s);
254    ///  assert_eq!((vec!["aa".to_string(), "aa".to_string()], vec![1]), s);
255    ///
256    #[inline(always)]
257    pub async fn call_fn<T>(&self, function: &str, params: &T) -> io::Result<TarantoolResponse>
258    where
259        T: Serialize,
260    {
261        self.send_command(CommandPacket::call(function, params).unwrap())
262            .await
263    }
264
265    ///call tarantool stored procedure with one parameter
266    ///
267    #[inline(always)]
268    pub async fn call_fn1<T1>(&self, function: &str, param1: &T1) -> io::Result<TarantoolResponse>
269    where
270        T1: Serialize,
271    {
272        self.send_command(CommandPacket::call(function, &(param1,)).unwrap())
273            .await
274    }
275
276    ///call tarantool stored procedure with two parameters
277    ///
278    /// # Examples
279    ///
280    ///```text
281    /// let response = client.call_fn2("test", &("param11", "param12") , &2).await?;
282    ///  let s: (Vec<String>, Vec<u64>) = response.decode_pair()?;
283    ///  println!("resp value={:?}", s);
284    ///  assert_eq!((vec!["aa".to_string(), "aa".to_string()], vec![1]), s);
285    ///
286    #[inline(always)]
287    pub async fn call_fn2<T1, T2>(
288        &self,
289        function: &str,
290        param1: &T1,
291        param2: &T2,
292    ) -> io::Result<TarantoolResponse>
293    where
294        T1: Serialize,
295        T2: Serialize,
296    {
297        self.send_command(CommandPacket::call(function, &(param1, param2)).unwrap())
298            .await
299    }
300
301    ///call tarantool stored procedure with three parameters
302    ///
303    #[inline(always)]
304    pub async fn call_fn3<T1, T2, T3>(
305        &self,
306        function: &str,
307        param1: &T1,
308        param2: &T2,
309        param3: &T3,
310    ) -> io::Result<TarantoolResponse>
311    where
312        T1: Serialize,
313        T2: Serialize,
314        T3: Serialize,
315    {
316        self.send_command(CommandPacket::call(function, &(param1, param2, param3)).unwrap())
317            .await
318    }
319
320    ///call tarantool stored procedure with four parameters
321    ///
322    #[inline(always)]
323    pub async fn call_fn4<T1, T2, T3, T4>(
324        &self,
325        function: &str,
326        param1: &T1,
327        param2: &T2,
328        param3: &T3,
329        param4: &T4,
330    ) -> io::Result<TarantoolResponse>
331    where
332        T1: Serialize,
333        T2: Serialize,
334        T3: Serialize,
335        T4: Serialize,
336    {
337        self.send_command(CommandPacket::call(function, &(param1, param2, param3, param4)).unwrap())
338            .await
339    }
340
341    ///call tarantool stored procedure with five parameters
342    ///
343    #[inline(always)]
344    pub async fn call_fn5<T1, T2, T3, T4, T5>(
345        &self,
346        function: &str,
347        param1: &T1,
348        param2: &T2,
349        param3: &T3,
350        param4: &T4,
351        param5: &T5,
352    ) -> io::Result<TarantoolResponse>
353    where
354        T1: Serialize,
355        T2: Serialize,
356        T3: Serialize,
357        T4: Serialize,
358        T5: Serialize,
359    {
360        self.send_command(
361            CommandPacket::call(function, &(param1, param2, param3, param4, param5)).unwrap(),
362        )
363        .await
364    }
365
366    ///call "select" from tarantool
367    /// - space - i32 space id
368    /// - index - i32 index id
369    /// - key - key part used for select, may be sequence (vec or tuple)
370    /// - offset - i32 select offset
371    /// - limit - i32 limit of rows
372    /// - iterator - type of iterator
373    ///
374    #[inline(always)]
375    pub async fn select<T>(
376        &self,
377        space: i32,
378        index: i32,
379        key: &T,
380        offset: i32,
381        limit: i32,
382        iterator: IteratorType,
383    ) -> io::Result<TarantoolResponse>
384    where
385        T: Serialize,
386    {
387        self.send_command(
388            CommandPacket::select(space, index, key, offset, limit, iterator as i32).unwrap(),
389        )
390        .await
391    }
392
393    ///insert tuple to space
394    /// - space - space id
395    /// - tuple - sequence of fields(can be vec or rust tuple)
396    ///
397    #[inline(always)]
398    pub async fn insert<T>(&self, space: i32, tuple: &T) -> io::Result<TarantoolResponse>
399    where
400        T: Serialize,
401    {
402        self.send_command(CommandPacket::insert(space, tuple).unwrap())
403            .await
404    }
405
406    #[inline(always)]
407    ///replace tuple in space by primary key
408    /// - space - space id
409    /// - tuple - sequence of fields(can be vec or rust tuple)
410    ///
411    /// # Examples
412    /// ```text
413    /// let tuple_replace= (3,"test_insert","replace");
414    /// client.replace(SPACE_ID, &tuple_replace).await?;
415    ///
416    pub async fn replace<T>(&self, space: i32, tuple: &T) -> io::Result<TarantoolResponse>
417    where
418        T: Serialize,
419    {
420        self.send_command(CommandPacket::replace(space, tuple).unwrap())
421            .await
422    }
423
424    #[inline(always)]
425    ///replace tuple in space by primary key - raw method if you have already serialized msgpack
426    /// - space - space id
427    /// - tuple - sequence of fields stored as msgpack
428    ///
429    /// # Examples
430    /// ```text
431    /// let tuple_replace= (3,"test_insert","replace");
432    /// let raw_buf = serialize_to_vec_u8(&tuple_replace).unwrap();
433    /// client.replace_raw(SPACE_ID, raw_buf).await?;
434    ///
435    pub async fn replace_raw(
436        &self,
437        space: i32,
438        tuple_raw: Vec<u8>,
439    ) -> io::Result<TarantoolResponse> {
440        self.send_command(CommandPacket::replace_raw(space, tuple_raw).unwrap())
441            .await
442    }
443
444    ///update row in tarantool
445    /// - space - space id
446    /// - key - sequence of fields(rust tuple or vec)
447    /// - args - sequence of update operations, for example (('=',2,"test_update"),)
448    ///
449    /// # Examples
450    /// ```text
451    /// let tuple= (3,"test_insert");
452    /// let update_op= (('=',2,"test_update"),);
453    /// client.update(SPACE_ID, &tuple, &update_op).await?;
454    ///
455    #[inline(always)]
456    pub async fn update<T, T2>(
457        &self,
458        space: i32,
459        key: &T2,
460        args: &T,
461    ) -> io::Result<TarantoolResponse>
462    where
463        T: Serialize,
464        T2: Serialize,
465    {
466        self.send_command(CommandPacket::update(space, key, args).unwrap())
467            .await
468    }
469
470    ///upsert row in tuple
471    ///
472    /// # Examples
473    /// ```text
474    /// let key= (4,"test_upsert");
475    /// let update_op= (('=',2,"test_update_upsert"),);
476    /// client.upsert(SPACE_ID,&key, &key,&update_op).await?;
477    ///
478    #[inline(always)]
479    pub async fn upsert<T, T2, T3>(
480        &self,
481        space: i32,
482        key: &T2,
483        def: &T3,
484        args: &T,
485    ) -> io::Result<TarantoolResponse>
486    where
487        T: Serialize,
488        T2: Serialize,
489        T3: Serialize,
490    {
491        self.send_command(CommandPacket::upsert(space, key, def, args).unwrap())
492            .await
493    }
494
495    ///delete row in space
496    ///
497    /// # Examples
498    /// ```text
499    /// let tuple= (3,"test_insert");
500    /// client.delete(SPACE_ID,&tuple).await?;
501    #[inline(always)]
502    pub async fn delete<T>(&self, space: i32, key: &T) -> io::Result<TarantoolResponse>
503    where
504        T: Serialize,
505    {
506        self.send_command(CommandPacket::delete(space, key).unwrap())
507            .await
508    }
509
510    ///eval expression in tarantool
511    ///
512    /// # Examples
513    ///
514    /// ```text
515    /// client.eval("return ...\n".to_string(),&(1,2)).await?
516    ///
517    #[inline(always)]
518    pub async fn eval<T, T1>(&self, expression: T1, args: &T) -> io::Result<TarantoolResponse>
519    where
520        T: Serialize,
521        T1: Into<String>
522    {
523        self.send_command(CommandPacket::eval(expression.into(), args).unwrap())
524            .await
525    }
526
527    ///eval sql expression in tarantool
528    ///
529    /// # Examples
530    ///
531    /// ```text
532    /// client.exec_sql("select * from TABLE1 where COLUMN1=?".to_string(), &(1,)).await?;
533    ///
534    #[inline(always)]
535    pub async fn exec_sql<T, T1>(&self, sql: T1, args: &T) -> io::Result<TarantoolResponse>
536        where
537            T: Serialize,
538            T1: Into<String>
539    {
540        self.send_command(CommandPacket::exec_sql(sql.into().as_str(), args).unwrap())
541            .await
542    }
543
544    // ///prepare sql expression in tarantool
545    // ///
546    // /// # Examples
547    // ///
548    // /// ```text
549    // /// client.prepare_stmt("select * from TABLE1 where COLUMN1=?".to_string(), &(1,)).await?;
550    // ///
551    // #[inline(always)]
552    // pub async fn prepare_stmt(&self, sql: String) -> io::Result<TarantoolResponse>
553    // {
554    //     self.send_command(CommandPacket::prepare_stmt(sql).unwrap())
555    //         .await
556    // }
557
558    ///ping tarantool server, return empty response in success
559    ///
560    /// # Examples
561    ///
562    /// ```text
563    /// client.ping().await?
564    ///
565    #[inline(always)]
566    pub async fn ping(&self) -> io::Result<TarantoolResponse> {
567        self.send_command(CommandPacket::ping().unwrap()).await
568    }
569}
570
571
572impl ExecWithParamaters for PreparedSql {
573
574    ///bind raw parameter
575    fn bind_raw(mut self, param: Vec<u8>) -> io::Result<PreparedSql>  {
576        self.params.push(param);
577        Ok(self)
578    }
579}
580
581
582impl PreparedSql {
583
584    ///eval sql expression in tarantool
585    ///
586    /// # Examples
587    ///
588    /// ```text
589    /// let response = client
590    //         .prepare_sql("select * from TABLE1 where COLUMN1=?")
591    //         .bind(1)?
592    //         .execute().await?;
593    ///
594    #[inline(always)]
595    pub async fn execute(self) -> io::Result<TarantoolSqlResponse>
596    {
597        self.client.send_command(CommandPacket::exec_sql_raw(&self.sql.as_str(), serialize_array(&self.params)?).unwrap())
598            .await.map(|val| val.into())
599    }
600}
601
602impl ExecWithParamaters for PreparedFunctionCall {
603
604    ///bind raw parameter
605    fn bind_raw(mut self, param: Vec<u8>) -> io::Result<PreparedFunctionCall>  {
606        self.params.push(param);
607        Ok(self)
608    }
609}
610
611impl PreparedFunctionCall {
612    /// call tarantool stored procedure
613    ///
614    /// params mast be serializable to MsgPack tuple by Serde - rust tuple or vector or struct (by default structs serialized as tuple)
615    ///
616    /// order of fields in serializes tuple is order od parameters of procedure
617    ///
618    ///  # Examples
619    ///
620    /// lua function on tarantool
621    /// ```lua
622    /// function test(a,b)
623    ///   return a,b,11
624    ///  end
625    /// ```
626    ///
627    /// rust code
628    /// ```text
629    ///  let response = client
630    ///         .prepare_fn_call("test")
631    ///         .bind_ref(&("aa", "aa"))?
632    ///         .bind(1)?
633    ///         .execute().await?;
634    ///
635    #[inline(always)]
636    pub async fn execute(self) -> io::Result<TarantoolResponse>
637    {
638        self.client.send_command(CommandPacket::call_raw(self.function.as_str(), serialize_array(&self.params)?).unwrap())
639            .await
640    }
641}