1pub mod reconnect;
37pub mod tcp;
38
39use std::collections::HashMap;
40use std::io::Cursor;
41use std::rc::Rc;
42use std::sync::Arc;
43use std::time::Duration;
44
45use self::tcp::TcpStream;
46
47use super::protocol::api::{Call, Eval, Execute, Ping, Request};
48use super::protocol::{self, Protocol, SyncIndex};
49use crate::error;
50use crate::error::BoxError;
51use crate::fiber;
52use crate::fiber::r#async::oneshot;
53use crate::fiber::r#async::IntoOnDrop as _;
54use crate::fiber::FiberId;
55use crate::fiber::NoYieldsRefCell;
56use crate::tuple::{ToTupleBuffer, Tuple};
57use crate::unwrap_ok_or;
58
59use futures::{AsyncReadExt, AsyncWriteExt};
60
/// Deprecated alias of [`ClientError`], kept so that code using the old name still compiles.
#[deprecated = "use `ClientError` instead"]
pub type Error = ClientError;
63
/// Error returned by the network client.
///
/// Every variant is displayed as the message of the error it wraps (`"{0}"`).
#[derive(thiserror::Error, Debug)]
pub enum ClientError {
    /// The connection cannot be used; carries the error that closed it.
    ///
    /// Produced by `Client::connect_with_config` when the TCP connect fails, and by
    /// `send` when the client is already closed or the connection fails while the
    /// request is waiting for its response.
    #[error("{0}")]
    ConnectionClosed(Arc<crate::error::Error>),

    /// The protocol layer refused to queue the request (`Protocol::send_request` failed).
    #[error("{0}")]
    RequestEncode(crate::error::Error),

    /// A response arrived, but taking it from the protocol layer produced a
    /// non-remote error.
    #[error("{0}")]
    ResponseDecode(crate::error::Error),

