1use std::ops::{Deref, DerefMut};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use std::{fmt, io};
5
6use log::*;
7use serde::de::DeserializeOwned;
8use serde::Serialize;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio::task::JoinHandle;
11
12use crate::common::{
13 Connection, FramedTransport, HeapSecretKey, InmemoryTransport, Interest, Reconnectable,
14 Transport, UntypedRequest, UntypedResponse,
15};
16
17mod builder;
18pub use builder::*;
19
20mod channel;
21pub use channel::*;
22
23mod config;
24pub use config::*;
25
26mod reconnect;
27pub use reconnect::*;
28
29mod shutdown;
30pub use shutdown::*;
31
/// How long the client event loop sleeps when both reading and writing are blocked, so the
/// task yields instead of spinning while there is nothing to do.
const SLEEP_DURATION: Duration = Duration::from_millis(1);
34
/// Client that exchanges untyped requests and responses through an [`UntypedChannel`] and is
/// backed by a spawned task that drives the underlying connection.
pub struct UntypedClient {
    /// Channel used to queue outgoing requests; exposed through `Deref`/`DerefMut`.
    channel: UntypedChannel,

    /// Watcher that reports changes of the connection state published by the background task.
    watcher: ConnectionWatcher,

    /// Handle used to ask the background task to shut down.
    shutdown: Box<dyn Shutdown>,

    /// When `true`, dropping the client aborts the background task.
    shutdown_on_drop: bool,

