1pub use crate::tarantool::packets::{CommandPacket, TarantoolRequest, TarantoolResponse, TarantoolSqlResponse};
2pub use crate::tarantool::tools::{serialize_to_vec_u8, serialize_array};
3use futures_channel::mpsc;
4use futures_channel::oneshot;
5use serde::Serialize;
6use std::io;
7use std::sync::{Arc, Mutex, RwLock};
8
9pub mod codec;
10mod dispatch;
11pub mod packets;
12mod tools;
13
14use crate::tarantool::dispatch::{
15 CallbackSender, Dispatch, ERROR_CLIENT_DISCONNECTED, ERROR_DISPATCH_THREAD_IS_DEAD,
16};
17pub use crate::tarantool::dispatch::{ClientConfig, ClientStatus};
18use rmpv::Value;
19
20impl ClientConfig {
21 pub fn build(self) -> Client {
22 Client::new(self)
23 }
24}
25
26pub enum IteratorType {
28 EQ = 0 , REQ = 1, ALL = 2, LT = 3, LE = 4, GE = 5, GT = 6, BitsAllSet = 7, BitsAnySet = 8, BitsAllNotSet = 9, OVERLAPS = 10, NEIGHBOR = 11,
40}
41
42#[derive(Clone)]
52pub struct Client {
53 command_sender: mpsc::UnboundedSender<(CommandPacket, CallbackSender)>,
54 dispatch: Arc<Mutex<Option<Dispatch>>>,
55 status: Arc<RwLock<ClientStatus>>,
56 notify_callbacks: Arc<Mutex<Vec<dispatch::ReconnectNotifySender>>>,
57}
58
59pub trait ExecWithParamaters {
60 fn bind_raw(self, param: Vec<u8>) -> io::Result<Self>
61 where Self: std::marker::Sized;
62
63 fn bind_named_ref<T : Serialize, T1:Into<String>>(self, name: T1, param: &T) -> io::Result<Self>
65 where Self: std::marker::Sized,
66
67 {
68 let mut name_s = name.into();
69 name_s.insert(0,':');
70
71 self.bind_raw(tools::serialize_one_element_map(
72 name_s,
73 tools::serialize_to_vec_u8(param)?)?)
74 }
75
76 fn bind_ref<T : Serialize>(self, param: &T) -> io::Result<Self>
78 where Self: std::marker::Sized
79 {
80 self.bind_raw(tools::serialize_to_vec_u8(param)?)
81 }
82
83 fn bind_named<T, T1>(self, name: T1,param: T) -> io::Result<Self>
85 where T: Serialize,
86 Self: std::marker::Sized,
87 T1:Into<String>
88 {
89 self.bind_named_ref(name, ¶m)
90 }
91
92
93 fn bind<T>(self, param: T) -> io::Result<Self>
95 where T: Serialize,
96 Self: std::marker::Sized
97 {
98 self.bind_ref(¶m)
99 }
100
101 fn bind_null(self) -> io::Result<Self>
103 where Self: std::marker::Sized
104 {
105 self.bind_ref(&Value::Nil)
106 }
107
108 fn bind_named_null<T1>(self, name: T1) -> io::Result<Self>
110 where Self: std::marker::Sized,
111 T1:Into<String>
112 {
113 self.bind_named_ref(name, &Value::Nil)
114 }
115}
116
117pub struct PreparedSql {
118 client: Client,
119 sql: String,
120 params: Vec<Vec<u8>>
121}
122
123pub struct PreparedFunctionCall {
124 client: Client,
125 function: String,
126 params: Vec<Vec<u8>>
127}
128
129impl Client {
130 pub fn new(config: ClientConfig) -> Client {
132 let (command_sender, command_receiver) = mpsc::unbounded();
133
134 let status = Arc::new(RwLock::new(ClientStatus::Init));
135 let notify_callbacks = Arc::new(Mutex::new(Vec::new()));
136
137 Client {
138 command_sender,
139 dispatch: Arc::new(Mutex::new(Some(Dispatch::new(
140 config,
141 command_receiver,
142 status.clone(),
143 notify_callbacks.clone(),
144 )))),
145 status,
146 notify_callbacks,
147 }
148 }
149
150 pub fn prepare_sql<T>(&self, sql: T) -> PreparedSql
162 where T:Into<String>
163 {
164 PreparedSql {
165 client : self.clone(),
166 sql: sql.into(),
167 params: vec![]
168 }
169 }
170
171 pub fn prepare_fn_call<T>(&self, function_name: T) -> PreparedFunctionCall
184 where T:Into<String>
185 {
186 PreparedFunctionCall {
187 client : self.clone(),
188 function: function_name.into(),
189 params: vec![]
190 }
191 }
192
193 pub fn get_status(&self) -> ClientStatus {
195 self.status.read().unwrap().clone()
196 }
197
198 pub fn subscribe_to_notify_stream(&self) -> mpsc::UnboundedReceiver<ClientStatus> {
200 let (callback_sender, callback_receiver) = mpsc::unbounded();
201 self.notify_callbacks.lock().unwrap().push(callback_sender);
202 callback_receiver
203 }
204
205 pub async fn send_command(&self, req: CommandPacket) -> io::Result<TarantoolResponse> {
207 if let Some(mut extracted_dispatch) = self.dispatch.clone().lock().unwrap().take() {
210 debug!("spawn coroutine!");
211 tokio::spawn(async move {
213 extracted_dispatch.run().await;
214 });
215 }
216
217 let (callback_sender, callback_receiver) = oneshot::channel();
218 let send_res = self.command_sender.unbounded_send((req, callback_sender));
219 if send_res.is_err() {
220 return Err(io::Error::new(
221 io::ErrorKind::Other,
222 ERROR_DISPATCH_THREAD_IS_DEAD,
223 ));
224 }
225 match callback_receiver.await {
226 Err(_) => Err(io::Error::new(
227 io::ErrorKind::Other,
228 ERROR_CLIENT_DISCONNECTED,
229 )),
230 Ok(res) => res,
231 }
232 }
233
234 #[inline(always)]
257 pub async fn call_fn<T>(&self, function: &str, params: &T) -> io::Result<TarantoolResponse>
258 where
259 T: Serialize,
260 {
261 self.send_command(CommandPacket::call(function, params).unwrap())
262 .await
263 }
264
265 #[inline(always)]
268 pub async fn call_fn1<T1>(&self, function: &str, param1: &T1) -> io::Result<TarantoolResponse>
269 where
270 T1: Serialize,
271 {
272 self.send_command(CommandPacket::call(function, &(param1,)).unwrap())
273 .await
274 }
275
276 #[inline(always)]
287 pub async fn call_fn2<T1, T2>(
288 &self,
289 function: &str,
290 param1: &T1,
291 param2: &T2,
292 ) -> io::Result<TarantoolResponse>
293 where
294 T1: Serialize,
295 T2: Serialize,
296 {
297 self.send_command(CommandPacket::call(function, &(param1, param2)).unwrap())
298 .await
299 }
300
301 #[inline(always)]
304 pub async fn call_fn3<T1, T2, T3>(
305 &self,
306 function: &str,
307 param1: &T1,
308 param2: &T2,
309 param3: &T3,
310 ) -> io::Result<TarantoolResponse>
311 where
312 T1: Serialize,
313 T2: Serialize,
314 T3: Serialize,
315 {
316 self.send_command(CommandPacket::call(function, &(param1, param2, param3)).unwrap())
317 .await
318 }
319
320 #[inline(always)]
323 pub async fn call_fn4<T1, T2, T3, T4>(
324 &self,
325 function: &str,
326 param1: &T1,
327 param2: &T2,
328 param3: &T3,
329 param4: &T4,
330 ) -> io::Result<TarantoolResponse>
331 where
332 T1: Serialize,
333 T2: Serialize,
334 T3: Serialize,
335 T4: Serialize,
336 {
337 self.send_command(CommandPacket::call(function, &(param1, param2, param3, param4)).unwrap())
338 .await
339 }
340
341 #[inline(always)]
344 pub async fn call_fn5<T1, T2, T3, T4, T5>(
345 &self,
346 function: &str,
347 param1: &T1,
348 param2: &T2,
349 param3: &T3,
350 param4: &T4,
351 param5: &T5,
352 ) -> io::Result<TarantoolResponse>
353 where
354 T1: Serialize,
355 T2: Serialize,
356 T3: Serialize,
357 T4: Serialize,
358 T5: Serialize,
359 {
360 self.send_command(
361 CommandPacket::call(function, &(param1, param2, param3, param4, param5)).unwrap(),
362 )
363 .await
364 }
365
366 #[inline(always)]
375 pub async fn select<T>(
376 &self,
377 space: i32,
378 index: i32,
379 key: &T,
380 offset: i32,
381 limit: i32,
382 iterator: IteratorType,
383 ) -> io::Result<TarantoolResponse>
384 where
385 T: Serialize,
386 {
387 self.send_command(
388 CommandPacket::select(space, index, key, offset, limit, iterator as i32).unwrap(),
389 )
390 .await
391 }
392
393 #[inline(always)]
398 pub async fn insert<T>(&self, space: i32, tuple: &T) -> io::Result<TarantoolResponse>
399 where
400 T: Serialize,
401 {
402 self.send_command(CommandPacket::insert(space, tuple).unwrap())
403 .await
404 }
405
406 #[inline(always)]
407 pub async fn replace<T>(&self, space: i32, tuple: &T) -> io::Result<TarantoolResponse>
417 where
418 T: Serialize,
419 {
420 self.send_command(CommandPacket::replace(space, tuple).unwrap())
421 .await
422 }
423
424 #[inline(always)]
425 pub async fn replace_raw(
436 &self,
437 space: i32,
438 tuple_raw: Vec<u8>,
439 ) -> io::Result<TarantoolResponse> {
440 self.send_command(CommandPacket::replace_raw(space, tuple_raw).unwrap())
441 .await
442 }
443
444 #[inline(always)]
456 pub async fn update<T, T2>(
457 &self,
458 space: i32,
459 key: &T2,
460 args: &T,
461 ) -> io::Result<TarantoolResponse>
462 where
463 T: Serialize,
464 T2: Serialize,
465 {
466 self.send_command(CommandPacket::update(space, key, args).unwrap())
467 .await
468 }
469
470 #[inline(always)]
479 pub async fn upsert<T, T2, T3>(
480 &self,
481 space: i32,
482 key: &T2,
483 def: &T3,
484 args: &T,
485 ) -> io::Result<TarantoolResponse>
486 where
487 T: Serialize,
488 T2: Serialize,
489 T3: Serialize,
490 {
491 self.send_command(CommandPacket::upsert(space, key, def, args).unwrap())
492 .await
493 }
494
495 #[inline(always)]
502 pub async fn delete<T>(&self, space: i32, key: &T) -> io::Result<TarantoolResponse>
503 where
504 T: Serialize,
505 {
506 self.send_command(CommandPacket::delete(space, key).unwrap())
507 .await
508 }
509
510 #[inline(always)]
518 pub async fn eval<T, T1>(&self, expression: T1, args: &T) -> io::Result<TarantoolResponse>
519 where
520 T: Serialize,
521 T1: Into<String>
522 {
523 self.send_command(CommandPacket::eval(expression.into(), args).unwrap())
524 .await
525 }
526
527 #[inline(always)]
535 pub async fn exec_sql<T, T1>(&self, sql: T1, args: &T) -> io::Result<TarantoolResponse>
536 where
537 T: Serialize,
538 T1: Into<String>
539 {
540 self.send_command(CommandPacket::exec_sql(sql.into().as_str(), args).unwrap())
541 .await
542 }
543
544 #[inline(always)]
566 pub async fn ping(&self) -> io::Result<TarantoolResponse> {
567 self.send_command(CommandPacket::ping().unwrap()).await
568 }
569}
570
571
572impl ExecWithParamaters for PreparedSql {
573
574 fn bind_raw(mut self, param: Vec<u8>) -> io::Result<PreparedSql> {
576 self.params.push(param);
577 Ok(self)
578 }
579}
580
581
582impl PreparedSql {
583
584 #[inline(always)]
595 pub async fn execute(self) -> io::Result<TarantoolSqlResponse>
596 {
597 self.client.send_command(CommandPacket::exec_sql_raw(&self.sql.as_str(), serialize_array(&self.params)?).unwrap())
598 .await.map(|val| val.into())
599 }
600}
601
602impl ExecWithParamaters for PreparedFunctionCall {
603
604 fn bind_raw(mut self, param: Vec<u8>) -> io::Result<PreparedFunctionCall> {
606 self.params.push(param);
607 Ok(self)
608 }
609}
610
611impl PreparedFunctionCall {
612 #[inline(always)]
636 pub async fn execute(self) -> io::Result<TarantoolResponse>
637 {
638 self.client.send_command(CommandPacket::call_raw(self.function.as_str(), serialize_array(&self.params)?).unwrap())
639 .await
640 }
641}