    /// The server replied with an error; the payload is the remote error itself.
    #[error("{0}")]
    ErrorResponse(BoxError),
}
95
96impl From<ClientError> for crate::error::Error {
97 #[inline(always)]
98 fn from(err: ClientError) -> Self {
99 match err {
100 ClientError::ConnectionClosed(err) => crate::error::Error::ConnectionClosed(err),
101 ClientError::RequestEncode(err) => err,
102 ClientError::ResponseDecode(err) => err,
103 ClientError::ErrorResponse(err) => crate::error::Error::Remote(err),
104 }
105 }
106}
107
108#[derive(Clone, Debug)]
109enum State {
110 Alive,
111 ClosedManually,
112 ClosedWithError(Arc<error::Error>),
114}
115
116impl State {
117 fn is_alive(&self) -> bool {
118 matches!(self, Self::Alive)
119 }
120
121 fn is_closed(&self) -> bool {
122 !self.is_alive()
123 }
124}
125
/// State shared between all [`Client`] handles and the two worker fibers.
#[derive(Debug)]
struct ClientInner {
    /// Protocol layer: queues outgoing requests and holds received responses.
    protocol: Protocol,
    /// Waiters keyed by request sync; each is notified with `Ok(())` once its response
    /// has been processed, or with the error that closed the connection.
    awaiting_response: HashMap<SyncIndex, oneshot::Sender<Result<(), Arc<error::Error>>>>,
    /// Current lifecycle state of the connection.
    state: State,
    /// Handle to the underlying stream; closed when the last client handle is dropped.
    stream: TcpStream,
    /// Id of the fiber running `sender`; `None` until `connect_with_config` sets it.
    sender_fiber_id: Option<FiberId>,
    /// Id of the fiber running `receiver`; `None` until `connect_with_config` sets it.
    receiver_fiber_id: Option<FiberId>,
    /// Number of live [`Client`] handles sharing this state.
    clients_count: usize,
}
138
139impl ClientInner {
140 pub fn new(config: protocol::Config, stream: TcpStream) -> Self {
141 #[cfg(feature = "picodata")]
142 if config.auth_method == crate::auth::AuthMethod::Ldap {
143 crate::say_warn!(
144 "You're using the 'ldap' authentication method, which implies sending the password UNENCRYPTED over the TCP connection. TLS is not yet implemented for IPROTO connections so make sure your communication channel is secure by other means."
145 )
146 }
147 Self {
148 protocol: Protocol::with_config(config),
149 awaiting_response: HashMap::new(),
150 state: State::Alive,
151 stream,
152 sender_fiber_id: None,
153 receiver_fiber_id: None,
154 clients_count: 1,
155 }
156 }
157}
158
159fn maybe_wake_sender(client: &ClientInner) {
161 if client.protocol.ready_outgoing_len() == 0 {
162 return;
164 }
165 if let Some(id) = client.sender_fiber_id {
166 fiber::wakeup(id);
167 }
168}
169
/// Handle to a client connection whose state is shared between all of its clones.
///
/// Handles are counted explicitly: `Clone` increments `clients_count`, and dropping the
/// last handle closes the stream and cancels the sender and receiver fibers.
#[derive(Debug)]
pub struct Client(Rc<NoYieldsRefCell<ClientInner>>);
178
179impl Client {
180 pub async fn connect(url: &str, port: u16) -> Result<Self, ClientError> {
186 Self::connect_with_config(url, port, Default::default()).await
187 }
188
189 pub async fn connect_with_config(
199 url: &str,
200 port: u16,
201 config: protocol::Config,
202 ) -> Result<Self, ClientError> {
203 let timeout = config.connect_timeout.unwrap_or(Duration::MAX);
204 let stream = TcpStream::connect_timeout(url, port, timeout)
205 .map_err(|e| ClientError::ConnectionClosed(Arc::new(e.into())))?;
206 let client = ClientInner::new(config, stream.clone());
207 let client = Rc::new(NoYieldsRefCell::new(client));
208
209 let receiver_fiber_id = fiber::Builder::new()
210 .func_async(receiver(client.clone(), stream.clone()))
211 .name(format!("iproto-in/{url}:{port}"))
212 .start_non_joinable()
213 .unwrap();
214
215 let sender_fiber_id = fiber::Builder::new()
216 .func_async(sender(client.clone(), stream))
217 .name(format!("iproto-out/{url}:{port}"))
218 .start_non_joinable()
219 .unwrap();
220
221 {
222 let mut client_mut = client.borrow_mut();
223 client_mut.receiver_fiber_id = Some(receiver_fiber_id);
224 client_mut.sender_fiber_id = Some(sender_fiber_id);
225 }
226
227 Ok(Self(client))
228 }
229
230 fn check_state(&self) -> Result<(), Arc<error::Error>> {
231 match &self.0.borrow().state {
232 State::Alive => Ok(()),
233 State::ClosedManually => unreachable!("All client handles are dropped at this point"),
234 State::ClosedWithError(err) => Err(err.clone()),
235 }
236 }
237}
238
/// Request API shared by client implementations.
///
/// Implementors only have to provide [`send`](AsClient::send); every other method is a
/// convenience wrapper that builds one specific request and passes it to `send`.
#[async_trait::async_trait(?Send)]
pub trait AsClient {
    /// Sends `request` and resolves to its response.
    async fn send<R: Request>(&self, request: &R) -> Result<R::Response, ClientError>;

    /// Sends a [`Ping`] request.
    async fn ping(&self) -> Result<(), ClientError> {
        self.send(&Ping).await
    }

    /// Sends a [`Call`] request for the function `fn_name` with arguments `args`.
    async fn call<T>(&self, fn_name: &str, args: &T) -> Result<Tuple, ClientError>
    where
        T: ToTupleBuffer + ?Sized,
    {
        self.send(&Call { fn_name, args }).await
    }

    /// Sends an [`Eval`] request for the expression `expr` with arguments `args`.
    async fn eval<T>(&self, expr: &str, args: &T) -> Result<Tuple, ClientError>
    where
        T: ToTupleBuffer + ?Sized,
    {
        self.send(&Eval { args, expr }).await
    }