    /// Handle of the background task; `None` once it has been taken (conversion, wait or drop).
    task: Option<JoinHandle<io::Result<()>>>,
}
56
57impl fmt::Debug for UntypedClient {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.debug_struct("UntypedClient")
60 .field("channel", &self.channel)
61 .field("shutdown", &"...")
62 .field("task", &self.task)
63 .field("shutdown_on_drop", &self.shutdown_on_drop)
64 .finish()
65 }
66}
67
68impl Drop for UntypedClient {
69 fn drop(&mut self) {
70 if self.shutdown_on_drop {
71 if let Some(task) = self.task.take() {
73 debug!("Shutdown on drop = true, so aborting client task");
74 task.abort();
75 }
76 }
77 }
78}
79
80impl UntypedClient {
81 pub fn into_typed_client<T, U>(mut self) -> Client<T, U> {
83 Client {
84 channel: self.clone_channel().into_typed_channel(),
85 watcher: self.watcher.clone(),
86 shutdown: self.shutdown.clone(),
87 shutdown_on_drop: self.shutdown_on_drop,
88 task: self.task.take(),
89 }
90 }
91
92 pub fn into_channel(self) -> UntypedChannel {
94 self.clone_channel()
95 }
96
97 pub fn clone_channel(&self) -> UntypedChannel {
99 self.channel.clone()
100 }
101
102 pub async fn wait(mut self) -> io::Result<()> {
106 match self.task.take().unwrap().await {
107 Ok(x) => x,
108 Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
109 }
110 }
111
112 pub fn abort(&self) {
114 if let Some(task) = self.task.as_ref() {
115 task.abort();
116 }
117 }
118
119 pub fn clone_shutdown(&self) -> Box<dyn Shutdown> {
122 self.shutdown.clone()
123 }
124
125 pub async fn shutdown(&self) -> io::Result<()> {
127 self.shutdown.shutdown().await
128 }
129
130 pub fn will_shutdown_on_drop(&mut self) -> bool {
133 self.shutdown_on_drop
134 }
135
136 pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) {
139 self.shutdown_on_drop = shutdown_on_drop;
140 }
141
142 pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
144 self.watcher.clone()
145 }
146
147 pub fn on_connection_change<F>(&self, f: F) -> JoinHandle<()>
150 where
151 F: FnMut(ConnectionState) + Send + 'static,
152 {
153 self.watcher.on_change(f)
154 }
155
156 pub fn is_finished(&self) -> bool {
158 self.task.is_none() || self.task.as_ref().unwrap().is_finished()
159 }
160
161 pub fn spawn_inmemory(
170 transport: FramedTransport<InmemoryTransport>,
171 config: ClientConfig,
172 ) -> Self {
173 let connection = Connection::Client {
174 id: rand::random(),
175 reauth_otp: HeapSecretKey::generate(32).unwrap(),
176 transport,
177 };
178 Self::spawn(connection, config)
179 }
180
    /// Spawns a client that drives `connection` from a background task configured by `config`.
    ///
    /// The task loops until it receives a shutdown signal (finishing with `Ok(())`) or until
    /// the reconnect strategy fails (finishing with that error). On every iteration it:
    ///
    /// 1. reconnects if the previous iteration flagged a problem,
    /// 2. checks whether the server has been silent for longer than `silence_duration`,
    /// 3. waits for a shutdown signal, the silence timeout, or connection readiness,
    /// 4. reads at most one frame and hands any response to the post office,
    /// 5. writes at most one queued request, or flushes pending outgoing data.
    ///
    /// Connection state changes are published through the returned client's watcher.
    pub(crate) fn spawn<V>(mut connection: Connection<V>, config: ClientConfig) -> Self
    where
        V: Transport + 'static,
    {
        // The task owns the post office; the channel only gets a weak reference to it.
        let post_office = Arc::new(PostOffice::default());
        let weak_post_office = Arc::downgrade(&post_office);
        let (tx, mut rx) = mpsc::channel::<UntypedRequest<'static>>(1);
        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<oneshot::Sender<io::Result<()>>>(1);

        // NOTE(review): presumably discards data left over from before the client was spawned;
        // confirm against `Connection::clear`.
        connection.clear();

        let ClientConfig {
            mut reconnect_strategy,
            shutdown_on_drop,
            silence_duration,
        } = config;

        let shutdown_tx_2 = shutdown_tx.clone();
        // The watcher starts out in the connected state.
        let (watcher_tx, watcher_rx) = watch::channel(ConnectionState::Connected);
        let task = tokio::spawn(async move {
            let mut needs_reconnect = false;
            let mut last_read_frame_time = Instant::now();

            // Keep a sender alive inside the task so the shutdown channel cannot close while
            // the loop is running, even if every external shutdown handle is dropped.
            let _shutdown_tx = shutdown_tx_2;

            loop {
                // A previous iteration flagged a problem, so reconnect before doing anything.
                if needs_reconnect {
                    info!("Client encountered issue, attempting to reconnect");
                    debug!("Using strategy {reconnect_strategy:?}");
                    match reconnect_strategy.reconnect(&mut connection).await {
                        Ok(()) => {
                            info!("Client successfully reconnected!");
                            needs_reconnect = false;
                            // Restart the silence timer for the new connection.
                            last_read_frame_time = Instant::now();
                            watcher_tx.send_replace(ConnectionState::Connected);
                        }
                        Err(x) => {
                            // The strategy gave up; the task ends with its error.
                            error!("Unable to re-establish connection: {x}");
                            watcher_tx.send_replace(ConnectionState::Disconnected);
                            break Err(x);
                        }
                    }
                }

                // Shared handling for "server has been silent too long": flag a reconnect,
                // publish the state and restart the loop.
                macro_rules! silence_needs_reconnect {
                    () => {{
                        info!(
                            "Client exceeded {}s without server activity, so attempting to reconnect",
                            silence_duration.as_secs_f32(),
                        );
                        needs_reconnect = true;
                        watcher_tx.send_replace(ConnectionState::Reconnecting);
                        continue;
                    }};
                }

                // Time left before the silence threshold is hit; zero once it has elapsed.
                let silence_time_remaining = silence_duration
                    .checked_sub(last_read_frame_time.elapsed())
                    .unwrap_or_default();

                if silence_time_remaining.as_millis() == 0 {
                    silence_needs_reconnect!();
                }

                // Wait for whichever happens first: shutdown request, silence timeout, or the
                // connection reporting readiness.
                let ready = tokio::select! {
                    cb = shutdown_rx.recv() => {
                        info!("Client got shutdown signal, so exiting event loop");
                        let cb = cb.expect("Impossible: shutdown channel closed!");
                        // The requester may have stopped listening; that is not an error here.
                        let _ = cb.send(Ok(()));
                        watcher_tx.send_replace(ConnectionState::Disconnected);
                        break Ok(());
                    }
                    _ = tokio::time::sleep(silence_time_remaining) => {
                        silence_needs_reconnect!();
                    }
                    result = connection.ready(Interest::READABLE | Interest::WRITABLE) => {
                        match result {
                            Ok(result) => result,
                            Err(x) => {
                                error!("Failed to examine ready state: {x}");
                                needs_reconnect = true;
                                watcher_tx.send_replace(ConnectionState::Reconnecting);
                                continue;
                            }
                        }
                    }
                };

                // Track whether either direction made no progress, to decide on sleeping below.
                let mut read_blocked = !ready.is_readable();
                let mut write_blocked = !ready.is_writable();

                if ready.is_readable() {
                    match connection.try_read_frame() {
                        // An empty frame is treated as a heartbeat: it only resets the timer.
                        Ok(Some(frame)) if frame.is_empty() => {
                            trace!("Client received heartbeat");
                            last_read_frame_time = Instant::now();
                        }

                        // A non-empty frame should contain a response for the post office.
                        Ok(Some(frame)) => {
                            last_read_frame_time = Instant::now();
                            match UntypedResponse::from_slice(frame.as_item()) {
                                Ok(response) => {
                                    if log_enabled!(Level::Trace) {
                                        trace!(
                                            "Client receiving (id:{} | origin: {}): {}",
                                            response.id,
                                            response.origin_id,
                                            String::from_utf8_lossy(&response.payload).to_string()
                                        );
                                    }

                                    // Ids are only copied when tracing is on, because the
                                    // response is moved into the post office below.
                                    let (id, origin_id) = if log_enabled!(Level::Trace) {
                                        (response.id.to_string(), response.origin_id.to_string())
                                    } else {
                                        (String::new(), String::new())
                                    };

                                    if post_office
                                        .deliver_untyped_response(response.into_owned())
                                        .await
                                    {
                                        trace!("Client delivered response {id} to {origin_id}");
                                    } else {
                                        trace!("Client dropped response {id} to {origin_id}");
                                    }
                                }
                                Err(x) => {
                                    // A malformed response is logged and skipped.
                                    error!("Invalid response: {x}");
                                }
                            }
                        }

                        // No frame and no error means the connection was closed.
                        Ok(None) => {
                            info!("Connection closed");
                            needs_reconnect = true;
                            watcher_tx.send_replace(ConnectionState::Reconnecting);
                            continue;
                        }
                        Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
                        Err(x) => {
                            error!("Failed to read next frame: {x}");
                            needs_reconnect = true;
                            watcher_tx.send_replace(ConnectionState::Reconnecting);
                            continue;
                        }
                    }
                }

                if ready.is_writable() {
                    // Send a newly queued request if there is one; otherwise flush whatever
                    // outgoing data is still pending.
                    if let Ok(request) = rx.try_recv() {
                        if log_enabled!(Level::Trace) {
                            trace!(
                                "Client sending {}",
                                String::from_utf8_lossy(&request.to_bytes()).to_string()
                            );
                        }
                        match connection.try_write_frame(request.to_bytes()) {
                            Ok(()) => (),
                            Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
                            Err(x) => {
                                error!("Failed to write frame: {x}");
                                needs_reconnect = true;
                                watcher_tx.send_replace(ConnectionState::Reconnecting);
                                continue;
                            }
                        }
                    } else {
                        match connection.try_flush() {
                            // Nothing was flushed, so writing made no progress this round.
                            Ok(0) => write_blocked = true,
                            Ok(_) => (),
                            Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
                            Err(x) => {
                                error!("Failed to flush outgoing data: {x}");
                                needs_reconnect = true;
                                watcher_tx.send_replace(ConnectionState::Reconnecting);
                                continue;
                            }
                        }
                    }
                }

                // Neither direction made progress: sleep briefly instead of spinning.
                if read_blocked && write_blocked {
                    tokio::time::sleep(SLEEP_DURATION).await;
                }
            }
        });

        let channel = UntypedChannel {
            tx,
            post_office: weak_post_office,
        };

        Self {
            channel,
            watcher: ConnectionWatcher(watcher_rx),
            shutdown: Box::new(shutdown_tx),
            shutdown_on_drop,
            task: Some(task),
        }
    }
