tarantool/network/client/
mod.rs

1//! Tarantool based async [`Client`].
2//!
3//! Can be used only from inside Tarantool as it makes heavy use of fibers and coio.
4//!
5//! # Example
6//! ```no_run
7//! # async {
8//! use tarantool::network::client::Client;
9//! // Most of the client's methods are in the `AsClient` trait
10//! use tarantool::network::client::AsClient as _;
11//!
12//! let client = Client::connect("localhost", 3301).await.unwrap();
13//! client.ping().await.unwrap();
14//!
15//! // Requests can also be easily combined with fiber::r#async::timeout
16//! use tarantool::fiber::r#async::timeout::IntoTimeout as _;
17//! use std::time::Duration;
18//!
19//! client.ping().timeout(Duration::from_secs(10)).await.unwrap();
20//! # };
21//! ```
22//!
23//! # Reusing Connection
24//! Client can be cloned, and safely moved to a different fiber if needed, to reuse the same connection.
25//! When multiple fibers use the same connection, all requests are pipelined through the same network socket, but each fiber
26//! gets back a correct response. Reducing the number of active sockets lowers the overhead of system calls and increases
27//! the overall server performance.
28//!
29//! # Implementation
30//! Internally the client uses [`Protocol`] to get bytes that it needs to send
31//! and push bytes that it gets from the network.
32//!
//! On creation the client spawns sender and receiver worker fibers, which in turn
//! use coio based [`TcpStream`] as the transport layer.
35
36pub mod reconnect;
37pub mod stream;
38pub mod tcp;
39pub mod tls;
40
41use std::collections::HashMap;
42use std::io::Cursor;
43use std::rc::Rc;
44use std::sync::Arc;
45use std::time::Duration;
46
47use self::stream::Stream;
48use self::tcp::TcpStream;
49use self::tls::{TlsConnector, TlsStream};
50
51use super::protocol::api::{Call, Eval, Execute, Ping, Request};
52use super::protocol::{self, Protocol, SyncIndex};
53use crate::error;
54use crate::error::BoxError;
55use crate::fiber;
56use crate::fiber::r#async::oneshot;
57use crate::fiber::r#async::IntoOnDrop as _;
58use crate::fiber::FiberId;
59use crate::fiber::NoYieldsRefCell;
60use crate::tuple::{ToTupleBuffer, Tuple};
61use crate::unwrap_ok_or;
62
63use futures::{AsyncReadExt, AsyncWriteExt};
64
65#[deprecated = "use `ClientError` instead"]
66pub type Error = ClientError;
67
68/// Error returned by [`Client`].
69#[derive(thiserror::Error, Debug)]
70pub enum ClientError {
71    /// The connection was closed because of this error.
72    ///
73    /// The error is wrapped in a [`Arc`], because some libraries require
74    /// error types to implement [`Sync`], which isn't implemented for [`Rc`].
75    #[error("{0}")]
76    ConnectionClosed(Arc<crate::error::Error>),
77
78    /// Error happened during encoding of the request.
79    ///
80    /// The error is wrapped in a [`Arc`], because some libraries require
81    /// error types to implement [`Sync`], which isn't implemented for [`Rc`].
82    #[error("{0}")]
83    RequestEncode(crate::error::Error),
84
85    /// Error happened during decoding of the response.
86    ///
87    /// The error is wrapped in a [`Arc`], because some libraries require
88    /// error types to implement [`Sync`], which isn't implemented for [`Rc`].
89    #[error("{0}")]
90    ResponseDecode(crate::error::Error),
91
92    /// Service responded with an error.
93    ///
94    /// The error is wrapped in a [`Arc`], because some libraries require
95    /// error types to implement [`Sync`], which isn't implemented for [`Rc`].
96    #[error("{0}")]
97    ErrorResponse(BoxError),
98}
99
100impl From<ClientError> for crate::error::Error {
101    #[inline(always)]
102    fn from(err: ClientError) -> Self {
103        match err {
104            ClientError::ConnectionClosed(err) => crate::error::Error::ConnectionClosed(err),
105            ClientError::RequestEncode(err) => err,
106            ClientError::ResponseDecode(err) => err,
107            ClientError::ErrorResponse(err) => crate::error::Error::Remote(err),
108        }
109    }
110}
111
112#[derive(Clone, Debug)]
113enum State {
114    Alive,
115    ClosedManually,
116    /// This can only be [`Error::ConnectionClosed`].
117    ClosedWithError(Arc<error::Error>),
118}
119
120impl State {
121    fn is_alive(&self) -> bool {
122        matches!(self, Self::Alive)
123    }
124
125    fn is_closed(&self) -> bool {
126        !self.is_alive()
127    }
128}
129
130#[derive(Debug)]
131struct ClientInner {
132    protocol: Protocol,
133    awaiting_response: HashMap<SyncIndex, oneshot::Sender<Result<(), Arc<error::Error>>>>,
134    state: State,
135    /// The same tcp stream sender & receiver fibers a working with. Only stored
136    /// here for closing.
137    stream: Stream,
138    sender_fiber_id: Option<FiberId>,
139    receiver_fiber_id: Option<FiberId>,
140    clients_count: usize,
141}
142
143impl ClientInner {
144    pub fn new(config: protocol::Config, stream: Stream) -> Self {
145        Self {
146            protocol: Protocol::with_config(config),
147            awaiting_response: HashMap::new(),
148            state: State::Alive,
149            stream,
150            sender_fiber_id: None,
151            receiver_fiber_id: None,
152            clients_count: 1,
153        }
154    }
155}
156
157/// Wakes sender if `protocol` has new outgoing data.
158fn maybe_wake_sender(client: &ClientInner) {
159    if client.protocol.ready_outgoing_len() == 0 {
160        // No point in waking the sender if there's nothing to send
161        return;
162    }
163    if let Some(id) = client.sender_fiber_id {
164        fiber::wakeup(id);
165    }
166}
167
168/// Actual client that can be used to send and receive messages to tarantool instance.
169///
170/// Can be cloned and moved into different fibers for connection to be reused.
171///
172/// See [`super::client`] for examples and [`AsClient`] trait for API.
173// WARNING: Attention should be payed not to borrow inner client across await and yield points.
174#[derive(Debug)]
175pub struct Client(Rc<NoYieldsRefCell<ClientInner>>);
176
177impl Client {
178    /// Creates a new client and tries to establish connection
179    /// to `url:port`
180    ///
181    /// # Errors
182    /// Error is returned if an attempt to connect failed.
183    pub async fn connect(url: &str, port: u16) -> Result<Self, ClientError> {
184        Self::connect_with_config(url, port, Default::default()).await
185    }
186
187    /// Creates a new client and tries to establish connection
188    /// to `url:port` with given timeout. When timeout is None
189    /// function behaves as a synchronous.
190    ///
191    /// Takes explicit `config` in comparison to [`Client::connect`]
192    /// where default values are used.
193    ///
194    /// # Errors
195    /// Error is returned if an attempt to connect failed.
196    pub async fn connect_with_config(
197        url: &str,
198        port: u16,
199        config: protocol::Config,
200    ) -> Result<Self, ClientError> {
201        Self::connect_with_config_and_tls(url, port, config, None).await
202    }
203
204    pub async fn connect_with_config_and_tls(
205        url: &str,
206        port: u16,
207        config: protocol::Config,
208        tls_connector: Option<TlsConnector>,
209    ) -> Result<Self, ClientError> {
210        let timeout = config.connect_timeout.unwrap_or(Duration::MAX);
211        let tcp_stream = TcpStream::connect_timeout_async(url, port, timeout)
212            .await
213            .map_err(|e| ClientError::ConnectionClosed(Arc::new(e.into())))?;
214
215        let stream = match tls_connector {
216            Some(tls_connector) => {
217                let tls_stream = TlsStream::connect(&tls_connector, tcp_stream, url)
218                    .await
219                    .map_err(|e| ClientError::ConnectionClosed(Arc::new(e.into())))?;
220                Stream::Secure(tls_stream)
221            }
222            None => Stream::Plain(tcp_stream),
223        };
224
225        let client = ClientInner::new(config, stream.clone());
226        let client = Rc::new(NoYieldsRefCell::new(client));
227
228        let receiver_fiber_id = fiber::Builder::new()
229            .func_async(receiver(client.clone(), stream.clone()))
230            .name(format!("iproto-in/{url}:{port}"))
231            .start_non_joinable()
232            .unwrap();
233
234        let sender_fiber_id = fiber::Builder::new()
235            .func_async(sender(client.clone(), stream))
236            .name(format!("iproto-out/{url}:{port}"))
237            .start_non_joinable()
238            .unwrap();
239
240        {
241            let mut client_mut = client.borrow_mut();
242            client_mut.receiver_fiber_id = Some(receiver_fiber_id);
243            client_mut.sender_fiber_id = Some(sender_fiber_id);
244        }
245
246        Ok(Self(client))
247    }
248
249    fn check_state(&self) -> Result<(), Arc<error::Error>> {
250        match &self.0.borrow().state {
251            State::Alive => Ok(()),
252            State::ClosedManually => unreachable!("All client handles are dropped at this point"),
253            State::ClosedWithError(err) => Err(err.clone()),
254        }
255    }
256}
257
258/// Generic API for an entity that behaves as Tarantool Client.
259#[async_trait::async_trait(?Send)]
260pub trait AsClient {
261    /// Send [`Request`] and wait for response.
262    /// This function yields.
263    ///
264    /// # Errors
265    /// In case of `ClosedWithErr` it is suggested to recreate the connection.
266    /// Other errors are self-descriptive.
267    async fn send<R: Request>(&self, request: &R) -> Result<R::Response, ClientError>;
268
269    /// Execute a PING command.
270    async fn ping(&self) -> Result<(), ClientError> {
271        self.send(&Ping).await
272    }
273
274    /// Call a remote stored procedure.
275    ///
276    /// `conn.call("func", &("1", "2", "3"))` is the remote-call equivalent of `func('1', '2', '3')`.
277    /// That is, `conn.call` is a remote stored-procedure call.
278    /// The return from `conn.call` is whatever the function returns.
279    async fn call<T>(&self, fn_name: &str, args: &T) -> Result<Tuple, ClientError>
280    where
281        T: ToTupleBuffer + ?Sized,
282    {
283        self.send(&Call { fn_name, args }).await
284    }
285
286    /// Evaluates and executes the expression in Lua-string, which may be any statement or series of statements.
287    ///
288    /// An execute privilege is required; if the user does not have it, an administrator may grant it with
289    /// `box.schema.user.grant(username, 'execute', 'universe')`.
290    ///
291    /// To ensure that the return from `eval` is whatever the Lua expression returns, begin the Lua-string with the
292    /// word `return`.
293    async fn eval<T>(&self, expr: &str, args: &T) -> Result<Tuple, ClientError>
294    where
295        T: ToTupleBuffer + ?Sized,
296    {
297        self.send(&Eval { args, expr }).await
298    }
299
300    /// Execute sql query remotely.
301    async fn execute<T>(&self, sql: &str, bind_params: &T) -> Result<Vec<Tuple>, ClientError>
302    where
303        T: ToTupleBuffer + ?Sized,
304    {
305        self.send(&Execute { sql, bind_params }).await
306    }
307}
308
309#[async_trait::async_trait(?Send)]
310impl AsClient for Client {
311    async fn send<R: Request>(&self, request: &R) -> Result<R::Response, ClientError> {
312        if let Err(e) = self.check_state() {
313            return Err(ClientError::ConnectionClosed(e));
314        }
315
316        let res = self.0.borrow_mut().protocol.send_request(request);
317        let sync = unwrap_ok_or!(res,
318            Err(e) => {
319                return Err(ClientError::RequestEncode(e));
320            }
321        );
322
323        let (tx, rx) = oneshot::channel();
324        self.0.borrow_mut().awaiting_response.insert(sync, tx);
325        maybe_wake_sender(&self.0.borrow());
326        // Cleanup `awaiting_response` entry in case of `send` future cancelation
327        // at this `.await`.
328        // `send` can be canceled for example with `Timeout`.
329        let res = rx
330            .on_drop(|| {
331                let _ = self.0.borrow_mut().awaiting_response.remove(&sync);
332            })
333            .await
334            .expect("Channel should be open");
335        if let Err(e) = res {
336            return Err(ClientError::ConnectionClosed(e));
337        }
338
339        let res = self
340            .0
341            .borrow_mut()
342            .protocol
343            .take_response::<R>(sync)
344            .expect("Is present at this point");
345        let response = unwrap_ok_or!(res,
346            Err(error::Error::Remote(response)) => {
347                return Err(ClientError::ErrorResponse(response));
348            }
349            Err(e) => {
350                return Err(ClientError::ResponseDecode(e));
351            }
352        );
353
354        Ok(response)
355    }
356}
357
358impl Drop for Client {
359    fn drop(&mut self) {
360        let clients_count = self.0.borrow().clients_count;
361        if clients_count == 1 {
362            let mut client = self.0.borrow_mut();
363            // Stop fibers
364            client.state = State::ClosedManually;
365
366            let receiver_fiber_id = client.receiver_fiber_id;
367            let sender_fiber_id = client.sender_fiber_id;
368
369            // We need to close the stream here, because otherwise receiver will
370            // never wake up, because our async runtime blocks forever until the
371            // future is ready.
372            if let Err(e) = client.stream.shutdown() {
373                crate::say_error!("Client::drop: failed closing iproto stream: {e}");
374            }
375
376            // Drop ref before executing code that switches fibers.
377            drop(client);
378
379            // Cancel the worker fibers and wake them up so they can exit their loops
380            if let Some(id) = receiver_fiber_id {
381                fiber::cancel(id);
382                fiber::wakeup(id);
383            }
384
385            if let Some(id) = sender_fiber_id {
386                fiber::cancel(id);
387                fiber::wakeup(id);
388            }
389        } else {
390            self.0.borrow_mut().clients_count -= 1;
391        }
392    }
393}
394
395impl Clone for Client {
396    fn clone(&self) -> Self {
397        self.0.borrow_mut().clients_count += 1;
398        Self(self.0.clone())
399    }
400}
401
/// Unwraps the `Ok` value of `$e`, or closes the connection on `Err`.
///
/// On `Err` the error is converted into [`error::Error`], every request in
/// `$client.awaiting_response` is notified with it, `$client.state` becomes
/// [`State::ClosedWithError`] and the enclosing function returns.
///
/// `$client` is evaluated more than once, so it may be an expression that
/// produces a short-lived mutable borrow (e.g. `cell.borrow_mut()`).
macro_rules! handle_result {
    ($client:expr, $e:expr) => {
        match $e {
            Ok(value) => value,
            Err(err) => {
                let err = Arc::new(error::Error::from(err));
                // Notify all subscribers on closing. The map is moved out as
                // a whole (leaving an empty one behind), so the access to
                // `$client` ends before any subscriber is notified.
                let subscriptions = std::mem::take(&mut $client.awaiting_response);
                for subscription in subscriptions.into_values() {
                    // We don't care about errors at this point
                    let _ = subscription.send(Err(err.clone()));
                }
                $client.state = State::ClosedWithError(err);
                return;
            }
        }
    };
}
420
421/// Sender work loop. Yields on each iteration and during awaits.
422async fn sender(client: Rc<NoYieldsRefCell<ClientInner>>, mut writer: impl AsyncWriteExt + Unpin) {
423    loop {
424        if client.borrow().state.is_closed() || fiber::is_cancelled() {
425            return;
426        }
427        // TODO: limit max send size
428        let data = client.borrow_mut().protocol.take_outgoing_data();
429        if data.is_empty() {
430            // Wait for explicit wakeup, it should happen when there is new outgoing data
431            fiber::fiber_yield();
432        } else {
433            let result = writer.write_all(&data).await;
434            handle_result!(client.borrow_mut(), result);
435        }
436    }
437}
438
439/// Receiver work loop. Yields on each iteration and during awaits.
440// Clippy falsely reports that we're holding a `NoYieldsRefCell` reference across an
441// `await`, even though we're explicitly dropping the reference right before
442// awaiting. Thank you clippy, very helpful!
443#[allow(clippy::await_holding_refcell_ref)]
444async fn receiver(
445    client_cell: Rc<NoYieldsRefCell<ClientInner>>,
446    mut reader: impl AsyncReadExt + Unpin,
447) {
448    let mut buf = vec![0_u8; 4096];
449    loop {
450        let client = client_cell.borrow();
451        if client.state.is_closed() || fiber::is_cancelled() {
452            return;
453        }
454
455        let size = client.protocol.read_size_hint();
456        if buf.len() < size {
457            buf.resize(size, 0);
458        }
459        let buf_slice = &mut buf[0..size];
460
461        // Reference must be dropped before yielding.
462        drop(client);
463
464        let res = reader.read_exact(buf_slice).await;
465
466        let mut client = client_cell.borrow_mut();
467        handle_result!(client, res);
468
469        let result = client
470            .protocol
471            .process_incoming(&mut Cursor::new(buf_slice));
472        let result = handle_result!(client, result);
473        if let Some(sync) = result {
474            let subscription = client.awaiting_response.remove(&sync);
475            if let Some(subscription) = subscription {
476                subscription
477                    .send(Ok(()))
478                    .expect("cannot be closed at this point");
479            } else {
480                crate::say_warn!("received unwaited message for {sync:?}");
481            }
482        }
483
484        // Wake sender to handle the greeting we may have just received
485        maybe_wake_sender(&client);
486    }
487}
488
489#[cfg(feature = "internal_test")]
490mod tests {
491    use super::*;
492    use crate::error::TarantoolErrorCode;
493    use crate::fiber::r#async::timeout::IntoTimeout as _;
494    use crate::space::Space;
495    use crate::test::util::listen_port;
496    use std::time::Duration;
497
498    async fn test_client() -> Client {
499        Client::connect_with_config(
500            "localhost",
501            listen_port(),
502            protocol::Config {
503                creds: Some(("test_user".into(), "password".into())),
504                auth_method: crate::auth::AuthMethod::ChapSha1,
505                ..Default::default()
506            },
507        )
508        .timeout(Duration::from_secs(3))
509        .await
510        .unwrap()
511    }
512
513    #[crate::test(tarantool = "crate")]
514    async fn connect_with_timeout() {
515        // Without timeout, the connection hangs on address like this
516        let client = Client::connect_with_config(
517            "123123", // Invalid host
518            listen_port(),
519            protocol::Config {
520                connect_timeout: Some(Duration::from_secs(1)),
521                ..Default::default()
522            },
523        );
524        let res = client.await.unwrap_err();
525        if cfg!(target_os = "macos") {
526            assert!(res.to_string().contains("No route to host"));
527        } else {
528            assert_eq!(res.to_string(), "connect timeout");
529        }
530    }
531
532    #[crate::test(tarantool = "crate")]
533    async fn connect() {
534        let _client = Client::connect("localhost", listen_port()).await.unwrap();
535    }
536
537    #[crate::test(tarantool = "crate")]
538    async fn connect_failure() {
539        // Can be any other unused port
540        let err = Client::connect("localhost", 0).await.unwrap_err();
541        assert!(matches!(dbg!(err), ClientError::ConnectionClosed(_)))
542    }
543
544    #[crate::test(tarantool = "crate")]
545    async fn ping() {
546        let client = test_client().await;
547
548        for _ in 0..5 {
549            client.ping().timeout(Duration::from_secs(3)).await.unwrap();
550        }
551    }
552
553    #[crate::test(tarantool = "crate")]
554    async fn connect_with_cluster_uuid_and_ping() {
555        // Ensure that sending cluster_uuid via IPROTO_ID does not break vanilla Tarantool.
556        let client = Client::connect_with_config(
557            "localhost",
558            listen_port(),
559            protocol::Config {
560                creds: Some(("test_user".into(), "password".into())),
561                auth_method: crate::auth::AuthMethod::ChapSha1,
562                cluster_uuid: Some("11111111-2222-3333-4444-555555555555".into()),
563                ..Default::default()
564            },
565        )
566        .timeout(Duration::from_secs(3))
567        .await
568        .unwrap();
569
570        client.ping().timeout(Duration::from_secs(3)).await.unwrap();
571    }
572
573    #[crate::test(tarantool = "crate")]
574    fn ping_concurrent() {
575        let client = fiber::block_on(test_client());
576        let fiber_a = fiber::start_async(async {
577            client.ping().timeout(Duration::from_secs(3)).await.unwrap()
578        });
579        let fiber_b = fiber::start_async(async {
580            client.ping().timeout(Duration::from_secs(3)).await.unwrap()
581        });
582        fiber_a.join();
583        fiber_b.join();
584    }
585
586    #[crate::test(tarantool = "crate")]
587    async fn execute() {
588        Space::find("test_s1")
589            .unwrap()
590            .insert(&(6001, "6001"))
591            .unwrap();
592        Space::find("test_s1")
593            .unwrap()
594            .insert(&(6002, "6002"))
595            .unwrap();
596
597        let client = test_client().await;
598
599        let lua = crate::lua_state();
600        // Error is silently ignored on older versions, before 'compat' was introduced.
601        _ = lua.exec("require'compat'.sql_seq_scan_default = 'old'");
602
603        let result = client
604            .execute(r#"SELECT * FROM "test_s1""#, &())
605            .timeout(Duration::from_secs(3))
606            .await
607            .unwrap();
608        assert!(result.len() >= 2);
609
610        let result = client
611            .execute(r#"SELECT * FROM "test_s1" WHERE "id" = ?"#, &(6002,))
612            .timeout(Duration::from_secs(3))
613            .await
614            .unwrap();
615
616        assert_eq!(result.len(), 1);
617        assert_eq!(
618            result.first().unwrap().decode::<(u64, String)>().unwrap(),
619            (6002, "6002".into())
620        );
621    }
622
623    #[crate::test(tarantool = "crate")]
624    async fn call() {
625        let client = test_client().await;
626
627        let result = client
628            .call("test_stored_proc", &(1, 2))
629            .timeout(Duration::from_secs(3))
630            .await
631            .unwrap();
632        assert_eq!(result.decode::<(i32,)>().unwrap(), (3,));
633    }
634
635    #[crate::test(tarantool = "crate")]
636    async fn invalid_call() {
637        let client = test_client().await;
638
639        let err = client
640            .call("unexistent_proc", &())
641            .timeout(Duration::from_secs(3))
642            .await
643            .unwrap_err();
644
645        let err = error::Error::from(err);
646        let error::Error::Remote(err) = err else {
647            panic!()
648        };
649
650        assert_eq!(err.error_code(), TarantoolErrorCode::NoSuchProc as u32);
651
652        #[rustfmt::skip]
653        assert_eq!(err.to_string(), "NoSuchProc: Procedure 'unexistent_proc' is not defined");
654    }
655
656    #[crate::test(tarantool = "crate")]
657    async fn eval() {
658        let client = test_client().await;
659
660        // Ok result
661        let result = client
662            .eval("return ...", &(1, 2))
663            .timeout(Duration::from_secs(3))
664            .await
665            .unwrap();
666        assert_eq!(result.decode::<(i32, i32)>().unwrap(), (1, 2));
667
668        // Error result
669        let err = client
670            .eval("box.error(420)", &())
671            .timeout(Duration::from_secs(3))
672            .await
673            .unwrap_err();
674
675        let err = error::Error::from(err);
676        let error::Error::Remote(err) = err else {
677            panic!()
678        };
679
680        assert_eq!(err.error_code(), 420);
681    }
682
683    /// A regression test for https://git.picodata.io/picodata/picodata/tarantool-module/-/merge_requests/302
684    #[crate::test(tarantool = "crate")]
685    async fn client_count_regression() {
686        let client = test_client().await;
687        // Should close sender and receiver fibers
688        client.0.borrow_mut().stream.shutdown().unwrap();
689        // Receiver wakes and closes
690        fiber::reschedule();
691
692        let fiber_id = client.0.borrow().sender_fiber_id.unwrap();
693        let fiber_exists = fiber::wakeup(fiber_id);
694        debug_assert!(fiber_exists);
695
696        // Sender wakes and closes
697        fiber::reschedule();
698        // Sender and receiver stopped and dropped their refs
699        assert_eq!(Rc::strong_count(&client.0), 1);
700
701        // Cloning a client produces 2 refs
702        let client_clone = client.clone();
703        assert_eq!(Rc::strong_count(&client.0), 2);
704        // Here if client checked by Rc refs <= 3 it would assume it is the last and set state to ClosedManually
705        drop(client_clone);
706        assert_eq!(Rc::strong_count(&client.0), 1);
707
708        // This would panic on unreachable if previous drop have set the state
709        client.check_state().unwrap_err();
710    }
711
712    #[crate::test(tarantool = "crate")]
713    async fn concurrent_messages_one_fiber() {
714        let client = test_client().await;
715        let mut ping_futures = vec![];
716        for _ in 0..10 {
717            ping_futures.push(client.ping());
718        }
719        for res in futures::future::join_all(ping_futures).await {
720            res.unwrap();
721        }
722    }
723
724    #[crate::test(tarantool = "crate")]
725    async fn data_always_present_in_response() {
726        let client = test_client().await;
727
728        // Even though we do a return without value,
729        // error `ResponseDataNotFound` is never returned, the result is Ok(_) instead.
730        client.eval("return", &()).await.unwrap();
731        client.call("LUA", &("return",)).await.unwrap();
732    }
733
734    #[crate::test(tarantool = "crate")]
735    async fn big_data() {
736        // NOTE: random looking constants in this test are random.
737        // I'm just increasing the entropy for good luck.
738        use crate::tuple::RawByteBuf;
739
740        #[crate::proc(tarantool = "crate")]
741        fn proc_big_data<'a>(s: &'a serde_bytes::Bytes) -> usize {
742            s.len() + 17
743        }
744
745        let proc = crate::define_stored_proc_for_tests!(proc_big_data);
746        let client = test_client().await;
747
748        // Note: tarantool on macos has a bug where it will try to read more
749        // data than macos allows to be read at once.
750        #[cfg(target_os = "macos")]
751        const N: u32 = 0x1fff_ff69;
752
753        #[cfg(not(target_os = "macos"))]
754        const N: u32 = 0x6fff_ff69;
755
756        // SAFETY: this is basically a generation of a random array
757        #[allow(clippy::uninit_vec)]
758        let s = unsafe {
759            let buf_size = (N + 6) as usize;
760            let mut data = Vec::<u8>::with_capacity(buf_size);
761            data.set_len(buf_size);
762            data[0] = b'\x91';
763            data[1] = b'\xc6'; // BIN32
764            data[2..6].copy_from_slice(&N.to_be_bytes());
765            RawByteBuf::from(data)
766        };
767
768        let t0 = std::time::Instant::now();
769        let t = client.call(&proc, &s).await.unwrap();
770        dbg!(t0.elapsed());
771
772        if let Ok((len,)) = t.decode::<(u32,)>() {
773            assert_eq!(len, N + 17);
774        } else {
775            let ((len,),): ((u32,),) = t.decode().unwrap();
776            assert_eq!(len, N + 17);
777        }
778    }
779
#[cfg(feature = "picodata")]
#[crate::test(tarantool = "crate")]
async fn md5_auth_method() {
    use crate::auth::AuthMethod;
    use std::time::Duration;

    /// Connects to the local test instance with the given credentials,
    /// giving up after 3 seconds.
    async fn connect_as(user: &str, secret: &str, auth_method: AuthMethod) -> Client {
        Client::connect_with_config(
            "localhost",
            listen_port(),
            protocol::Config {
                creds: Some((user.into(), secret.into())),
                auth_method,
                ..Default::default()
            },
        )
        .timeout(Duration::from_secs(3))
        .await
        .unwrap()
    }

    #[rustfmt::skip]
    const PASSWORD_MISMATCH: &str = "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid";

    let username = "Johnny";
    let password = "B. Goode";

    // NOTE: because we test our fork of `tarantool` here (see `picodata` feature flag on a test), we can
    // pass `auth_type` parameter right into `box.schema.user.create`. This won't work in default `tarantool`.
    crate::lua_state()
        .exec_with(
            "local username, password = ...
                box.cfg { }
                box.schema.user.create(username, { if_not_exists = true, auth_type = 'md5', password = password })
                box.schema.user.grant(username, 'super', nil, nil, { if_not_exists = true })",
            (username, password),
        )
        .unwrap();

    // Successful connection
    {
        let client = connect_as(username, password, AuthMethod::Md5).await;

        // network::Client will not try actually connecting until we send the
        // first request
        let reply = client
            .eval("print('\\x1b[32mit works!\\x1b[0m')", &())
            .await;
        reply.unwrap();
    }

    // Wrong password
    {
        let client = connect_as(username, "wrong password", AuthMethod::Md5).await;

        // network::Client will not try actually connecting until we send the
        // first request
        let err = client.eval("return", &()).await.unwrap_err().to_string();
        assert_eq!(err, PASSWORD_MISMATCH);
    }

    // Wrong auth method
    {
        let client = connect_as(username, password, AuthMethod::ChapSha1).await;

        // network::Client will not try actually connecting until we send the
        // first request
        let err = client.eval("return", &()).await.unwrap_err().to_string();
        assert_eq!(err, PASSWORD_MISMATCH);
    }

    crate::lua_state()
        // This is the default
        .exec_with(
            "local username = ...
                box.cfg { auth_type = 'chap-sha1' }
                box.schema.user.drop(username)",
            username,
        )
        .unwrap();
}
878
879    #[cfg(feature = "picodata")]
880    #[crate::test(tarantool = "crate")]
881    async fn ldap_auth_method() {
882        use crate::auth::AuthMethod;
883        use std::time::Duration;
884
885        let username = "Johnny";
886        let password = "B. Goode";
887
888        let _guard = crate::unwrap_ok_or!(
889            crate::test::util::setup_ldap_auth(username, password),
890            Err(e) => {
891                println!("{e}, skipping ldap test");
892                return;
893            }
894        );
895
896        // Successfull connection
897        {
898            let client = Client::connect_with_config(
899                "localhost",
900                listen_port(),
901                protocol::Config {
902                    creds: Some((username.into(), password.into())),
903                    auth_method: AuthMethod::Ldap,
904                    ..Default::default()
905                },
906            )
907            .timeout(Duration::from_secs(3))
908            .await
909            .unwrap();
910
911            // network::Client will not try actually connecting until we send the
912            // first request
913            client
914                .eval("print('\\x1b[32mit works!\\x1b[0m')", &())
915                .await
916                .unwrap();
917        }
918
919        // Wrong password
920        {
921            let client = Client::connect_with_config(
922                "localhost",
923                listen_port(),
924                protocol::Config {
925                    creds: Some((username.into(), "wrong password".into())),
926                    auth_method: AuthMethod::Ldap,
927                    ..Default::default()
928                },
929            )
930            .timeout(Duration::from_secs(3))
931            .await
932            .unwrap();
933
934            // network::Client will not try actually connecting until we send the
935            // first request
936            let err = client.eval("return", &()).await.unwrap_err().to_string();
937            #[rustfmt::skip]
938            assert_eq!(err, "server responded with error: System: Invalid credentials");
939        }
940
941        // Wrong auth method
942        {
943            let client = Client::connect_with_config(
944                "localhost",
945                listen_port(),
946                protocol::Config {
947                    creds: Some((username.into(), password.into())),
948                    auth_method: AuthMethod::ChapSha1,
949                    ..Default::default()
950                },
951            )
952            .timeout(Duration::from_secs(3))
953            .await
954            .unwrap();
955
956            // network::Client will not try actually connecting until we send the
957            // first request
958            let err = client.eval("return", &()).await.unwrap_err().to_string();
959            #[rustfmt::skip]
960            assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
961        }
962    }
963
964    #[crate::test(tarantool = "crate")]
965    async fn extended_error_info() {
966        let client = test_client().await;
967
968        let res = client
969            .eval(
970                "error1 = box.error.new(box.error.UNSUPPORTED, 'this', 'that')
971                error2 = box.error.new('MyCode', 'my message')
972                error3 = box.error.new('MyOtherCode', 'my other message')
973                error2:set_prev(error3)
974                error1:set_prev(error2)
975                error1:raise()",
976                &(),
977            )
978            .timeout(Duration::from_secs(3))
979            .await;
980
981        let error::Error::Remote(e) = error::Error::from(res.unwrap_err()) else {
982            panic!();
983        };
984
985        assert_eq!(e.error_code(), TarantoolErrorCode::Unsupported as u32);
986        assert_eq!(e.message(), "this does not support that");
987        assert_eq!(e.error_type(), "ClientError");
988        assert_eq!(e.file(), Some("eval"));
989        assert_eq!(e.line(), Some(1));
990        let fields_len = e.fields().len();
991        // Starting from tarantool 2.11.5 it will contain `name` field
992        assert!(fields_len == 1 || fields_len == 0);
993        if fields_len == 1 {
994            assert_eq!(e.fields()["name"], rmpv::Value::from("UNSUPPORTED"));
995        }
996
997        let e = e.cause().unwrap();
998
999        assert_eq!(e.error_code(), 0);
1000        assert_eq!(e.message(), "my message");
1001        assert_eq!(e.error_type(), "CustomError");
1002        assert_eq!(e.file(), Some("eval"));
1003        assert_eq!(e.line(), Some(2));
1004        assert_eq!(e.fields().len(), 1);
1005        assert_eq!(e.fields()["custom_type"], rmpv::Value::from("MyCode"));
1006
1007        let e = e.cause().unwrap();
1008
1009        assert_eq!(e.error_code(), 0);
1010        assert_eq!(e.message(), "my other message");
1011        assert_eq!(e.error_type(), "CustomError");
1012        assert_eq!(e.file(), Some("eval"));
1013        assert_eq!(e.line(), Some(3));
1014        assert_eq!(e.fields().len(), 1);
1015        assert_eq!(e.fields()["custom_type"], rmpv::Value::from("MyOtherCode"));
1016
1017        assert!(e.cause().is_none());
1018    }
1019
1020    #[crate::test(tarantool = "crate")]
1021    async fn custom_error_code_from_proc() {
1022        #[crate::proc(tarantool = "crate")]
1023        fn proc_custom_error_code() -> Result<(), crate::error::Error> {
1024            Err(BoxError::new(666666_u32, "na ah").into())
1025        }
1026        let error_line = line!() - 2; // where `BoxError` is constructed
1027
1028        let proc = crate::define_stored_proc_for_tests!(proc_custom_error_code);
1029        let client = test_client().await;
1030
1031        let res = client
1032            .call(&proc, &())
1033            .timeout(Duration::from_secs(3))
1034            .await;
1035
1036        let e = match error::Error::from(res.unwrap_err()) {
1037            error::Error::Remote(e) => e,
1038            other => {
1039                panic!("unexpected error: {}", other);
1040            }
1041        };
1042
1043        assert_eq!(e.error_code(), 666666);
1044        assert_eq!(e.message(), "na ah");
1045        assert_eq!(e.error_type(), "ClientError");
1046        assert_eq!(e.file(), Some(file!()));
1047        assert_eq!(e.line(), Some(error_line));
1048    }
1049
1050    #[crate::test(tarantool = "crate")]
1051    async fn check_error_location() {
1052        // The line number reported for the error will point to the #[proc]
1053        // macro attribute because tarantool::error::Error doesn't actually
1054        // store the error location and the location is captured when
1055        // IntoBoxError::set_last_error is called which happens in the code
1056        // generated by the proc macro. So if you want to have a more helpful
1057        // error location you should construct the BoxError explicitly.
1058        let error_line = line!() + 1;
1059        #[crate::proc(tarantool = "crate")]
1060        fn proc_check_error_location_implicit() -> Result<(), error::Error> {
1061            Err(error::Error::other("not good"))
1062        }
1063
1064        let proc = crate::define_stored_proc_for_tests!(proc_check_error_location_implicit);
1065        let client = test_client().await;
1066
1067        let res = client
1068            .call(&proc, &())
1069            .timeout(Duration::from_secs(3))
1070            .await;
1071
1072        let e = match error::Error::from(res.unwrap_err()) {
1073            error::Error::Remote(e) => e,
1074            other => {
1075                panic!("unexpected error: {}", other);
1076            }
1077        };
1078
1079        assert_eq!(e.file(), Some(file!()));
1080        assert_eq!(e.line(), Some(error_line));
1081    }
1082}