tarantool/network/client/
mod.rs

1//! Tarantool based async [`Client`].
2//!
3//! Can be used only from inside Tarantool as it makes heavy use of fibers and coio.
4//!
5//! # Example
6//! ```no_run
7//! # async {
8//! use tarantool::network::client::Client;
9//! // Most of the client's methods are in the `AsClient` trait
10//! use tarantool::network::client::AsClient as _;
11//!
12//! let client = Client::connect("localhost", 3301).await.unwrap();
13//! client.ping().await.unwrap();
14//!
15//! // Requests can also be easily combined with fiber::r#async::timeout
16//! use tarantool::fiber::r#async::timeout::IntoTimeout as _;
17//! use std::time::Duration;
18//!
19//! client.ping().timeout(Duration::from_secs(10)).await.unwrap();
20//! # };
21//! ```
22//!
23//! # Reusing Connection
24//! Client can be cloned, and safely moved to a different fiber if needed, to reuse the same connection.
25//! When multiple fibers use the same connection, all requests are pipelined through the same network socket, but each fiber
26//! gets back a correct response. Reducing the number of active sockets lowers the overhead of system calls and increases
27//! the overall server performance.
28//!
29//! # Implementation
30//! Internally the client uses [`Protocol`] to get bytes that it needs to send
31//! and push bytes that it gets from the network.
32//!
//! On creation the client spawns sender and receiver worker fibers, which in turn
//! use coio based [`TcpStream`] as the transport layer.
35
36pub mod reconnect;
37pub mod tcp;
38
39use std::collections::HashMap;
40use std::io::Cursor;
41use std::rc::Rc;
42use std::sync::Arc;
43use std::time::Duration;
44
45use self::tcp::TcpStream;
46
47use super::protocol::api::{Call, Eval, Execute, Ping, Request};
48use super::protocol::{self, Protocol, SyncIndex};
49use crate::error;
50use crate::error::BoxError;
51use crate::fiber;
52use crate::fiber::r#async::oneshot;
53use crate::fiber::r#async::IntoOnDrop as _;
54use crate::fiber::FiberId;
55use crate::fiber::NoYieldsRefCell;
56use crate::tuple::{ToTupleBuffer, Tuple};
57use crate::unwrap_ok_or;
58
59use futures::{AsyncReadExt, AsyncWriteExt};
60
/// Deprecated alias for [`ClientError`].
#[deprecated = "use `ClientError` instead"]
pub type Error = ClientError;
63
/// Error returned by [`Client`].
#[derive(thiserror::Error, Debug)]
pub enum ClientError {
    /// The connection was closed because of this error.
    ///
    /// The error is wrapped in a [`Arc`], because some libraries require
    /// error types to implement [`Sync`], which isn't implemented for [`Rc`].
    #[error("{0}")]
    ConnectionClosed(Arc<crate::error::Error>),

    /// Error happened during encoding of the request.
    #[error("{0}")]
    RequestEncode(crate::error::Error),

    /// Error happened during decoding of the response.
    #[error("{0}")]
    ResponseDecode(crate::error::Error),