413}
414
415impl Deref for UntypedClient {
416 type Target = UntypedChannel;
417
418 fn deref(&self) -> &Self::Target {
419 &self.channel
420 }
421}
422
423impl DerefMut for UntypedClient {
424 fn deref_mut(&mut self) -> &mut Self::Target {
425 &mut self.channel
426 }
427}
428
429impl From<UntypedClient> for UntypedChannel {
430 fn from(client: UntypedClient) -> Self {
431 client.into_channel()
432 }
433}
434
/// Client that sends requests of type `T` and receives responses of type `U` through a typed
/// [`Channel`], backed by the same kind of background task as [`UntypedClient`].
pub struct Client<T, U> {
    /// Typed channel used to send requests; exposed through `Deref`/`DerefMut`.
    channel: Channel<T, U>,

    /// Watcher that reports changes of the connection state.
    watcher: ConnectionWatcher,

    /// Handle used to ask the background task to shut down.
    shutdown: Box<dyn Shutdown>,

    /// When `true`, dropping the client aborts the background task.
    shutdown_on_drop: bool,

    /// Handle of the background task; `None` once it has been taken (conversion, wait or drop).
    task: Option<JoinHandle<io::Result<()>>>,
}
452
453impl<T, U> fmt::Debug for Client<T, U> {
454 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
455 f.debug_struct("Client")
456 .field("channel", &self.channel)
457 .field("shutdown", &"...")
458 .field("task", &self.task)
459 .field("shutdown_on_drop", &self.shutdown_on_drop)
460 .finish()
461 }
462}
463
464impl<T, U> Drop for Client<T, U> {
465 fn drop(&mut self) {
466 if self.shutdown_on_drop {
467 if let Some(task) = self.task.take() {
469 debug!("Shutdown on drop = true, so aborting client task");
470 task.abort();
471 }
472 }
473 }
474}
475
476impl<T, U> Client<T, U>
477where
478 T: Send + Sync + Serialize + 'static,
479 U: Send + Sync + DeserializeOwned + 'static,
480{
481 pub fn into_untyped_client(mut self) -> UntypedClient {
483 UntypedClient {
484 channel: self.clone_channel().into_untyped_channel(),
485 watcher: self.watcher.clone(),
486 shutdown: self.shutdown.clone(),
487 shutdown_on_drop: self.shutdown_on_drop,
488 task: self.task.take(),
489 }
490 }
491
492 pub fn spawn_inmemory(
501 transport: FramedTransport<InmemoryTransport>,
502 config: ClientConfig,
503 ) -> Self {
504 UntypedClient::spawn_inmemory(transport, config).into_typed_client()
505 }
506}
507
508impl Client<(), ()> {
509 pub fn build() -> ClientBuilder<(), ()> {
511 ClientBuilder::new()
512 }
513
514 pub fn tcp<T>(connector: impl Into<TcpConnector<T>>) -> ClientBuilder<(), TcpConnector<T>> {
516 ClientBuilder::new().connector(connector.into())
517 }
518
519 #[cfg(unix)]
521 pub fn unix_socket(
522 connector: impl Into<UnixSocketConnector>,
523 ) -> ClientBuilder<(), UnixSocketConnector> {
524 ClientBuilder::new().connector(connector.into())
525 }
526
527 #[cfg(windows)]
529 pub fn local_windows_pipe(
530 connector: impl Into<WindowsPipeConnector>,
531 ) -> ClientBuilder<(), WindowsPipeConnector> {
532 let mut connector = connector.into();
533 connector.local = true;
534 ClientBuilder::new().connector(connector)
535 }
536
537 #[cfg(windows)]
539 pub fn windows_pipe(
540 connector: impl Into<WindowsPipeConnector>,
541 ) -> ClientBuilder<(), WindowsPipeConnector> {
542 ClientBuilder::new().connector(connector.into())
543 }
544}
545
546impl<T, U> Client<T, U> {
547 pub fn into_channel(self) -> Channel<T, U> {
549 self.clone_channel()
550 }
551
552 pub fn clone_channel(&self) -> Channel<T, U> {
554 self.channel.clone()
555 }
556
557 pub async fn wait(mut self) -> io::Result<()> {
561 match self.task.take().unwrap().await {
562 Ok(x) => x,
563 Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
564 }
565 }
566
567 pub fn abort(&self) {
569 if let Some(task) = self.task.as_ref() {
570 task.abort();
571 }
572 }
573
574 pub fn clone_shutdown(&self) -> Box<dyn Shutdown> {
577 self.shutdown.clone()
578 }
579
580 pub async fn shutdown(&self) -> io::Result<()> {
582 self.shutdown.shutdown().await
583 }
584
585 pub fn will_shutdown_on_drop(&mut self) -> bool {
588 self.shutdown_on_drop
589 }
590
591 pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) {
594 self.shutdown_on_drop = shutdown_on_drop;
595 }
596
597 pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
599 self.watcher.clone()
600 }
601
602 pub fn on_connection_change<F>(&self, f: F) -> JoinHandle<()>
605 where
606 F: FnMut(ConnectionState) + Send + 'static,
607 {
608 self.watcher.on_change(f)
609 }
610
611 pub fn is_finished(&self) -> bool {
613 self.task.is_none() || self.task.as_ref().unwrap().is_finished()
614 }
615}
616
617impl<T, U> Deref for Client<T, U> {
618 type Target = Channel<T, U>;
619
620 fn deref(&self) -> &Self::Target {
621 &self.channel
622 }
623}
624
625impl<T, U> DerefMut for Client<T, U> {
626 fn deref_mut(&mut self) -> &mut Self::Target {
627 &mut self.channel
628 }
629}
630
631impl<T, U> From<Client<T, U>> for Channel<T, U> {
632 fn from(client: Client<T, U>) -> Self {
633 client.clone_channel()
634 }
635}
636
637#[cfg(test)]
638mod tests {
639 use super::*;
640 use crate::client::ClientConfig;
641 use crate::common::{Ready, Request, Response, TestTransport};
642
643 mod typed {
644 use test_log::test;
645
646 use super::*;
647 type TestClient = Client<u8, u8>;
648
649 fn spawn_test_client<T>(
650 connection: Connection<T>,
651 reconnect_strategy: ReconnectStrategy,
652 ) -> TestClient
653 where
654 T: Transport + 'static,
655 {
656 UntypedClient::spawn(
657 connection,
658 ClientConfig {
659 reconnect_strategy,
660 ..Default::default()
661 },
662 )
663 .into_typed_client()
664 }
665
666 #[inline]
668 fn new_test_transport() -> TestTransport {
669 TestTransport {
670 f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
671 f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
672 f_ready: Box::new(|_| Ok(Ready::EMPTY)),
673 f_reconnect: Box::new(|| Ok(())),
674 }
675 }
676
677 #[test(tokio::test)]
678 async fn should_write_queued_requests_as_outgoing_frames() {
679 let (client, mut server) = Connection::pair(100);
680
681 let mut client = spawn_test_client(client, ReconnectStrategy::Fail);
682 client.fire(Request::new(1u8)).await.unwrap();
683 client.fire(Request::new(2u8)).await.unwrap();
684 client.fire(Request::new(3u8)).await.unwrap();
685
686 assert_eq!(
687 server
688 .read_frame_as::<Request<u8>>()
689 .await
690 .unwrap()
691 .unwrap()
692 .payload,
693 1
694 );
695 assert_eq!(
696 server
697 .read_frame_as::<Request<u8>>()
698 .await
699 .unwrap()
700 .unwrap()
701 .payload,
702 2
703 );
704 assert_eq!(
705 server
706 .read_frame_as::<Request<u8>>()
707 .await
708 .unwrap()
709 .unwrap()
710 .payload,
711 3
712 );
713 }
714
715 #[test(tokio::test)]
716 async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() {
717 let (client, mut server) = Connection::pair(100);
718
719 tokio::spawn(async move {
721 let request = server
722 .read_frame_as::<Request<u8>>()
723 .await
724 .unwrap()
725 .unwrap();
726 server
727 .write_frame_for(&Response::new(request.id, 2u8))
728 .await
729 .unwrap();
730 });
731
732 let mut client = spawn_test_client(client, ReconnectStrategy::Fail);
733 assert_eq!(client.send(Request::new(1u8)).await.unwrap().payload, 2);
734 }
735
736 #[test(tokio::test)]
737 async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() {
738 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
739 spawn_test_client(
740 Connection::test_client({
741 let mut transport = new_test_transport();
742
743 transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into()));
744
745 transport.f_reconnect = Box::new(move || {
747 reconnect_tx.try_send(()).expect("reconnect tx blocked");
748 Ok(())
749 });
750
751 transport
752 }),
753 ReconnectStrategy::FixedInterval {
754 interval: Duration::from_millis(50),
755 max_retries: None,
756 timeout: None,
757 },
758 );
759
760 reconnect_rx.recv().await.expect("Reconnect did not occur");
761 }
762
763 #[test(tokio::test)]
764 async fn should_attempt_to_reconnect_if_connection_closed_by_server() {
765 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
766 spawn_test_client(
767 Connection::test_client({
768 let mut transport = new_test_transport();
769
770 transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
772
773 transport.f_try_read = Box::new(|_| Ok(0));
775
776 transport.f_reconnect = Box::new(move || {
778 reconnect_tx.try_send(()).expect("reconnect tx blocked");
779 Ok(())
780 });
781
782 transport
783 }),
784 ReconnectStrategy::FixedInterval {
785 interval: Duration::from_millis(50),
786 max_retries: None,
787 timeout: None,
788 },
789 );
790
791 reconnect_rx.recv().await.expect("Reconnect did not occur");
792 }
793
794 #[test(tokio::test)]
795 async fn should_attempt_to_reconnect_if_connection_errors_while_reading_data() {
796 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
797 spawn_test_client(
798 Connection::test_client({
799 let mut transport = new_test_transport();
800
801 transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
803
804 transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into()));
806
807 transport.f_reconnect = Box::new(move || {
809 reconnect_tx.try_send(()).expect("reconnect tx blocked");
810 Ok(())
811 });
812
813 transport
814 }),
815 ReconnectStrategy::FixedInterval {
816 interval: Duration::from_millis(50),
817 max_retries: None,
818 timeout: None,
819 },
820 );
821
822 reconnect_rx.recv().await.expect("Reconnect did not occur");
823 }
824
825 #[test(tokio::test)]
826 async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() {
827 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
828 let mut client = spawn_test_client(
829 Connection::test_client({
830 let mut transport = new_test_transport();
831
832 transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
834
835 transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into()));
837
838 transport.f_reconnect = Box::new(move || {
840 reconnect_tx.try_send(()).expect("reconnect tx blocked");
841 Ok(())
842 });
843
844 transport
845 }),
846 ReconnectStrategy::FixedInterval {
847 interval: Duration::from_millis(50),
848 max_retries: None,
849 timeout: None,
850 },
851 );
852
853 client
855 .fire(Request::new(123u8))
856 .await
857 .expect("Failed to queue request");
858
859 reconnect_rx.recv().await.expect("Reconnect did not occur");
860 }
861
862 #[test(tokio::test)]
863 async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() {
864 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
865 let mut client = spawn_test_client(
866 Connection::test_client({
867 let mut transport = new_test_transport();
868
869 transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
871
872 transport.f_try_write = Box::new(|buf| unsafe {
875 static mut CNT: u8 = 0;
876 CNT += 1;
877 if CNT == 1 {
878 Ok(buf.len() / 2)
879 } else if CNT == 2 {
880 Err(io::ErrorKind::WouldBlock.into())
881 } else {
882 Err(io::ErrorKind::Other.into())
883 }
884 });
885
886 transport.f_reconnect = Box::new(move || {
888 reconnect_tx.try_send(()).expect("reconnect tx blocked");
889 Ok(())
890 });
891
892 transport
893 }),
894 ReconnectStrategy::FixedInterval {
895 interval: Duration::from_millis(50),
896 max_retries: None,
897 timeout: None,
898 },
899 );
900
901 client
903 .fire(Request::new(123u8))
904 .await
905 .expect("Failed to queue request");
906
907 reconnect_rx.recv().await.expect("Reconnect did not occur");
908 }
909
910 #[test(tokio::test)]
911 async fn should_exit_if_reconnect_strategy_has_failed_to_connect() {
912 let (client, server) = Connection::pair(100);
913
914 let client = spawn_test_client(client, ReconnectStrategy::Fail);
917 assert!(!client.is_finished(), "Client unexpectedly died");
918 drop(server);
919 assert_eq!(
920 client.wait().await.unwrap_err().kind(),
921 io::ErrorKind::ConnectionAborted
922 );
923 }
924
925 #[test(tokio::test)]
926 async fn should_exit_if_shutdown_signal_detected() {
927 let (client, _server) = Connection::pair(100);
928
929 let client = spawn_test_client(client, ReconnectStrategy::Fail);
930 client.shutdown().await.unwrap();
931
932 client.wait().await.unwrap();
936 }
937
938 #[test(tokio::test)]
939 async fn should_not_exit_if_shutdown_channel_is_closed() {
940 let (client, mut server) = Connection::pair(100);
941
942 tokio::spawn(async move {
944 let request = server
945 .read_frame_as::<Request<u8>>()
946 .await
947 .unwrap()
948 .unwrap();
949 server
950 .write_frame_for(&Response::new(request.id, 2u8))
951 .await
952 .unwrap();
953 });
954
955 let mut channel = spawn_test_client(client, ReconnectStrategy::Fail).into_channel();
958 assert_eq!(channel.send(Request::new(1u8)).await.unwrap().payload, 2);
959 }
960 }
961
962 mod untyped {
963 use test_log::test;
964
965 use super::*;
966 type TestClient = UntypedClient;
967
968 #[inline]
970 fn new_test_transport() -> TestTransport {
971 TestTransport {
972 f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
973 f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
974 f_ready: Box::new(|_| Ok(Ready::EMPTY)),
975 f_reconnect: Box::new(|| Ok(())),
976 }
977 }
978
979 #[test(tokio::test)]
980 async fn should_write_queued_requests_as_outgoing_frames() {
981 let (client, mut server) = Connection::pair(100);
982
983 let mut client = TestClient::spawn(client, Default::default());
984 client
985 .fire(Request::new(1u8).to_untyped_request().unwrap())
986 .await
987 .unwrap();
988 client
989 .fire(Request::new(2u8).to_untyped_request().unwrap())
990 .await
991 .unwrap();
992 client
993 .fire(Request::new(3u8).to_untyped_request().unwrap())
994 .await
995 .unwrap();
996
997 assert_eq!(
998 server
999 .read_frame_as::<Request<u8>>()
1000 .await
1001 .unwrap()
1002 .unwrap()
1003 .payload,
1004 1
1005 );
1006 assert_eq!(
1007 server
1008 .read_frame_as::<Request<u8>>()
1009 .await
1010 .unwrap()
1011 .unwrap()
1012 .payload,
1013 2
1014 );
1015 assert_eq!(
1016 server
1017 .read_frame_as::<Request<u8>>()
1018 .await
1019 .unwrap()
1020 .unwrap()
1021 .payload,
1022 3
1023 );
1024 }
1025
1026 #[test(tokio::test)]
1027 async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() {
1028 let (client, mut server) = Connection::pair(100);
1029
1030 tokio::spawn(async move {
1032 let request = server
1033 .read_frame_as::<Request<u8>>()
1034 .await
1035 .unwrap()
1036 .unwrap();
1037 server
1038 .write_frame_for(&Response::new(request.id, 2u8))
1039 .await
1040 .unwrap();
1041 });
1042
1043 let mut client = TestClient::spawn(client, Default::default());
1044 assert_eq!(
1045 client
1046 .send(Request::new(1u8).to_untyped_request().unwrap())
1047 .await
1048 .unwrap()
1049 .to_typed_response::<u8>()
1050 .unwrap()
1051 .payload,
1052 2
1053 );
1054 }
1055
1056 #[test(tokio::test)]
1057 async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() {
1058 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1059 TestClient::spawn(
1060 Connection::test_client({
1061 let mut transport = new_test_transport();
1062
1063 transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into()));
1064
1065 transport.f_reconnect = Box::new(move || {
1067 reconnect_tx.try_send(()).expect("reconnect tx blocked");
1068 Ok(())
1069 });
1070
1071 transport
1072 }),
1073 ClientConfig {
1074 reconnect_strategy: ReconnectStrategy::FixedInterval {
1075 interval: Duration::from_millis(50),
1076 max_retries: None,
1077 timeout: None,
1078 },
1079 ..Default::default()
1080 },
1081 );
1082
1083 reconnect_rx.recv().await.expect("Reconnect did not occur");
1084 }
1085
1086 #[test(tokio::test)]
1087 async fn should_attempt_to_reconnect_if_connection_closed_by_server() {
1088 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1089 TestClient::spawn(
1090 Connection::test_client({
1091 let mut transport = new_test_transport();
1092
1093 transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
1095
1096 transport.f_try_read = Box::new(|_| Ok(0));
1098
1099 transport.f_reconnect = Box::new(move || {
1101 reconnect_tx.try_send(()).expect("reconnect tx blocked");
1102 Ok(())
1103 });
1104
1105 transport
1106 }),
1107 ClientConfig {
1108 reconnect_strategy: ReconnectStrategy::FixedInterval {
1109 interval: Duration::from_millis(50),
1110 max_retries: None,
1111 timeout: None,
1112 },
1113 ..Default::default()
1114 },
1115 );
1116
1117 reconnect_rx.recv().await.expect("Reconnect did not occur");
1118 }
1119
1120 #[test(tokio::test)]
1121 async fn should_attempt_to_reconnect_if_connection_errors_while_reading_data() {
1122 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1123 TestClient::spawn(
1124 Connection::test_client({
1125 let mut transport = new_test_transport();
1126
1127 transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
1129
1130 transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into()));
1132
1133 transport.f_reconnect = Box::new(move || {
1135 reconnect_tx.try_send(()).expect("reconnect tx blocked");
1136 Ok(())
1137 });
1138
1139 transport
1140 }),
1141 ClientConfig {
1142 reconnect_strategy: ReconnectStrategy::FixedInterval {
1143 interval: Duration::from_millis(50),
1144 max_retries: None,
1145 timeout: None,
1146 },
1147 ..Default::default()
1148 },
1149 );
1150
1151 reconnect_rx.recv().await.expect("Reconnect did not occur");
1152 }
1153
1154 #[test(tokio::test)]
1155 async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() {
1156 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1157 let mut client = TestClient::spawn(
1158 Connection::test_client({
1159 let mut transport = new_test_transport();
1160
1161 transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
1163
1164 transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into()));
1166
1167 transport.f_reconnect = Box::new(move || {
1169 reconnect_tx.try_send(()).expect("reconnect tx blocked");
1170 Ok(())
1171 });
1172
1173 transport
1174 }),
1175 ClientConfig {
1176 reconnect_strategy: ReconnectStrategy::FixedInterval {
1177 interval: Duration::from_millis(50),
1178 max_retries: None,
1179 timeout: None,
1180 },
1181 ..Default::default()
1182 },
1183 );
1184
1185 client
1187 .fire(Request::new(123u8).to_untyped_request().unwrap())
1188 .await
1189 .expect("Failed to queue request");
1190
1191 reconnect_rx.recv().await.expect("Reconnect did not occur");
1192 }
1193
1194 #[test(tokio::test)]
1195 async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() {
1196 let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1197 let mut client = TestClient::spawn(
1198 Connection::test_client({
1199 let mut transport = new_test_transport();
1200
1201 transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
1203
1204 transport.f_try_write = Box::new(|buf| unsafe {
1207 static mut CNT: u8 = 0;
1208 CNT += 1;
1209 if CNT == 1 {
1210 Ok(buf.len() / 2)
1211 } else if CNT == 2 {
1212 Err(io::ErrorKind::WouldBlock.into())
1213 } else {
1214 Err(io::ErrorKind::Other.into())
1215 }
1216 });
1217
1218 transport.f_reconnect = Box::new(move || {
1220 reconnect_tx.try_send(()).expect("reconnect tx blocked");
1221 Ok(())
1222 });
1223
1224 transport
1225 }),
1226 ClientConfig {
1227 reconnect_strategy: ReconnectStrategy::FixedInterval {
1228 interval: Duration::from_millis(50),
1229 max_retries: None,
1230 timeout: None,
1231 },
1232 ..Default::default()
1233 },
1234 );
1235
1236 client
1238 .fire(Request::new(123u8).to_untyped_request().unwrap())
1239 .await
1240 .expect("Failed to queue request");
1241
1242 reconnect_rx.recv().await.expect("Reconnect did not occur");
1243 }
1244
1245 #[test(tokio::test)]
1246 async fn should_exit_if_reconnect_strategy_has_failed_to_connect() {
1247 let (client, server) = Connection::pair(100);
1248
1249 let client = TestClient::spawn(client, Default::default());
1252 assert!(!client.is_finished(), "Client unexpectedly died");
1253 drop(server);
1254 assert_eq!(
1255 client.wait().await.unwrap_err().kind(),
1256 io::ErrorKind::ConnectionAborted
1257 );
1258 }
1259
1260 #[test(tokio::test)]
1261 async fn should_exit_if_shutdown_signal_detected() {
1262 let (client, _server) = Connection::pair(100);
1263
1264 let client = TestClient::spawn(client, Default::default());
1265 client.shutdown().await.unwrap();
1266
1267 client.wait().await.unwrap();
1271 }
1272
1273 #[test(tokio::test)]
1274 async fn should_not_exit_if_shutdown_channel_is_closed() {
1275 let (client, mut server) = Connection::pair(100);
1276
1277 tokio::spawn(async move {
1279 let request = server
1280 .read_frame_as::<Request<u8>>()
1281 .await
1282 .unwrap()
1283 .unwrap();
1284 server
1285 .write_frame_for(&Response::new(request.id, 2u8))
1286 .await
1287 .unwrap();
1288 });
1289
1290 let mut channel = TestClient::spawn(client, Default::default()).into_channel();
1293 assert_eq!(
1294 channel
1295 .send(Request::new(1u8).to_untyped_request().unwrap())
1296 .await
1297 .unwrap()
1298 .to_typed_response::<u8>()
1299 .unwrap()
1300 .payload,
1301 2
1302 );
1303 }
1304
1305 #[test(tokio::test)]
1306 async fn should_attempt_to_reconnect_if_no_activity_from_server_within_silence_duration() {
1307 let (client, _) = Connection::pair(100);
1308
1309 let client = TestClient::spawn(
1312 client,
1313 ClientConfig {
1314 silence_duration: Duration::from_millis(100),
1315 reconnect_strategy: ReconnectStrategy::FixedInterval {
1316 interval: Duration::from_millis(50),
1317 max_retries: Some(3),
1318 timeout: None,
1319 },
1320 ..Default::default()
1321 },
1322 );
1323
1324 let (tx, mut rx) = mpsc::unbounded_channel();
1325 client.on_connection_change(move |state| tx.send(state).unwrap());
1326 assert_eq!(rx.recv().await, Some(ConnectionState::Reconnecting));
1327 assert_eq!(rx.recv().await, Some(ConnectionState::Disconnected));
1328 assert_eq!(rx.recv().await, None);
1329 }
1330 }
1331}