1pub mod reconnect;
37pub mod stream;
38pub mod tcp;
39pub mod tls;
40
41use std::collections::HashMap;
42use std::io::Cursor;
43use std::rc::Rc;
44use std::sync::Arc;
45use std::time::Duration;
46
47use self::stream::Stream;
48use self::tcp::TcpStream;
49use self::tls::{TlsConnector, TlsStream};
50
51use super::protocol::api::{Call, Eval, Execute, Ping, Request};
52use super::protocol::{self, Protocol, SyncIndex};
53use crate::error;
54use crate::error::BoxError;
55use crate::fiber;
56use crate::fiber::r#async::oneshot;
57use crate::fiber::r#async::IntoOnDrop as _;
58use crate::fiber::FiberId;
59use crate::fiber::NoYieldsRefCell;
60use crate::tuple::{ToTupleBuffer, Tuple};
61use crate::unwrap_ok_or;
62
63use futures::{AsyncReadExt, AsyncWriteExt};
64
65#[deprecated = "use `ClientError` instead"]
66pub type Error = ClientError;
67
68#[derive(thiserror::Error, Debug)]
70pub enum ClientError {
71 #[error("{0}")]
76 ConnectionClosed(Arc<crate::error::Error>),
77
78 #[error("{0}")]
83 RequestEncode(crate::error::Error),
84
85 #[error("{0}")]
90 ResponseDecode(crate::error::Error),
91
92 #[error("{0}")]
97 ErrorResponse(BoxError),
98}
99
100impl From<ClientError> for crate::error::Error {
101 #[inline(always)]
102 fn from(err: ClientError) -> Self {
103 match err {
104 ClientError::ConnectionClosed(err) => crate::error::Error::ConnectionClosed(err),
105 ClientError::RequestEncode(err) => err,
106 ClientError::ResponseDecode(err) => err,
107 ClientError::ErrorResponse(err) => crate::error::Error::Remote(err),
108 }
109 }
110}
111
112#[derive(Clone, Debug)]
113enum State {
114 Alive,
115 ClosedManually,
116 ClosedWithError(Arc<error::Error>),
118}
119
120impl State {
121 fn is_alive(&self) -> bool {
122 matches!(self, Self::Alive)
123 }
124
125 fn is_closed(&self) -> bool {
126 !self.is_alive()
127 }
128}
129
130#[derive(Debug)]
131struct ClientInner {
132 protocol: Protocol,
133 awaiting_response: HashMap<SyncIndex, oneshot::Sender<Result<(), Arc<error::Error>>>>,
134 state: State,
135 stream: Stream,
138 sender_fiber_id: Option<FiberId>,
139 receiver_fiber_id: Option<FiberId>,
140 clients_count: usize,
141}
142
143impl ClientInner {
144 pub fn new(config: protocol::Config, stream: Stream) -> Self {
145 Self {
146 protocol: Protocol::with_config(config),
147 awaiting_response: HashMap::new(),
148 state: State::Alive,
149 stream,
150 sender_fiber_id: None,
151 receiver_fiber_id: None,
152 clients_count: 1,
153 }
154 }
155}
156
157fn maybe_wake_sender(client: &ClientInner) {
159 if client.protocol.ready_outgoing_len() == 0 {
160 return;
162 }
163 if let Some(id) = client.sender_fiber_id {
164 fiber::wakeup(id);
165 }
166}
167
168#[derive(Debug)]
175pub struct Client(Rc<NoYieldsRefCell<ClientInner>>);
176
177impl Client {
178 pub async fn connect(url: &str, port: u16) -> Result<Self, ClientError> {
184 Self::connect_with_config(url, port, Default::default()).await
185 }
186
187 pub async fn connect_with_config(
197 url: &str,
198 port: u16,
199 config: protocol::Config,
200 ) -> Result<Self, ClientError> {
201 Self::connect_with_config_and_tls(url, port, config, None).await
202 }
203
204 pub async fn connect_with_config_and_tls(
205 url: &str,
206 port: u16,
207 config: protocol::Config,
208 tls_connector: Option<TlsConnector>,
209 ) -> Result<Self, ClientError> {
210 let timeout = config.connect_timeout.unwrap_or(Duration::MAX);
211 let tcp_stream = TcpStream::connect_timeout_async(url, port, timeout)
212 .await
213 .map_err(|e| ClientError::ConnectionClosed(Arc::new(e.into())))?;
214
215 let stream = match tls_connector {
216 Some(tls_connector) => {
217 let tls_stream = TlsStream::connect(&tls_connector, tcp_stream, url)
218 .await
219 .map_err(|e| ClientError::ConnectionClosed(Arc::new(e.into())))?;
220 Stream::Secure(tls_stream)
221 }
222 None => Stream::Plain(tcp_stream),
223 };
224
225 let client = ClientInner::new(config, stream.clone());
226 let client = Rc::new(NoYieldsRefCell::new(client));
227
228 let receiver_fiber_id = fiber::Builder::new()
229 .func_async(receiver(client.clone(), stream.clone()))
230 .name(format!("iproto-in/{url}:{port}"))
231 .start_non_joinable()
232 .unwrap();
233
234 let sender_fiber_id = fiber::Builder::new()
235 .func_async(sender(client.clone(), stream))
236 .name(format!("iproto-out/{url}:{port}"))
237 .start_non_joinable()
238 .unwrap();
239
240 {
241 let mut client_mut = client.borrow_mut();
242 client_mut.receiver_fiber_id = Some(receiver_fiber_id);
243 client_mut.sender_fiber_id = Some(sender_fiber_id);
244 }
245
246 Ok(Self(client))
247 }
248
249 fn check_state(&self) -> Result<(), Arc<error::Error>> {
250 match &self.0.borrow().state {
251 State::Alive => Ok(()),
252 State::ClosedManually => unreachable!("All client handles are dropped at this point"),
253 State::ClosedWithError(err) => Err(err.clone()),
254 }
255 }
256}
257
258#[async_trait::async_trait(?Send)]
260pub trait AsClient {
261 async fn send<R: Request>(&self, request: &R) -> Result<R::Response, ClientError>;
268
269 async fn ping(&self) -> Result<(), ClientError> {
271 self.send(&Ping).await
272 }
273
274 async fn call<T>(&self, fn_name: &str, args: &T) -> Result<Tuple, ClientError>
280 where
281 T: ToTupleBuffer + ?Sized,
282 {
283 self.send(&Call { fn_name, args }).await
284 }
285
286 async fn eval<T>(&self, expr: &str, args: &T) -> Result<Tuple, ClientError>
294 where
295 T: ToTupleBuffer + ?Sized,
296 {
297 self.send(&Eval { args, expr }).await
298 }
299
300 async fn execute<T>(&self, sql: &str, bind_params: &T) -> Result<Vec<Tuple>, ClientError>
302 where
303 T: ToTupleBuffer + ?Sized,
304 {
305 self.send(&Execute { sql, bind_params }).await
306 }
307}
308
309#[async_trait::async_trait(?Send)]
310impl AsClient for Client {
311 async fn send<R: Request>(&self, request: &R) -> Result<R::Response, ClientError> {
312 if let Err(e) = self.check_state() {
313 return Err(ClientError::ConnectionClosed(e));
314 }
315
316 let res = self.0.borrow_mut().protocol.send_request(request);
317 let sync = unwrap_ok_or!(res,
318 Err(e) => {
319 return Err(ClientError::RequestEncode(e));
320 }
321 );
322
323 let (tx, rx) = oneshot::channel();
324 self.0.borrow_mut().awaiting_response.insert(sync, tx);
325 maybe_wake_sender(&self.0.borrow());
326 let res = rx
330 .on_drop(|| {
331 let _ = self.0.borrow_mut().awaiting_response.remove(&sync);
332 })
333 .await
334 .expect("Channel should be open");
335 if let Err(e) = res {
336 return Err(ClientError::ConnectionClosed(e));
337 }
338
339 let res = self
340 .0
341 .borrow_mut()
342 .protocol
343 .take_response::<R>(sync)
344 .expect("Is present at this point");
345 let response = unwrap_ok_or!(res,
346 Err(error::Error::Remote(response)) => {
347 return Err(ClientError::ErrorResponse(response));
348 }
349 Err(e) => {
350 return Err(ClientError::ResponseDecode(e));
351 }
352 );
353
354 Ok(response)
355 }
356}
357
358impl Drop for Client {
359 fn drop(&mut self) {
360 let clients_count = self.0.borrow().clients_count;
361 if clients_count == 1 {
362 let mut client = self.0.borrow_mut();
363 client.state = State::ClosedManually;
365
366 let receiver_fiber_id = client.receiver_fiber_id;
367 let sender_fiber_id = client.sender_fiber_id;
368
369 if let Err(e) = client.stream.shutdown() {
373 crate::say_error!("Client::drop: failed closing iproto stream: {e}");
374 }
375
376 drop(client);
378
379 if let Some(id) = receiver_fiber_id {
381 fiber::cancel(id);
382 fiber::wakeup(id);
383 }
384
385 if let Some(id) = sender_fiber_id {
386 fiber::cancel(id);
387 fiber::wakeup(id);
388 }
389 } else {
390 self.0.borrow_mut().clients_count -= 1;
391 }
392 }
393}
394
395impl Clone for Client {
396 fn clone(&self) -> Self {
397 self.0.borrow_mut().clients_count += 1;
398 Self(self.0.clone())
399 }
400}
401
402macro_rules! handle_result {
403 ($client:expr, $e:expr) => {
404 match $e {
405 Ok(value) => value,
406 Err(err) => {
407 let err = Arc::new(error::Error::from(err));
408 let subscriptions: HashMap<_, _> = $client.awaiting_response.drain().collect();
410 for (_, subscription) in subscriptions {
411 let _ = subscription.send(Err(err.clone()));
413 }
414 $client.state = State::ClosedWithError(err);
415 return;
416 }
417 }
418 };
419}
420
421async fn sender(client: Rc<NoYieldsRefCell<ClientInner>>, mut writer: impl AsyncWriteExt + Unpin) {
423 loop {
424 if client.borrow().state.is_closed() || fiber::is_cancelled() {
425 return;
426 }
427 let data = client.borrow_mut().protocol.take_outgoing_data();
429 if data.is_empty() {
430 fiber::fiber_yield();
432 } else {
433 let result = writer.write_all(&data).await;
434 handle_result!(client.borrow_mut(), result);
435 }
436 }
437}
438
439#[allow(clippy::await_holding_refcell_ref)]
444async fn receiver(
445 client_cell: Rc<NoYieldsRefCell<ClientInner>>,
446 mut reader: impl AsyncReadExt + Unpin,
447) {
448 let mut buf = vec![0_u8; 4096];
449 loop {
450 let client = client_cell.borrow();
451 if client.state.is_closed() || fiber::is_cancelled() {
452 return;
453 }
454
455 let size = client.protocol.read_size_hint();
456 if buf.len() < size {
457 buf.resize(size, 0);
458 }
459 let buf_slice = &mut buf[0..size];
460
461 drop(client);
463
464 let res = reader.read_exact(buf_slice).await;
465
466 let mut client = client_cell.borrow_mut();
467 handle_result!(client, res);
468
469 let result = client
470 .protocol
471 .process_incoming(&mut Cursor::new(buf_slice));
472 let result = handle_result!(client, result);
473 if let Some(sync) = result {
474 let subscription = client.awaiting_response.remove(&sync);
475 if let Some(subscription) = subscription {
476 subscription
477 .send(Ok(()))
478 .expect("cannot be closed at this point");
479 } else {
480 crate::say_warn!("received unwaited message for {sync:?}");
481 }
482 }
483
484 maybe_wake_sender(&client);
486 }
487}
488
489#[cfg(feature = "internal_test")]
490mod tests {
491 use super::*;
492 use crate::error::TarantoolErrorCode;
493 use crate::fiber::r#async::timeout::IntoTimeout as _;
494 use crate::space::Space;
495 use crate::test::util::listen_port;
496 use std::time::Duration;
497
498 async fn test_client() -> Client {
499 Client::connect_with_config(
500 "localhost",
501 listen_port(),
502 protocol::Config {
503 creds: Some(("test_user".into(), "password".into())),
504 auth_method: crate::auth::AuthMethod::ChapSha1,
505 ..Default::default()
506 },
507 )
508 .timeout(Duration::from_secs(3))
509 .await
510 .unwrap()
511 }
512
513 #[crate::test(tarantool = "crate")]
514 async fn connect_with_timeout() {
515 let client = Client::connect_with_config(
517 "123123", listen_port(),
519 protocol::Config {
520 connect_timeout: Some(Duration::from_secs(1)),
521 ..Default::default()
522 },
523 );
524 let res = client.await.unwrap_err();
525 if cfg!(target_os = "macos") {
526 assert!(res.to_string().contains("No route to host"));
527 } else {
528 assert_eq!(res.to_string(), "connect timeout");
529 }
530 }
531
532 #[crate::test(tarantool = "crate")]
533 async fn connect() {
534 let _client = Client::connect("localhost", listen_port()).await.unwrap();
535 }
536
537 #[crate::test(tarantool = "crate")]
538 async fn connect_failure() {
539 let err = Client::connect("localhost", 0).await.unwrap_err();
541 assert!(matches!(dbg!(err), ClientError::ConnectionClosed(_)))
542 }
543
544 #[crate::test(tarantool = "crate")]
545 async fn ping() {
546 let client = test_client().await;
547
548 for _ in 0..5 {
549 client.ping().timeout(Duration::from_secs(3)).await.unwrap();
550 }
551 }
552
553 #[crate::test(tarantool = "crate")]
554 async fn connect_with_cluster_uuid_and_ping() {
555 let client = Client::connect_with_config(
557 "localhost",
558 listen_port(),
559 protocol::Config {
560 creds: Some(("test_user".into(), "password".into())),
561 auth_method: crate::auth::AuthMethod::ChapSha1,
562 cluster_uuid: Some("11111111-2222-3333-4444-555555555555".into()),
563 ..Default::default()
564 },
565 )
566 .timeout(Duration::from_secs(3))
567 .await
568 .unwrap();
569
570 client.ping().timeout(Duration::from_secs(3)).await.unwrap();
571 }
572
573 #[crate::test(tarantool = "crate")]
574 fn ping_concurrent() {
575 let client = fiber::block_on(test_client());
576 let fiber_a = fiber::start_async(async {
577 client.ping().timeout(Duration::from_secs(3)).await.unwrap()
578 });
579 let fiber_b = fiber::start_async(async {
580 client.ping().timeout(Duration::from_secs(3)).await.unwrap()
581 });
582 fiber_a.join();
583 fiber_b.join();
584 }
585
586 #[crate::test(tarantool = "crate")]
587 async fn execute() {
588 Space::find("test_s1")
589 .unwrap()
590 .insert(&(6001, "6001"))
591 .unwrap();
592 Space::find("test_s1")
593 .unwrap()
594 .insert(&(6002, "6002"))
595 .unwrap();
596
597 let client = test_client().await;
598
599 let lua = crate::lua_state();
600 _ = lua.exec("require'compat'.sql_seq_scan_default = 'old'");
602
603 let result = client
604 .execute(r#"SELECT * FROM "test_s1""#, &())
605 .timeout(Duration::from_secs(3))
606 .await
607 .unwrap();
608 assert!(result.len() >= 2);
609
610 let result = client
611 .execute(r#"SELECT * FROM "test_s1" WHERE "id" = ?"#, &(6002,))
612 .timeout(Duration::from_secs(3))
613 .await
614 .unwrap();
615
616 assert_eq!(result.len(), 1);
617 assert_eq!(
618 result.first().unwrap().decode::<(u64, String)>().unwrap(),
619 (6002, "6002".into())
620 );
621 }
622
623 #[crate::test(tarantool = "crate")]
624 async fn call() {
625 let client = test_client().await;
626
627 let result = client
628 .call("test_stored_proc", &(1, 2))
629 .timeout(Duration::from_secs(3))
630 .await
631 .unwrap();
632 assert_eq!(result.decode::<(i32,)>().unwrap(), (3,));
633 }
634
635 #[crate::test(tarantool = "crate")]
636 async fn invalid_call() {
637 let client = test_client().await;
638
639 let err = client
640 .call("unexistent_proc", &())
641 .timeout(Duration::from_secs(3))
642 .await
643 .unwrap_err();
644
645 let err = error::Error::from(err);
646 let error::Error::Remote(err) = err else {
647 panic!()
648 };
649
650 assert_eq!(err.error_code(), TarantoolErrorCode::NoSuchProc as u32);
651
652 #[rustfmt::skip]
653 assert_eq!(err.to_string(), "NoSuchProc: Procedure 'unexistent_proc' is not defined");
654 }
655
656 #[crate::test(tarantool = "crate")]
657 async fn eval() {
658 let client = test_client().await;
659
660 let result = client
662 .eval("return ...", &(1, 2))
663 .timeout(Duration::from_secs(3))
664 .await
665 .unwrap();
666 assert_eq!(result.decode::<(i32, i32)>().unwrap(), (1, 2));
667
668 let err = client
670 .eval("box.error(420)", &())
671 .timeout(Duration::from_secs(3))
672 .await
673 .unwrap_err();
674
675 let err = error::Error::from(err);
676 let error::Error::Remote(err) = err else {
677 panic!()
678 };
679
680 assert_eq!(err.error_code(), 420);
681 }
682
683 #[crate::test(tarantool = "crate")]
685 async fn client_count_regression() {
686 let client = test_client().await;
687 client.0.borrow_mut().stream.shutdown().unwrap();
689 fiber::reschedule();
691
692 let fiber_id = client.0.borrow().sender_fiber_id.unwrap();
693 let fiber_exists = fiber::wakeup(fiber_id);
694 debug_assert!(fiber_exists);
695
696 fiber::reschedule();
698 assert_eq!(Rc::strong_count(&client.0), 1);
700
701 let client_clone = client.clone();
703 assert_eq!(Rc::strong_count(&client.0), 2);
704 drop(client_clone);
706 assert_eq!(Rc::strong_count(&client.0), 1);
707
708 client.check_state().unwrap_err();
710 }
711
712 #[crate::test(tarantool = "crate")]
713 async fn concurrent_messages_one_fiber() {
714 let client = test_client().await;
715 let mut ping_futures = vec![];
716 for _ in 0..10 {
717 ping_futures.push(client.ping());
718 }
719 for res in futures::future::join_all(ping_futures).await {
720 res.unwrap();
721 }
722 }
723
724 #[crate::test(tarantool = "crate")]
725 async fn data_always_present_in_response() {
726 let client = test_client().await;
727
728 client.eval("return", &()).await.unwrap();
731 client.call("LUA", &("return",)).await.unwrap();
732 }
733
734 #[crate::test(tarantool = "crate")]
735 async fn big_data() {
736 use crate::tuple::RawByteBuf;
739
740 #[crate::proc(tarantool = "crate")]
741 fn proc_big_data<'a>(s: &'a serde_bytes::Bytes) -> usize {
742 s.len() + 17
743 }
744
745 let proc = crate::define_stored_proc_for_tests!(proc_big_data);
746 let client = test_client().await;
747
748 #[cfg(target_os = "macos")]
751 const N: u32 = 0x1fff_ff69;
752
753 #[cfg(not(target_os = "macos"))]
754 const N: u32 = 0x6fff_ff69;
755
756 #[allow(clippy::uninit_vec)]
758 let s = unsafe {
759 let buf_size = (N + 6) as usize;
760 let mut data = Vec::<u8>::with_capacity(buf_size);
761 data.set_len(buf_size);
762 data[0] = b'\x91';
763 data[1] = b'\xc6'; data[2..6].copy_from_slice(&N.to_be_bytes());
765 RawByteBuf::from(data)
766 };
767
768 let t0 = std::time::Instant::now();
769 let t = client.call(&proc, &s).await.unwrap();
770 dbg!(t0.elapsed());
771
772 if let Ok((len,)) = t.decode::<(u32,)>() {
773 assert_eq!(len, N + 17);
774 } else {
775 let ((len,),): ((u32,),) = t.decode().unwrap();
776 assert_eq!(len, N + 17);
777 }
778 }
779
780 #[cfg(feature = "picodata")]
781 #[crate::test(tarantool = "crate")]
782 async fn md5_auth_method() {
783 use crate::auth::AuthMethod;
784 use std::time::Duration;
785
786 let username = "Johnny";
787 let password = "B. Goode";
788
789 crate::lua_state()
792 .exec_with(
793 "local username, password = ...
794 box.cfg { }
795 box.schema.user.create(username, { if_not_exists = true, auth_type = 'md5', password = password })
796 box.schema.user.grant(username, 'super', nil, nil, { if_not_exists = true })",
797 (username, password),
798 )
799 .unwrap();
800
801 {
803 let client = Client::connect_with_config(
804 "localhost",
805 listen_port(),
806 protocol::Config {
807 creds: Some((username.into(), password.into())),
808 auth_method: AuthMethod::Md5,
809 ..Default::default()
810 },
811 )
812 .timeout(Duration::from_secs(3))
813 .await
814 .unwrap();
815
816 client
819 .eval("print('\\x1b[32mit works!\\x1b[0m')", &())
820 .await
821 .unwrap();
822 }
823
824 {
826 let client = Client::connect_with_config(
827 "localhost",
828 listen_port(),
829 protocol::Config {
830 creds: Some((username.into(), "wrong password".into())),
831 auth_method: AuthMethod::Md5,
832 ..Default::default()
833 },
834 )
835 .timeout(Duration::from_secs(3))
836 .await
837 .unwrap();
838
839 let err = client.eval("return", &()).await.unwrap_err().to_string();
842 #[rustfmt::skip]
843 assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
844 }
845
846 {
848 let client = Client::connect_with_config(
849 "localhost",
850 listen_port(),
851 protocol::Config {
852 creds: Some((username.into(), password.into())),
853 auth_method: AuthMethod::ChapSha1,
854 ..Default::default()
855 },
856 )
857 .timeout(Duration::from_secs(3))
858 .await
859 .unwrap();
860
861 let err = client.eval("return", &()).await.unwrap_err().to_string();
864 #[rustfmt::skip]
865 assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
866 }
867
868 crate::lua_state()
869 .exec_with(
871 "local username = ...
872 box.cfg { auth_type = 'chap-sha1' }
873 box.schema.user.drop(username)",
874 username,
875 )
876 .unwrap();
877 }
878
879 #[cfg(feature = "picodata")]
880 #[crate::test(tarantool = "crate")]
881 async fn ldap_auth_method() {
882 use crate::auth::AuthMethod;
883 use std::time::Duration;
884
885 let username = "Johnny";
886 let password = "B. Goode";
887
888 let _guard = crate::unwrap_ok_or!(
889 crate::test::util::setup_ldap_auth(username, password),
890 Err(e) => {
891 println!("{e}, skipping ldap test");
892 return;
893 }
894 );
895
896 {
898 let client = Client::connect_with_config(
899 "localhost",
900 listen_port(),
901 protocol::Config {
902 creds: Some((username.into(), password.into())),
903 auth_method: AuthMethod::Ldap,
904 ..Default::default()
905 },
906 )
907 .timeout(Duration::from_secs(3))
908 .await
909 .unwrap();
910
911 client
914 .eval("print('\\x1b[32mit works!\\x1b[0m')", &())
915 .await
916 .unwrap();
917 }
918
919 {
921 let client = Client::connect_with_config(
922 "localhost",
923 listen_port(),
924 protocol::Config {
925 creds: Some((username.into(), "wrong password".into())),
926 auth_method: AuthMethod::Ldap,
927 ..Default::default()
928 },
929 )
930 .timeout(Duration::from_secs(3))
931 .await
932 .unwrap();
933
934 let err = client.eval("return", &()).await.unwrap_err().to_string();
937 #[rustfmt::skip]
938 assert_eq!(err, "server responded with error: System: Invalid credentials");
939 }
940
941 {
943 let client = Client::connect_with_config(
944 "localhost",
945 listen_port(),
946 protocol::Config {
947 creds: Some((username.into(), password.into())),
948 auth_method: AuthMethod::ChapSha1,
949 ..Default::default()
950 },
951 )
952 .timeout(Duration::from_secs(3))
953 .await
954 .unwrap();
955
956 let err = client.eval("return", &()).await.unwrap_err().to_string();
959 #[rustfmt::skip]
960 assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
961 }
962 }
963
964 #[crate::test(tarantool = "crate")]
965 async fn extended_error_info() {
966 let client = test_client().await;
967
968 let res = client
969 .eval(
970 "error1 = box.error.new(box.error.UNSUPPORTED, 'this', 'that')
971 error2 = box.error.new('MyCode', 'my message')
972 error3 = box.error.new('MyOtherCode', 'my other message')
973 error2:set_prev(error3)
974 error1:set_prev(error2)
975 error1:raise()",
976 &(),
977 )
978 .timeout(Duration::from_secs(3))
979 .await;
980
981 let error::Error::Remote(e) = error::Error::from(res.unwrap_err()) else {
982 panic!();
983 };
984
985 assert_eq!(e.error_code(), TarantoolErrorCode::Unsupported as u32);
986 assert_eq!(e.message(), "this does not support that");
987 assert_eq!(e.error_type(), "ClientError");
988 assert_eq!(e.file(), Some("eval"));
989 assert_eq!(e.line(), Some(1));
990 let fields_len = e.fields().len();
991 assert!(fields_len == 1 || fields_len == 0);
993 if fields_len == 1 {
994 assert_eq!(e.fields()["name"], rmpv::Value::from("UNSUPPORTED"));
995 }
996
997 let e = e.cause().unwrap();
998
999 assert_eq!(e.error_code(), 0);
1000 assert_eq!(e.message(), "my message");
1001 assert_eq!(e.error_type(), "CustomError");
1002 assert_eq!(e.file(), Some("eval"));
1003 assert_eq!(e.line(), Some(2));
1004 assert_eq!(e.fields().len(), 1);
1005 assert_eq!(e.fields()["custom_type"], rmpv::Value::from("MyCode"));
1006
1007 let e = e.cause().unwrap();
1008
1009 assert_eq!(e.error_code(), 0);
1010 assert_eq!(e.message(), "my other message");
1011 assert_eq!(e.error_type(), "CustomError");
1012 assert_eq!(e.file(), Some("eval"));
1013 assert_eq!(e.line(), Some(3));
1014 assert_eq!(e.fields().len(), 1);
1015 assert_eq!(e.fields()["custom_type"], rmpv::Value::from("MyOtherCode"));
1016
1017 assert!(e.cause().is_none());
1018 }
1019
1020 #[crate::test(tarantool = "crate")]
1021 async fn custom_error_code_from_proc() {
1022 #[crate::proc(tarantool = "crate")]
1023 fn proc_custom_error_code() -> Result<(), crate::error::Error> {
1024 Err(BoxError::new(666666_u32, "na ah").into())
1025 }
1026 let error_line = line!() - 2; let proc = crate::define_stored_proc_for_tests!(proc_custom_error_code);
1029 let client = test_client().await;
1030
1031 let res = client
1032 .call(&proc, &())
1033 .timeout(Duration::from_secs(3))
1034 .await;
1035
1036 let e = match error::Error::from(res.unwrap_err()) {
1037 error::Error::Remote(e) => e,
1038 other => {
1039 panic!("unexpected error: {}", other);
1040 }
1041 };
1042
1043 assert_eq!(e.error_code(), 666666);
1044 assert_eq!(e.message(), "na ah");
1045 assert_eq!(e.error_type(), "ClientError");
1046 assert_eq!(e.file(), Some(file!()));
1047 assert_eq!(e.line(), Some(error_line));
1048 }
1049
1050 #[crate::test(tarantool = "crate")]
1051 async fn check_error_location() {
1052 let error_line = line!() + 1;
1059 #[crate::proc(tarantool = "crate")]
1060 fn proc_check_error_location_implicit() -> Result<(), error::Error> {
1061 Err(error::Error::other("not good"))
1062 }
1063
1064 let proc = crate::define_stored_proc_for_tests!(proc_check_error_location_implicit);
1065 let client = test_client().await;
1066
1067 let res = client
1068 .call(&proc, &())
1069 .timeout(Duration::from_secs(3))
1070 .await;
1071
1072 let e = match error::Error::from(res.unwrap_err()) {
1073 error::Error::Remote(e) => e,
1074 other => {
1075 panic!("unexpected error: {}", other);
1076 }
1077 };
1078
1079 assert_eq!(e.file(), Some(file!()));
1080 assert_eq!(e.line(), Some(error_line));
1081 }
1082}