    /// Service responded with an error.
    #[error("{0}")]
    ErrorResponse(BoxError),
}
95
96impl From<ClientError> for crate::error::Error {
97    #[inline(always)]
98    fn from(err: ClientError) -> Self {
99        match err {
100            ClientError::ConnectionClosed(err) => crate::error::Error::ConnectionClosed(err),
101            ClientError::RequestEncode(err) => err,
102            ClientError::ResponseDecode(err) => err,
103            ClientError::ErrorResponse(err) => crate::error::Error::Remote(err),
104        }
105    }
106}
107
/// Lifecycle state of the connection shared by all [`Client`] handles.
#[derive(Clone, Debug)]
enum State {
    /// Initial state assigned when the inner client is created.
    Alive,
    /// Set when the last [`Client`] handle is dropped.
    ClosedManually,
    /// Set by `handle_result!` when a worker fiber hits an error.
    ///
    /// This can only be [`Error::ConnectionClosed`].
    ClosedWithError(Arc<error::Error>),
}
115
116impl State {
117    fn is_alive(&self) -> bool {
118        matches!(self, Self::Alive)
119    }
120
121    fn is_closed(&self) -> bool {
122        !self.is_alive()
123    }
124}
125
/// State shared between all [`Client`] handles and the two worker fibers.
#[derive(Debug)]
struct ClientInner {
    /// Protocol state machine: encodes requests, buffers outgoing bytes and
    /// processes incoming ones.
    protocol: Protocol,
    /// One-shot notifiers for requests that are waiting for a response, keyed
    /// by the request's sync index. The receiver sends `Ok(())` when the
    /// response arrives; `handle_result!` sends the error to all of them when
    /// the connection breaks.
    awaiting_response: HashMap<SyncIndex, oneshot::Sender<Result<(), Arc<error::Error>>>>,
    /// Current lifecycle state of the connection.
    state: State,
    /// The same tcp stream sender & receiver fibers are working with. Only stored
    /// here for closing.
    stream: TcpStream,
    /// Id of the sender fiber, filled in once the fiber has been started.
    sender_fiber_id: Option<FiberId>,
    /// Id of the receiver fiber, filled in once the fiber has been started.
    receiver_fiber_id: Option<FiberId>,
    /// Number of live [`Client`] handles: starts at 1, incremented on clone
    /// and decremented on drop of every handle but the last.
    clients_count: usize,
}
138
139impl ClientInner {
140    pub fn new(config: protocol::Config, stream: TcpStream) -> Self {
141        #[cfg(feature = "picodata")]
142        if config.auth_method == crate::auth::AuthMethod::Ldap {
143            crate::say_warn!(
144                "You're using the 'ldap' authentication method, which implies sending the password UNENCRYPTED over the TCP connection. TLS is not yet implemented for IPROTO connections so make sure your communication channel is secure by other means."
145            )
146        }
147        Self {
148            protocol: Protocol::with_config(config),
149            awaiting_response: HashMap::new(),
150            state: State::Alive,
151            stream,
152            sender_fiber_id: None,
153            receiver_fiber_id: None,
154            clients_count: 1,
155        }
156    }
157}
158
159/// Wakes sender if `protocol` has new outgoing data.
160fn maybe_wake_sender(client: &ClientInner) {
161    if client.protocol.ready_outgoing_len() == 0 {
162        // No point in waking the sender if there's nothing to send
163        return;
164    }
165    if let Some(id) = client.sender_fiber_id {
166        fiber::wakeup(id);
167    }
168}
169
/// Actual client that can be used to send and receive messages to tarantool instance.
///
/// Can be cloned and moved into different fibers for connection to be reused.
///
/// See [`super::client`] for examples and [`AsClient`] trait for API.
// WARNING: Attention should be paid not to borrow the inner client across await and yield points.
#[derive(Debug)]
pub struct Client(Rc<NoYieldsRefCell<ClientInner>>);
178
179impl Client {
180    /// Creates a new client and tries to establish connection
181    /// to `url:port`
182    ///
183    /// # Errors
184    /// Error is returned if an attempt to connect failed.
185    pub async fn connect(url: &str, port: u16) -> Result<Self, ClientError> {
186        Self::connect_with_config(url, port, Default::default()).await
187    }
188
189    /// Creates a new client and tries to establish connection
190    /// to `url:port` with given timeout. When timeout is None
191    /// function behaves as a synchronous.
192    ///
193    /// Takes explicit `config` in comparison to [`Client::connect`]
194    /// where default values are used.
195    ///
196    /// # Errors
197    /// Error is returned if an attempt to connect failed.
198    pub async fn connect_with_config(
199        url: &str,
200        port: u16,
201        config: protocol::Config,
202    ) -> Result<Self, ClientError> {
203        let timeout = config.connect_timeout.unwrap_or(Duration::MAX);
204        let stream = TcpStream::connect_timeout(url, port, timeout)
205            .map_err(|e| ClientError::ConnectionClosed(Arc::new(e.into())))?;
206        let client = ClientInner::new(config, stream.clone());
207        let client = Rc::new(NoYieldsRefCell::new(client));
208
209        let receiver_fiber_id = fiber::Builder::new()
210            .func_async(receiver(client.clone(), stream.clone()))
211            .name(format!("iproto-in/{url}:{port}"))
212            .start_non_joinable()
213            .unwrap();
214
215        let sender_fiber_id = fiber::Builder::new()
216            .func_async(sender(client.clone(), stream))
217            .name(format!("iproto-out/{url}:{port}"))
218            .start_non_joinable()
219            .unwrap();
220
221        {
222            let mut client_mut = client.borrow_mut();
223            client_mut.receiver_fiber_id = Some(receiver_fiber_id);
224            client_mut.sender_fiber_id = Some(sender_fiber_id);
225        }
226
227        Ok(Self(client))
228    }
229
230    fn check_state(&self) -> Result<(), Arc<error::Error>> {
231        match &self.0.borrow().state {
232            State::Alive => Ok(()),
233            State::ClosedManually => unreachable!("All client handles are dropped at this point"),
234            State::ClosedWithError(err) => Err(err.clone()),
235        }
236    }
237}
238
239/// Generic API for an entity that behaves as Tarantool Client.
240#[async_trait::async_trait(?Send)]
241pub trait AsClient {
242    /// Send [`Request`] and wait for response.
243    /// This function yields.
244    ///
245    /// # Errors
246    /// In case of `ClosedWithErr` it is suggested to recreate the connection.
247    /// Other errors are self-descriptive.
248    async fn send<R: Request>(&self, request: &R) -> Result<R::Response, ClientError>;
249
250    /// Execute a PING command.
251    async fn ping(&self) -> Result<(), ClientError> {
252        self.send(&Ping).await
253    }
254
255    /// Call a remote stored procedure.
256    ///
257    /// `conn.call("func", &("1", "2", "3"))` is the remote-call equivalent of `func('1', '2', '3')`.
258    /// That is, `conn.call` is a remote stored-procedure call.
259    /// The return from `conn.call` is whatever the function returns.
260    async fn call<T>(&self, fn_name: &str, args: &T) -> Result<Tuple, ClientError>
261    where
262        T: ToTupleBuffer + ?Sized,
263    {
264        self.send(&Call { fn_name, args }).await
265    }
266
267    /// Evaluates and executes the expression in Lua-string, which may be any statement or series of statements.
268    ///
269    /// An execute privilege is required; if the user does not have it, an administrator may grant it with
270    /// `box.schema.user.grant(username, 'execute', 'universe')`.
271    ///
272    /// To ensure that the return from `eval` is whatever the Lua expression returns, begin the Lua-string with the
273    /// word `return`.
274    async fn eval<T>(&self, expr: &str, args: &T) -> Result<Tuple, ClientError>
275    where
276        T: ToTupleBuffer + ?Sized,
277    {
278        self.send(&Eval { args, expr }).await
279    }
280
281    /// Execute sql query remotely.
282    async fn execute<T>(&self, sql: &str, bind_params: &T) -> Result<Vec<Tuple>, ClientError>
283    where
284        T: ToTupleBuffer + ?Sized,
285    {
286        self.send(&Execute { sql, bind_params }).await
287    }
288}
289
290#[async_trait::async_trait(?Send)]
291impl AsClient for Client {
292    async fn send<R: Request>(&self, request: &R) -> Result<R::Response, ClientError> {
293        if let Err(e) = self.check_state() {
294            return Err(ClientError::ConnectionClosed(e));
295        }
296
297        let res = self.0.borrow_mut().protocol.send_request(request);
298        let sync = unwrap_ok_or!(res,
299            Err(e) => {
300                return Err(ClientError::RequestEncode(e));
301            }
302        );
303
304        let (tx, rx) = oneshot::channel();
305        self.0.borrow_mut().awaiting_response.insert(sync, tx);
306        maybe_wake_sender(&self.0.borrow());
307        // Cleanup `awaiting_response` entry in case of `send` future cancelation
308        // at this `.await`.
309        // `send` can be canceled for example with `Timeout`.
310        let res = rx
311            .on_drop(|| {
312                let _ = self.0.borrow_mut().awaiting_response.remove(&sync);
313            })
314            .await
315            .expect("Channel should be open");
316        if let Err(e) = res {
317            return Err(ClientError::ConnectionClosed(e));
318        }
319
320        let res = self
321            .0
322            .borrow_mut()
323            .protocol
324            .take_response::<R>(sync)
325            .expect("Is present at this point");
326        let response = unwrap_ok_or!(res,
327            Err(error::Error::Remote(response)) => {
328                return Err(ClientError::ErrorResponse(response));
329            }
330            Err(e) => {
331                return Err(ClientError::ResponseDecode(e));
332            }
333        );
334        Ok(response)
335    }
336}
337
338impl Drop for Client {
339    fn drop(&mut self) {
340        let clients_count = self.0.borrow().clients_count;
341        if clients_count == 1 {
342            let mut client = self.0.borrow_mut();
343            // Stop fibers
344            client.state = State::ClosedManually;
345
346            let receiver_fiber_id = client.receiver_fiber_id;
347            let sender_fiber_id = client.sender_fiber_id;
348
349            // We need to close the stream here, because otherwise receiver will
350            // never wake up, because our async runtime blocks forever until the
351            // future is ready.
352            if let Err(e) = client.stream.close() {
353                crate::say_error!("Client::drop: failed closing tcp stream: {e}");
354            }
355
356            // Drop ref before executing code that switches fibers.
357            drop(client);
358
359            // Cancel the worker fibers and wake them up so they can exit their loops
360            if let Some(id) = receiver_fiber_id {
361                fiber::cancel(id);
362                fiber::wakeup(id);
363            }
364
365            if let Some(id) = sender_fiber_id {
366                fiber::cancel(id);
367                fiber::wakeup(id);
368            }
369        } else {
370            self.0.borrow_mut().clients_count -= 1;
371        }
372    }
373}
374
375impl Clone for Client {
376    fn clone(&self) -> Self {
377        self.0.borrow_mut().clients_count += 1;
378        Self(self.0.clone())
379    }
380}
381
/// Unwraps `$e`, or on `Err` closes the client and returns from the
/// enclosing function.
///
/// On error every pending request in `awaiting_response` is notified with a
/// shared copy of the error, the state becomes `State::ClosedWithError` and
/// the calling worker function returns.
///
/// Note that `$client` is expanded twice, so an expression such as
/// `client.borrow_mut()` is evaluated as two separate, short-lived borrows.
macro_rules! handle_result {
    ($client:expr, $e:expr) => {
        match $e {
            Ok(value) => value,
            Err(err) => {
                let err = Arc::new(error::Error::from(err));
                // Notify all subscribers on closing.
                // The entries are collected first, so `$client` is no longer
                // borrowed while the notifications are being sent.
                let subscriptions: HashMap<_, _> = $client.awaiting_response.drain().collect();
                for (_, subscription) in subscriptions {
                    // We don't care about errors at this point
                    let _ = subscription.send(Err(err.clone()));
                }
                $client.state = State::ClosedWithError(err);
                return;
            }
        }
    };
}
400
401/// Sender work loop. Yields on each iteration and during awaits.
402async fn sender(client: Rc<NoYieldsRefCell<ClientInner>>, mut writer: TcpStream) {
403    loop {
404        if client.borrow().state.is_closed() || fiber::is_cancelled() {
405            return;
406        }
407        // TODO: limit max send size
408        let data = client.borrow_mut().protocol.take_outgoing_data();
409        if data.is_empty() {
410            // Wait for explicit wakeup, it should happen when there is new outgoing data
411            fiber::fiber_yield();
412        } else {
413            let result = writer.write_all(&data).await;
414            handle_result!(client.borrow_mut(), result);
415        }
416    }
417}
418
/// Receiver work loop. Yields on each iteration and during awaits.
///
/// Each iteration reads exactly as many bytes as the protocol asks for, feeds
/// them to the protocol and, when that completes a response, notifies the
/// request waiting for it. Any read or processing error closes the client via
/// `handle_result!` and ends the loop.
// Clippy falsely reports that we're holding a `NoYieldsRefCell` reference across an
// `await`, even though we're explicitly dropping the reference right before
// awaiting. Thank you clippy, very helpful!
#[allow(clippy::await_holding_refcell_ref)]
async fn receiver(client_cell: Rc<NoYieldsRefCell<ClientInner>>, mut reader: TcpStream) {
    // Reusable read buffer; it only ever grows.
    let mut buf = vec![0_u8; 4096];
    loop {
        let client = client_cell.borrow();
        if client.state.is_closed() || fiber::is_cancelled() {
            return;
        }

        // The protocol tells how many bytes it wants next.
        let size = client.protocol.read_size_hint();
        if buf.len() < size {
            buf.resize(size, 0);
        }
        let buf_slice = &mut buf[0..size];

        // Reference must be dropped before yielding.
        drop(client);

        let res = reader.read_exact(buf_slice).await;

        // Re-borrow only after the await has completed.
        let mut client = client_cell.borrow_mut();
        handle_result!(client, res);

        let result = client
            .protocol
            .process_incoming(&mut Cursor::new(buf_slice));
        let result = handle_result!(client, result);
        // `Some(sync)` identifies the request whose response is now available.
        if let Some(sync) = result {
            let subscription = client.awaiting_response.remove(&sync);
            if let Some(subscription) = subscription {
                subscription
                    .send(Ok(()))
                    .expect("cannot be closed at this point");
            } else {
                // No waiter is registered for this sync, e.g. because the
                // `send` future was dropped and removed its entry.
                crate::say_warn!("received unwaited message for {sync:?}");
            }
        }

        // Wake sender to handle the greeting we may have just received
        maybe_wake_sender(&client);
    }
}
465
466#[cfg(feature = "internal_test")]
467mod tests {
468    use super::*;
469    use crate::error::TarantoolErrorCode;
470    use crate::fiber::r#async::timeout::IntoTimeout as _;
471    use crate::space::Space;
472    use crate::test::util::listen_port;
473    use std::time::Duration;
474
475    async fn test_client() -> Client {
476        Client::connect_with_config(
477            "localhost",
478            listen_port(),
479            protocol::Config {
480                creds: Some(("test_user".into(), "password".into())),
481                auth_method: crate::auth::AuthMethod::ChapSha1,
482                ..Default::default()
483            },
484        )
485        .timeout(Duration::from_secs(3))
486        .await
487        .unwrap()
488    }
489
490    #[crate::test(tarantool = "crate")]
491    async fn connect_with_timeout() {
492        // Without timeout, the connection hangs on address like this
493        let client = Client::connect_with_config(
494            "123123", // Invalid host
495            listen_port(),
496            protocol::Config {
497                connect_timeout: Some(Duration::from_secs(1)),
498                ..Default::default()
499            },
500        );
501        let res = client.await.unwrap_err();
502        if cfg!(target_os = "macos") {
503            assert!(res.to_string().contains("No route to host"));
504        } else {
505            assert_eq!(res.to_string(), "connect timeout");
506        }
507    }
508
509    #[crate::test(tarantool = "crate")]
510    async fn connect() {
511        let _client = Client::connect("localhost", listen_port()).await.unwrap();
512    }
513
514    #[crate::test(tarantool = "crate")]
515    async fn connect_failure() {
516        // Can be any other unused port
517        let err = Client::connect("localhost", 0).await.unwrap_err();
518        assert!(matches!(dbg!(err), ClientError::ConnectionClosed(_)))
519    }
520
521    #[crate::test(tarantool = "crate")]
522    async fn ping() {
523        let client = test_client().await;
524
525        for _ in 0..5 {
526            client.ping().timeout(Duration::from_secs(3)).await.unwrap();
527        }
528    }
529
530    #[crate::test(tarantool = "crate")]
531    fn ping_concurrent() {
532        let client = fiber::block_on(test_client());
533        let fiber_a = fiber::start_async(async {
534            client.ping().timeout(Duration::from_secs(3)).await.unwrap()
535        });
536        let fiber_b = fiber::start_async(async {
537            client.ping().timeout(Duration::from_secs(3)).await.unwrap()
538        });
539        fiber_a.join();
540        fiber_b.join();
541    }
542
543    #[crate::test(tarantool = "crate")]
544    async fn execute() {
545        Space::find("test_s1")
546            .unwrap()
547            .insert(&(6001, "6001"))
548            .unwrap();
549        Space::find("test_s1")
550            .unwrap()
551            .insert(&(6002, "6002"))
552            .unwrap();
553
554        let client = test_client().await;
555
556        let lua = crate::lua_state();
557        // Error is silently ignored on older versions, before 'compat' was introduced.
558        _ = lua.exec("require'compat'.sql_seq_scan_default = 'old'");
559
560        let result = client
561            .execute(r#"SELECT * FROM "test_s1""#, &())
562            .timeout(Duration::from_secs(3))
563            .await
564            .unwrap();
565        assert!(result.len() >= 2);
566
567        let result = client
568            .execute(r#"SELECT * FROM "test_s1" WHERE "id" = ?"#, &(6002,))
569            .timeout(Duration::from_secs(3))
570            .await
571            .unwrap();
572
573        assert_eq!(result.len(), 1);
574        assert_eq!(
575            result.first().unwrap().decode::<(u64, String)>().unwrap(),
576            (6002, "6002".into())
577        );
578    }
579
580    #[crate::test(tarantool = "crate")]
581    async fn call() {
582        let client = test_client().await;
583
584        let result = client
585            .call("test_stored_proc", &(1, 2))
586            .timeout(Duration::from_secs(3))
587            .await
588            .unwrap();
589        assert_eq!(result.decode::<(i32,)>().unwrap(), (3,));
590    }
591
592    #[crate::test(tarantool = "crate")]
593    async fn invalid_call() {
594        let client = test_client().await;
595
596        let err = client
597            .call("unexistent_proc", &())
598            .timeout(Duration::from_secs(3))
599            .await
600            .unwrap_err();
601
602        let err = error::Error::from(err);
603        let error::Error::Remote(err) = err else {
604            panic!()
605        };
606
607        assert_eq!(err.error_code(), TarantoolErrorCode::NoSuchProc as u32);
608
609        #[rustfmt::skip]
610        assert_eq!(err.to_string(), "NoSuchProc: Procedure 'unexistent_proc' is not defined");
611    }
612
613    #[crate::test(tarantool = "crate")]
614    async fn eval() {
615        let client = test_client().await;
616
617        // Ok result
618        let result = client
619            .eval("return ...", &(1, 2))
620            .timeout(Duration::from_secs(3))
621            .await
622            .unwrap();
623        assert_eq!(result.decode::<(i32, i32)>().unwrap(), (1, 2));
624
625        // Error result
626        let err = client
627            .eval("box.error(420)", &())
628            .timeout(Duration::from_secs(3))
629            .await
630            .unwrap_err();
631
632        let err = error::Error::from(err);
633        let error::Error::Remote(err) = err else {
634            panic!()
635        };
636
637        assert_eq!(err.error_code(), 420);
638    }
639
640    /// A regression test for https://git.picodata.io/picodata/picodata/tarantool-module/-/merge_requests/302
641    #[crate::test(tarantool = "crate")]
642    async fn client_count_regression() {
643        let client = test_client().await;
644        // Should close sender and receiver fibers
645        client.0.borrow_mut().stream.close().unwrap();
646        // Receiver wakes and closes
647        fiber::reschedule();
648
649        let fiber_id = client.0.borrow().sender_fiber_id.unwrap();
650        let fiber_exists = fiber::wakeup(fiber_id);
651        debug_assert!(fiber_exists);
652
653        // Sender wakes and closes
654        fiber::reschedule();
655        // Sender and receiver stopped and dropped their refs
656        assert_eq!(Rc::strong_count(&client.0), 1);
657
658        // Cloning a client produces 2 refs
659        let client_clone = client.clone();
660        assert_eq!(Rc::strong_count(&client.0), 2);
661        // Here if client checked by Rc refs <= 3 it would assume it is the last and set state to ClosedManually
662        drop(client_clone);
663        assert_eq!(Rc::strong_count(&client.0), 1);
664
665        // This would panic on unreachable if previous drop have set the state
666        client.check_state().unwrap_err();
667    }
668
669    #[crate::test(tarantool = "crate")]
670    async fn concurrent_messages_one_fiber() {
671        let client = test_client().await;
672        let mut ping_futures = vec![];
673        for _ in 0..10 {
674            ping_futures.push(client.ping());
675        }
676        for res in futures::future::join_all(ping_futures).await {
677            res.unwrap();
678        }
679    }
680
681    #[crate::test(tarantool = "crate")]
682    async fn data_always_present_in_response() {
683        let client = test_client().await;
684
685        // Even though we do a return without value,
686        // error `ResponseDataNotFound` is never returned, the result is Ok(_) instead.
687        client.eval("return", &()).await.unwrap();
688        client.call("LUA", &("return",)).await.unwrap();
689    }
690
691    #[crate::test(tarantool = "crate")]
692    async fn big_data() {
693        // NOTE: random looking constants in this test are random.
694        // I'm just increasing the entropy for good luck.
695        use crate::tuple::RawByteBuf;
696
697        #[crate::proc(tarantool = "crate")]
698        fn proc_big_data<'a>(s: &'a serde_bytes::Bytes) -> usize {
699            s.len() + 17
700        }
701
702        let proc = crate::define_stored_proc_for_tests!(proc_big_data);
703        let client = test_client().await;
704
705        // Note: tarantool on macos has a bug where it will try to read more
706        // data than macos allows to be read at once.
707        #[cfg(target_os = "macos")]
708        const N: u32 = 0x1fff_ff69;
709
710        #[cfg(not(target_os = "macos"))]
711        const N: u32 = 0x6fff_ff69;
712
713        // SAFETY: this is basically a generation of a random array
714        #[allow(clippy::uninit_vec)]
715        let s = unsafe {
716            let buf_size = (N + 6) as usize;
717            let mut data = Vec::<u8>::with_capacity(buf_size);
718            data.set_len(buf_size);
719            data[0] = b'\x91';
720            data[1] = b'\xc6'; // BIN32
721            data[2..6].copy_from_slice(&N.to_be_bytes());
722            RawByteBuf::from(data)
723        };
724
725        let t0 = std::time::Instant::now();
726        let t = client.call(&proc, &s).await.unwrap();
727        dbg!(t0.elapsed());
728
729        if let Ok((len,)) = t.decode::<(u32,)>() {
730            assert_eq!(len, N + 17);
731        } else {
732            let ((len,),): ((u32,),) = t.decode().unwrap();
733            assert_eq!(len, N + 17);
734        }
735    }
736
    /// Checks `md5` authentication: correct credentials work, while a wrong
    /// password or a wrong auth method fail with `PasswordMismatch`.
    #[cfg(feature = "picodata")]
    #[crate::test(tarantool = "crate")]
    async fn md5_auth_method() {
        use crate::auth::AuthMethod;
        use std::time::Duration;

        let username = "Johnny";
        let password = "B. Goode";

        // NOTE: because we test our fork of `tarantool` here (see `picodata` feature flag on a test), we can
        // pass `auth_type` parameter right into `box.schema.user.create`. This won't work in default `tarantool`.
        crate::lua_state()
            .exec_with(
                "local username, password = ...
                box.cfg { }
                box.schema.user.create(username, { if_not_exists = true, auth_type = 'md5', password = password })
                box.schema.user.grant(username, 'super', nil, nil, { if_not_exists = true })",
                (username, password),
            )
            .unwrap();

        // Successful connection
        {
            let client = Client::connect_with_config(
                "localhost",
                listen_port(),
                protocol::Config {
                    creds: Some((username.into(), password.into())),
                    auth_method: AuthMethod::Md5,
                    ..Default::default()
                },
            )
            .timeout(Duration::from_secs(3))
            .await
            .unwrap();

            // Creating the client does not report authentication problems;
            // they only show up on the first request, so send one.
            client
                .eval("print('\\x1b[32mit works!\\x1b[0m')", &())
                .await
                .unwrap();
        }

        // Wrong password
        {
            let client = Client::connect_with_config(
                "localhost",
                listen_port(),
                protocol::Config {
                    creds: Some((username.into(), "wrong password".into())),
                    auth_method: AuthMethod::Md5,
                    ..Default::default()
                },
            )
            .timeout(Duration::from_secs(3))
            .await
            .unwrap();

            // Creating the client succeeds even with bad credentials;
            // the failure only shows up on the first request.
            let err = client.eval("return", &()).await.unwrap_err().to_string();
            #[rustfmt::skip]
            assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
        }

        // Wrong auth method
        {
            let client = Client::connect_with_config(
                "localhost",
                listen_port(),
                protocol::Config {
                    creds: Some((username.into(), password.into())),
                    auth_method: AuthMethod::ChapSha1,
                    ..Default::default()
                },
            )
            .timeout(Duration::from_secs(3))
            .await
            .unwrap();

            // Creating the client succeeds even with the wrong auth method;
            // the failure only shows up on the first request.
            let err = client.eval("return", &()).await.unwrap_err().to_string();
            #[rustfmt::skip]
            assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
        }

        // Cleanup: set `auth_type` back to 'chap-sha1' (this is the default)
        // and drop the test user.
        crate::lua_state()
            .exec_with(
                "local username = ...
                box.cfg { auth_type = 'chap-sha1' }
                box.schema.user.drop(username)",
                username,
            )
            .unwrap();
    }
835
    /// Checks `ldap` authentication: correct credentials work, a wrong
    /// password is rejected, and using `chap-sha1` for this user fails with
    /// `PasswordMismatch`. The test is skipped when LDAP setup fails.
    #[cfg(feature = "picodata")]
    #[crate::test(tarantool = "crate")]
    async fn ldap_auth_method() {
        use crate::auth::AuthMethod;
        use std::time::Duration;

        let username = "Johnny";
        let password = "B. Goode";

        // The guard is kept alive until the end of the test.
        let _guard = crate::unwrap_ok_or!(
            crate::test::util::setup_ldap_auth(username, password),
            Err(e) => {
                println!("{e}, skipping ldap test");
                return;
            }
        );

        // Successful connection
        {
            let client = Client::connect_with_config(
                "localhost",
                listen_port(),
                protocol::Config {
                    creds: Some((username.into(), password.into())),
                    auth_method: AuthMethod::Ldap,
                    ..Default::default()
                },
            )
            .timeout(Duration::from_secs(3))
            .await
            .unwrap();

            // Creating the client does not report authentication problems;
            // they only show up on the first request, so send one.
            client
                .eval("print('\\x1b[32mit works!\\x1b[0m')", &())
                .await
                .unwrap();
        }

        // Wrong password
        {
            let client = Client::connect_with_config(
                "localhost",
                listen_port(),
                protocol::Config {
                    creds: Some((username.into(), "wrong password".into())),
                    auth_method: AuthMethod::Ldap,
                    ..Default::default()
                },
            )
            .timeout(Duration::from_secs(3))
            .await
            .unwrap();

            // Creating the client succeeds even with bad credentials;
            // the failure only shows up on the first request.
            let err = client.eval("return", &()).await.unwrap_err().to_string();
            #[rustfmt::skip]
            assert_eq!(err, "server responded with error: System: Invalid credentials");
        }

        // Wrong auth method
        {
            let client = Client::connect_with_config(
                "localhost",
                listen_port(),
                protocol::Config {
                    creds: Some((username.into(), password.into())),
                    auth_method: AuthMethod::ChapSha1,
                    ..Default::default()
                },
            )
            .timeout(Duration::from_secs(3))
            .await
            .unwrap();

            // Creating the client succeeds even with the wrong auth method;
            // the failure only shows up on the first request.
            let err = client.eval("return", &()).await.unwrap_err().to_string();
            #[rustfmt::skip]
            assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
        }
    }
920
921    #[crate::test(tarantool = "crate")]
922    async fn extended_error_info() {
923        let client = test_client().await;
924
925        let res = client
926            .eval(
927                "error1 = box.error.new(box.error.UNSUPPORTED, 'this', 'that')
928                error2 = box.error.new('MyCode', 'my message')
929                error3 = box.error.new('MyOtherCode', 'my other message')
930                error2:set_prev(error3)
931                error1:set_prev(error2)
932                error1:raise()",
933                &(),
934            )
935            .timeout(Duration::from_secs(3))
936            .await;
937
938        let error::Error::Remote(e) = error::Error::from(res.unwrap_err()) else {
939            panic!();
940        };
941
942        assert_eq!(e.error_code(), TarantoolErrorCode::Unsupported as u32);
943        assert_eq!(e.message(), "this does not support that");
944        assert_eq!(e.error_type(), "ClientError");
945        assert_eq!(e.file(), Some("eval"));
946        assert_eq!(e.line(), Some(1));
947        let fields_len = e.fields().len();
948        // Starting from tarantool 2.11.5 it will contain `name` field
949        assert!(fields_len == 1 || fields_len == 0);
950        if fields_len == 1 {
951            assert_eq!(e.fields()["name"], rmpv::Value::from("UNSUPPORTED"));
952        }
953
954        let e = e.cause().unwrap();
955
956        assert_eq!(e.error_code(), 0);
957        assert_eq!(e.message(), "my message");
958        assert_eq!(e.error_type(), "CustomError");
959        assert_eq!(e.file(), Some("eval"));
960        assert_eq!(e.line(), Some(2));
961        assert_eq!(e.fields().len(), 1);
962        assert_eq!(e.fields()["custom_type"], rmpv::Value::from("MyCode"));
963
964        let e = e.cause().unwrap();
965
966        assert_eq!(e.error_code(), 0);
967        assert_eq!(e.message(), "my other message");
968        assert_eq!(e.error_type(), "CustomError");
969        assert_eq!(e.file(), Some("eval"));
970        assert_eq!(e.line(), Some(3));
971        assert_eq!(e.fields().len(), 1);
972        assert_eq!(e.fields()["custom_type"], rmpv::Value::from("MyOtherCode"));
973
974        assert!(e.cause().is_none());
975    }
976
977    #[crate::test(tarantool = "crate")]
978    async fn custom_error_code_from_proc() {
979        #[crate::proc(tarantool = "crate")]
980        fn proc_custom_error_code() -> Result<(), crate::error::Error> {
981            Err(BoxError::new(666666_u32, "na ah").into())
982        }
983        let error_line = line!() - 2; // where `BoxError` is constructed
984
985        let proc = crate::define_stored_proc_for_tests!(proc_custom_error_code);
986        let client = test_client().await;
987
988        let res = client
989            .call(&proc, &())
990            .timeout(Duration::from_secs(3))
991            .await;
992
993        let e = match error::Error::from(res.unwrap_err()) {
994            error::Error::Remote(e) => e,
995            other => {
996                panic!("unexpected error: {}", other);
997            }
998        };
999
1000        assert_eq!(e.error_code(), 666666);
1001        assert_eq!(e.message(), "na ah");
1002        assert_eq!(e.error_type(), "ClientError");
1003        assert_eq!(e.file(), Some(file!()));
1004        assert_eq!(e.line(), Some(error_line));
1005    }
1006
1007    #[crate::test(tarantool = "crate")]
1008    async fn check_error_location() {
1009        // The line number reported for the error will point to the #[proc]
1010        // macro attribute because tarantool::error::Error doesn't actually
1011        // store the error location and the location is captured when
1012        // IntoBoxError::set_last_error is called which happens in the code
1013        // generated by the proc macro. So if you want to have a more helpful
1014        // error location you should construct the BoxError explicitly.
1015        let error_line = line!() + 1;
1016        #[crate::proc(tarantool = "crate")]
1017        fn proc_check_error_location_implicit() -> Result<(), error::Error> {
1018            Err(error::Error::other("not good"))
1019        }
1020
1021        let proc = crate::define_stored_proc_for_tests!(proc_check_error_location_implicit);
1022        let client = test_client().await;
1023
1024        let res = client
1025            .call(&proc, &())
1026            .timeout(Duration::from_secs(3))
1027            .await;
1028
1029        let e = match error::Error::from(res.unwrap_err()) {
1030            error::Error::Remote(e) => e,
1031            other => {
1032                panic!("unexpected error: {}", other);
1033            }
1034        };
1035
1036        assert_eq!(e.file(), Some(file!()));
1037        assert_eq!(e.line(), Some(error_line));
1038    }
1039}