    /// Sends an [`Execute`] request for the statement `sql` with parameters `bind_params`.
    async fn execute<T>(&self, sql: &str, bind_params: &T) -> Result<Vec<Tuple>, ClientError>
    where
        T: ToTupleBuffer + ?Sized,
    {
        self.send(&Execute { sql, bind_params }).await
    }
}
289
290#[async_trait::async_trait(?Send)]
291impl AsClient for Client {
292 async fn send<R: Request>(&self, request: &R) -> Result<R::Response, ClientError> {
293 if let Err(e) = self.check_state() {
294 return Err(ClientError::ConnectionClosed(e));
295 }
296
297 let res = self.0.borrow_mut().protocol.send_request(request);
298 let sync = unwrap_ok_or!(res,
299 Err(e) => {
300 return Err(ClientError::RequestEncode(e));
301 }
302 );
303
304 let (tx, rx) = oneshot::channel();
305 self.0.borrow_mut().awaiting_response.insert(sync, tx);
306 maybe_wake_sender(&self.0.borrow());
307 let res = rx
311 .on_drop(|| {
312 let _ = self.0.borrow_mut().awaiting_response.remove(&sync);
313 })
314 .await
315 .expect("Channel should be open");
316 if let Err(e) = res {
317 return Err(ClientError::ConnectionClosed(e));
318 }
319
320 let res = self
321 .0
322 .borrow_mut()
323 .protocol
324 .take_response::<R>(sync)
325 .expect("Is present at this point");
326 let response = unwrap_ok_or!(res,
327 Err(error::Error::Remote(response)) => {
328 return Err(ClientError::ErrorResponse(response));
329 }
330 Err(e) => {
331 return Err(ClientError::ResponseDecode(e));
332 }
333 );
334 Ok(response)
335 }
336}
337
338impl Drop for Client {
339 fn drop(&mut self) {
340 let clients_count = self.0.borrow().clients_count;
341 if clients_count == 1 {
342 let mut client = self.0.borrow_mut();
343 client.state = State::ClosedManually;
345
346 let receiver_fiber_id = client.receiver_fiber_id;
347 let sender_fiber_id = client.sender_fiber_id;
348
349 if let Err(e) = client.stream.close() {
353 crate::say_error!("Client::drop: failed closing tcp stream: {e}");
354 }
355
356 drop(client);
358
359 if let Some(id) = receiver_fiber_id {
361 fiber::cancel(id);
362 fiber::wakeup(id);
363 }
364
365 if let Some(id) = sender_fiber_id {
366 fiber::cancel(id);
367 fiber::wakeup(id);
368 }
369 } else {
370 self.0.borrow_mut().clients_count -= 1;
371 }
372 }
373}
374
375impl Clone for Client {
376 fn clone(&self) -> Self {
377 self.0.borrow_mut().clients_count += 1;
378 Self(self.0.clone())
379 }
380}
381
/// Unwraps the `Result` in `$e`, or fails the whole connection.
///
/// On `Ok(value)` the macro evaluates to `value`. On `Err(err)` it:
/// 1. converts `err` into a shared `error::Error`,
/// 2. sends that error to every waiter registered in `$client.awaiting_response`,
/// 3. sets `$client.state` to `State::ClosedWithError`,
/// 4. returns from the enclosing function, which therefore must return `()`.
///
/// `$client` is expanded more than once and may be an expression that produces a
/// fresh mutable borrow each time (e.g. `cell.borrow_mut()`), so no borrow obtained
/// through it is kept alive between statements.
macro_rules! handle_result {
    ($client:expr, $e:expr) => {
        match $e {
            Ok(value) => value,
            Err(err) => {
                let err = Arc::new(error::Error::from(err));
                // Move the whole map out in a single statement. This ends the borrow
                // taken through `$client` before the loop starts, and avoids building
                // a second hash map just to iterate over the waiters once.
                let subscriptions = std::mem::take(&mut $client.awaiting_response);
                for (_, subscription) in subscriptions {
                    // A failed send is ignored on purpose: nobody is left to notify.
                    let _ = subscription.send(Err(err.clone()));
                }
                $client.state = State::ClosedWithError(err);
                return;
            }
        }
    };
}
400
401async fn sender(client: Rc<NoYieldsRefCell<ClientInner>>, mut writer: TcpStream) {
403 loop {
404 if client.borrow().state.is_closed() || fiber::is_cancelled() {
405 return;
406 }
407 let data = client.borrow_mut().protocol.take_outgoing_data();
409 if data.is_empty() {
410 fiber::fiber_yield();
412 } else {
413 let result = writer.write_all(&data).await;
414 handle_result!(client.borrow_mut(), result);
415 }
416 }
417}
418
419#[allow(clippy::await_holding_refcell_ref)]
424async fn receiver(client_cell: Rc<NoYieldsRefCell<ClientInner>>, mut reader: TcpStream) {
425 let mut buf = vec![0_u8; 4096];
426 loop {
427 let client = client_cell.borrow();
428 if client.state.is_closed() || fiber::is_cancelled() {
429 return;
430 }
431
432 let size = client.protocol.read_size_hint();
433 if buf.len() < size {
434 buf.resize(size, 0);
435 }
436 let buf_slice = &mut buf[0..size];
437
438 drop(client);
440
441 let res = reader.read_exact(buf_slice).await;
442
443 let mut client = client_cell.borrow_mut();
444 handle_result!(client, res);
445
446 let result = client
447 .protocol
448 .process_incoming(&mut Cursor::new(buf_slice));
449 let result = handle_result!(client, result);
450 if let Some(sync) = result {
451 let subscription = client.awaiting_response.remove(&sync);
452 if let Some(subscription) = subscription {
453 subscription
454 .send(Ok(()))
455 .expect("cannot be closed at this point");
456 } else {
457 crate::say_warn!("received unwaited message for {sync:?}");
458 }
459 }
460
461 maybe_wake_sender(&client);
463 }
464}
465
466#[cfg(feature = "internal_test")]
467mod tests {
468 use super::*;
469 use crate::error::TarantoolErrorCode;
470 use crate::fiber::r#async::timeout::IntoTimeout as _;
471 use crate::space::Space;
472 use crate::test::util::listen_port;
473 use std::time::Duration;
474
475 async fn test_client() -> Client {
476 Client::connect_with_config(
477 "localhost",
478 listen_port(),
479 protocol::Config {
480 creds: Some(("test_user".into(), "password".into())),
481 auth_method: crate::auth::AuthMethod::ChapSha1,
482 ..Default::default()
483 },
484 )
485 .timeout(Duration::from_secs(3))
486 .await
487 .unwrap()
488 }
489
490 #[crate::test(tarantool = "crate")]
491 async fn connect_with_timeout() {
492 let client = Client::connect_with_config(
494 "123123", listen_port(),
496 protocol::Config {
497 connect_timeout: Some(Duration::from_secs(1)),
498 ..Default::default()
499 },
500 );
501 let res = client.await.unwrap_err();
502 if cfg!(target_os = "macos") {
503 assert!(res.to_string().contains("No route to host"));
504 } else {
505 assert_eq!(res.to_string(), "connect timeout");
506 }
507 }
508
509 #[crate::test(tarantool = "crate")]
510 async fn connect() {
511 let _client = Client::connect("localhost", listen_port()).await.unwrap();
512 }
513
514 #[crate::test(tarantool = "crate")]
515 async fn connect_failure() {
516 let err = Client::connect("localhost", 0).await.unwrap_err();
518 assert!(matches!(dbg!(err), ClientError::ConnectionClosed(_)))
519 }
520
521 #[crate::test(tarantool = "crate")]
522 async fn ping() {
523 let client = test_client().await;
524
525 for _ in 0..5 {
526 client.ping().timeout(Duration::from_secs(3)).await.unwrap();
527 }
528 }
529
530 #[crate::test(tarantool = "crate")]
531 fn ping_concurrent() {
532 let client = fiber::block_on(test_client());
533 let fiber_a = fiber::start_async(async {
534 client.ping().timeout(Duration::from_secs(3)).await.unwrap()
535 });
536 let fiber_b = fiber::start_async(async {
537 client.ping().timeout(Duration::from_secs(3)).await.unwrap()
538 });
539 fiber_a.join();
540 fiber_b.join();
541 }
542
543 #[crate::test(tarantool = "crate")]
544 async fn execute() {
545 Space::find("test_s1")
546 .unwrap()
547 .insert(&(6001, "6001"))
548 .unwrap();
549 Space::find("test_s1")
550 .unwrap()
551 .insert(&(6002, "6002"))
552 .unwrap();
553
554 let client = test_client().await;
555
556 let lua = crate::lua_state();
557 _ = lua.exec("require'compat'.sql_seq_scan_default = 'old'");
559
560 let result = client
561 .execute(r#"SELECT * FROM "test_s1""#, &())
562 .timeout(Duration::from_secs(3))
563 .await
564 .unwrap();
565 assert!(result.len() >= 2);
566
567 let result = client
568 .execute(r#"SELECT * FROM "test_s1" WHERE "id" = ?"#, &(6002,))
569 .timeout(Duration::from_secs(3))
570 .await
571 .unwrap();
572
573 assert_eq!(result.len(), 1);
574 assert_eq!(
575 result.first().unwrap().decode::<(u64, String)>().unwrap(),
576 (6002, "6002".into())
577 );
578 }
579
580 #[crate::test(tarantool = "crate")]
581 async fn call() {
582 let client = test_client().await;
583
584 let result = client
585 .call("test_stored_proc", &(1, 2))
586 .timeout(Duration::from_secs(3))
587 .await
588 .unwrap();
589 assert_eq!(result.decode::<(i32,)>().unwrap(), (3,));
590 }
591
592 #[crate::test(tarantool = "crate")]
593 async fn invalid_call() {
594 let client = test_client().await;
595
596 let err = client
597 .call("unexistent_proc", &())
598 .timeout(Duration::from_secs(3))
599 .await
600 .unwrap_err();
601
602 let err = error::Error::from(err);
603 let error::Error::Remote(err) = err else {
604 panic!()
605 };
606
607 assert_eq!(err.error_code(), TarantoolErrorCode::NoSuchProc as u32);
608
609 #[rustfmt::skip]
610 assert_eq!(err.to_string(), "NoSuchProc: Procedure 'unexistent_proc' is not defined");
611 }
612
613 #[crate::test(tarantool = "crate")]
614 async fn eval() {
615 let client = test_client().await;
616
617 let result = client
619 .eval("return ...", &(1, 2))
620 .timeout(Duration::from_secs(3))
621 .await
622 .unwrap();
623 assert_eq!(result.decode::<(i32, i32)>().unwrap(), (1, 2));
624
625 let err = client
627 .eval("box.error(420)", &())
628 .timeout(Duration::from_secs(3))
629 .await
630 .unwrap_err();
631
632 let err = error::Error::from(err);
633 let error::Error::Remote(err) = err else {
634 panic!()
635 };
636
637 assert_eq!(err.error_code(), 420);
638 }
639
/// Regression test: handle counting must not depend on the `Rc` strong count, which
/// also includes the clones held by the worker fibers.
#[crate::test(tarantool = "crate")]
async fn client_count_regression() {
    let client = test_client().await;
    // Break the connection underneath the client ...
    client.0.borrow_mut().stream.close().unwrap();
    // ... and let other fibers run so they can observe it.
    fiber::reschedule();

    // The sender only re-checks the state when it runs, so wake it up explicitly.
    let fiber_id = client.0.borrow().sender_fiber_id.unwrap();
    let fiber_exists = fiber::wakeup(fiber_id);
    debug_assert!(fiber_exists);

    fiber::reschedule();
    // Expects both worker fibers to have finished and released their `Rc` clones.
    assert_eq!(Rc::strong_count(&client.0), 1);

    let client_clone = client.clone();
    assert_eq!(Rc::strong_count(&client.0), 2);
    // Dropping a non-last handle must only decrement the handle counter.
    drop(client_clone);
    assert_eq!(Rc::strong_count(&client.0), 1);

    // The state must still be "closed with error": `check_state` would panic if
    // dropping the clone had marked the client as closed manually.
    client.check_state().unwrap_err();
}
668
669 #[crate::test(tarantool = "crate")]
670 async fn concurrent_messages_one_fiber() {
671 let client = test_client().await;
672 let mut ping_futures = vec![];
673 for _ in 0..10 {
674 ping_futures.push(client.ping());
675 }
676 for res in futures::future::join_all(ping_futures).await {
677 res.unwrap();
678 }
679 }
680
681 #[crate::test(tarantool = "crate")]
682 async fn data_always_present_in_response() {
683 let client = test_client().await;
684
685 client.eval("return", &()).await.unwrap();
688 client.call("LUA", &("return",)).await.unwrap();
689 }
690
/// Sends a very large binary argument to a stored procedure and checks the length
/// the procedure reports back.
#[crate::test(tarantool = "crate")]
async fn big_data() {
    use crate::tuple::RawByteBuf;

    /// Returns the length of the received byte string plus 17.
    #[crate::proc(tarantool = "crate")]
    fn proc_big_data<'a>(s: &'a serde_bytes::Bytes) -> usize {
        s.len() + 17
    }

    let proc = crate::define_stored_proc_for_tests!(proc_big_data);
    let client = test_client().await;

    // Payload size in bytes; a smaller payload is used on macOS.
    #[cfg(target_os = "macos")]
    const N: u32 = 0x1fff_ff69;

    #[cfg(not(target_os = "macos"))]
    const N: u32 = 0x6fff_ff69;

    // The request arguments are encoded by hand as MessagePack:
    // a 1-element array holding a `bin 32` value of `N` bytes.
    // NOTE(review): the `N` payload bytes are deliberately left uninitialized
    // (hence the lint allowance) — presumably to avoid filling a huge buffer.
    #[allow(clippy::uninit_vec)]
    let s = unsafe {
        // 1 byte array header + 1 byte bin marker + 4 bytes length + N bytes payload.
        let buf_size = (N + 6) as usize;
        let mut data = Vec::<u8>::with_capacity(buf_size);
        data.set_len(buf_size);
        // 0x91: fixarray with one element.
        data[0] = b'\x91';
        // 0xc6: bin 32, followed by the payload length as a big-endian u32.
        data[1] = b'\xc6';
        data[2..6].copy_from_slice(&N.to_be_bytes());
        RawByteBuf::from(data)
    };

    let t0 = std::time::Instant::now();
    let t = client.call(&proc, &s).await.unwrap();
    // Print how long the round trip took.
    dbg!(t0.elapsed());

    // The result is accepted both as `(len,)` and wrapped in one more tuple level.
    if let Ok((len,)) = t.decode::<(u32,)>() {
        assert_eq!(len, N + 17);
    } else {
        let ((len,),): ((u32,),) = t.decode().unwrap();
        assert_eq!(len, N + 17);
    }
}
736
/// Checks the `md5` authentication method: correct credentials work, while a wrong
/// password or a mismatching auth method is rejected.
#[cfg(feature = "picodata")]
#[crate::test(tarantool = "crate")]
async fn md5_auth_method() {
    use crate::auth::AuthMethod;
    use std::time::Duration;

    let username = "Johnny";
    let password = "B. Goode";

    // Create the user with `md5` authentication and grant it `super`.
    crate::lua_state()
        .exec_with(
            "local username, password = ...
            box.cfg { }
            box.schema.user.create(username, { if_not_exists = true, auth_type = 'md5', password = password })
            box.schema.user.grant(username, 'super', nil, nil, { if_not_exists = true })",
            (username, password),
        )
        .unwrap();

    // Case 1: correct password and matching auth method.
    {
        let client = Client::connect_with_config(
            "localhost",
            listen_port(),
            protocol::Config {
                creds: Some((username.into(), password.into())),
                auth_method: AuthMethod::Md5,
                ..Default::default()
            },
        )
        .timeout(Duration::from_secs(3))
        .await
        .unwrap();

        client
            .eval("print('\\x1b[32mit works!\\x1b[0m')", &())
            .await
            .unwrap();
    }

    // Case 2: wrong password. Connecting succeeds; the first request fails.
    {
        let client = Client::connect_with_config(
            "localhost",
            listen_port(),
            protocol::Config {
                creds: Some((username.into(), "wrong password".into())),
                auth_method: AuthMethod::Md5,
                ..Default::default()
            },
        )
        .timeout(Duration::from_secs(3))
        .await
        .unwrap();

        let err = client.eval("return", &()).await.unwrap_err().to_string();
        #[rustfmt::skip]
        assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
    }

    // Case 3: correct password, but the client uses `chap-sha1` instead of `md5`.
    {
        let client = Client::connect_with_config(
            "localhost",
            listen_port(),
            protocol::Config {
                creds: Some((username.into(), password.into())),
                auth_method: AuthMethod::ChapSha1,
                ..Default::default()
            },
        )
        .timeout(Duration::from_secs(3))
        .await
        .unwrap();

        let err = client.eval("return", &()).await.unwrap_err().to_string();
        #[rustfmt::skip]
        assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
    }

    // Clean up: set the auth type to `chap-sha1` and drop the test user.
    crate::lua_state()
        .exec_with(
            "local username = ...
            box.cfg { auth_type = 'chap-sha1' }
            box.schema.user.drop(username)",
            username,
        )
        .unwrap();
}
835
/// Checks the `ldap` authentication method: correct credentials work, while a wrong
/// password or a mismatching auth method is rejected.
///
/// The test is skipped (with a message) when the LDAP setup helper fails.
#[cfg(feature = "picodata")]
#[crate::test(tarantool = "crate")]
async fn ldap_auth_method() {
    use crate::auth::AuthMethod;
    use std::time::Duration;

    let username = "Johnny";
    let password = "B. Goode";

    // Keep the returned guard alive for the whole test.
    let _guard = crate::unwrap_ok_or!(
        crate::test::util::setup_ldap_auth(username, password),
        Err(e) => {
            println!("{e}, skipping ldap test");
            return;
        }
    );

    // Case 1: correct password and matching auth method.
    {
        let client = Client::connect_with_config(
            "localhost",
            listen_port(),
            protocol::Config {
                creds: Some((username.into(), password.into())),
                auth_method: AuthMethod::Ldap,
                ..Default::default()
            },
        )
        .timeout(Duration::from_secs(3))
        .await
        .unwrap();

        client
            .eval("print('\\x1b[32mit works!\\x1b[0m')", &())
            .await
            .unwrap();
    }

    // Case 2: wrong password. Connecting succeeds; the first request fails.
    {
        let client = Client::connect_with_config(
            "localhost",
            listen_port(),
            protocol::Config {
                creds: Some((username.into(), "wrong password".into())),
                auth_method: AuthMethod::Ldap,
                ..Default::default()
            },
        )
        .timeout(Duration::from_secs(3))
        .await
        .unwrap();

        let err = client.eval("return", &()).await.unwrap_err().to_string();
        #[rustfmt::skip]
        assert_eq!(err, "server responded with error: System: Invalid credentials");
    }

    // Case 3: correct password, but the client uses `chap-sha1` instead of `ldap`.
    {
        let client = Client::connect_with_config(
            "localhost",
            listen_port(),
            protocol::Config {
                creds: Some((username.into(), password.into())),
                auth_method: AuthMethod::ChapSha1,
                ..Default::default()
            },
        )
        .timeout(Duration::from_secs(3))
        .await
        .unwrap();

        let err = client.eval("return", &()).await.unwrap_err().to_string();
        #[rustfmt::skip]
        assert_eq!(err, "server responded with error: PasswordMismatch: User not found or supplied credentials are invalid");
    }
}
920
/// A chain of three errors raised on the server arrives with every link intact:
/// code, message, type, location and extra fields.
#[crate::test(tarantool = "crate")]
async fn extended_error_info() {
    let client = test_client().await;

    // The line breaks inside the Lua chunk matter: the assertions below check the
    // line on which each error was created (1, 2 and 3).
    let res = client
        .eval(
            "error1 = box.error.new(box.error.UNSUPPORTED, 'this', 'that')
            error2 = box.error.new('MyCode', 'my message')
            error3 = box.error.new('MyOtherCode', 'my other message')
            error2:set_prev(error3)
            error1:set_prev(error2)
            error1:raise()",
            &(),
        )
        .timeout(Duration::from_secs(3))
        .await;

    let error::Error::Remote(e) = error::Error::from(res.unwrap_err()) else {
        panic!();
    };

    // Outermost error: `error1`.
    assert_eq!(e.error_code(), TarantoolErrorCode::Unsupported as u32);
    assert_eq!(e.message(), "this does not support that");
    assert_eq!(e.error_type(), "ClientError");
    assert_eq!(e.file(), Some("eval"));
    assert_eq!(e.line(), Some(1));
    // Either no extra fields or a single `name` field is accepted.
    let fields_len = e.fields().len();
    assert!(fields_len == 1 || fields_len == 0);
    if fields_len == 1 {
        assert_eq!(e.fields()["name"], rmpv::Value::from("UNSUPPORTED"));
    }

    // First cause: `error2`.
    let e = e.cause().unwrap();

    assert_eq!(e.error_code(), 0);
    assert_eq!(e.message(), "my message");
    assert_eq!(e.error_type(), "CustomError");
    assert_eq!(e.file(), Some("eval"));
    assert_eq!(e.line(), Some(2));
    assert_eq!(e.fields().len(), 1);
    assert_eq!(e.fields()["custom_type"], rmpv::Value::from("MyCode"));

    // Second cause: `error3`, the end of the chain.
    let e = e.cause().unwrap();

    assert_eq!(e.error_code(), 0);
    assert_eq!(e.message(), "my other message");
    assert_eq!(e.error_type(), "CustomError");
    assert_eq!(e.file(), Some("eval"));
    assert_eq!(e.line(), Some(3));
    assert_eq!(e.fields().len(), 1);
    assert_eq!(e.fields()["custom_type"], rmpv::Value::from("MyOtherCode"));

    assert!(e.cause().is_none());
}
976
/// A stored procedure returning a `BoxError` with a custom code: the client must see
/// that code, the message and the source location where the error was created.
#[crate::test(tarantool = "crate")]
async fn custom_error_code_from_proc() {
    #[crate::proc(tarantool = "crate")]
    fn proc_custom_error_code() -> Result<(), crate::error::Error> {
        Err(BoxError::new(666666_u32, "na ah").into())
    }
    let error_line = line!() - 2; // the line of the `BoxError::new` call above; keep the distance
    let proc = crate::define_stored_proc_for_tests!(proc_custom_error_code);
    let client = test_client().await;

    let res = client
        .call(&proc, &())
        .timeout(Duration::from_secs(3))
        .await;

    let e = match error::Error::from(res.unwrap_err()) {
        error::Error::Remote(e) => e,
        other => {
            panic!("unexpected error: {}", other);
        }
    };

    assert_eq!(e.error_code(), 666666);
    assert_eq!(e.message(), "na ah");
    assert_eq!(e.error_type(), "ClientError");
    assert_eq!(e.file(), Some(file!()));
    assert_eq!(e.line(), Some(error_line));
}
1006
/// An error returned from a stored procedure without an explicit location must be
/// reported with this file and the expected line.
#[crate::test(tarantool = "crate")]
async fn check_error_location() {
    // `line!() + 1` is the line of the `#[crate::proc]` attribute right below:
    // nothing may be inserted between these two lines.
    // NOTE(review): presumably the implicit location is captured where the proc
    // attribute is applied — confirm against the `proc` macro implementation.
    let error_line = line!() + 1;
    #[crate::proc(tarantool = "crate")]
    fn proc_check_error_location_implicit() -> Result<(), error::Error> {
        Err(error::Error::other("not good"))
    }

    let proc = crate::define_stored_proc_for_tests!(proc_check_error_location_implicit);
    let client = test_client().await;

    let res = client
        .call(&proc, &())
        .timeout(Duration::from_secs(3))
        .await;

    let e = match error::Error::from(res.unwrap_err()) {
        error::Error::Remote(e) => e,
        other => {
            panic!("unexpected error: {}", other);
        }
    };

    assert_eq!(e.file(), Some(file!()));
    assert_eq!(e.line(), Some(error_line));
}
1039}