1use std::ops::{Deref, DerefMut};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use std::{fmt, io};
5
6use log::*;
7use serde::de::DeserializeOwned;
8use serde::Serialize;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio::task::JoinHandle;
11
12use crate::common::{
13    Connection, FramedTransport, HeapSecretKey, InmemoryTransport, Interest, Reconnectable,
14    Transport, UntypedRequest, UntypedResponse,
15};
16
17mod builder;
18pub use builder::*;
19
20mod channel;
21pub use channel::*;
22
23mod config;
24pub use config::*;
25
26mod reconnect;
27pub use reconnect::*;
28
29mod shutdown;
30pub use shutdown::*;
31
/// How long the client's event loop sleeps at the end of a pass in which the connection was
/// both read-blocked and write-blocked.
32const SLEEP_DURATION: Duration = Duration::from_millis(1);
34
/// Client handle over an untyped channel, bundled with the handles needed to observe, shut
/// down, or abort the background task that drives its connection.
35pub struct UntypedClient {
    /// Channel used to queue requests for the connection task.
41    channel: UntypedChannel,
43
    /// Receives the connection state published by the connection task.
44    watcher: ConnectionWatcher,
46
    /// Handle used to ask the connection task to shut down.
47    shutdown: Box<dyn Shutdown>,
49
    /// When true, dropping the client aborts the connection task.
50    shutdown_on_drop: bool,
52
    /// Join handle of the connection task; `None` once the handle has been taken.
53    task: Option<JoinHandle<io::Result<()>>>,
55}
56
57impl fmt::Debug for UntypedClient {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_struct("UntypedClient")
60            .field("channel", &self.channel)
61            .field("shutdown", &"...")
62            .field("task", &self.task)
63            .field("shutdown_on_drop", &self.shutdown_on_drop)
64            .finish()
65    }
66}
67
68impl Drop for UntypedClient {
69    fn drop(&mut self) {
70        if self.shutdown_on_drop {
71            if let Some(task) = self.task.take() {
73                debug!("Shutdown on drop = true, so aborting client task");
74                task.abort();
75            }
76        }
77    }
78}
79
80impl UntypedClient {
81    pub fn into_typed_client<T, U>(mut self) -> Client<T, U> {
83        Client {
84            channel: self.clone_channel().into_typed_channel(),
85            watcher: self.watcher.clone(),
86            shutdown: self.shutdown.clone(),
87            shutdown_on_drop: self.shutdown_on_drop,
88            task: self.task.take(),
89        }
90    }
91
92    pub fn into_channel(self) -> UntypedChannel {
94        self.clone_channel()
95    }
96
97    pub fn clone_channel(&self) -> UntypedChannel {
99        self.channel.clone()
100    }
101
102    pub async fn wait(mut self) -> io::Result<()> {
106        match self.task.take().unwrap().await {
107            Ok(x) => x,
108            Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
109        }
110    }
111
112    pub fn abort(&self) {
114        if let Some(task) = self.task.as_ref() {
115            task.abort();
116        }
117    }
118
119    pub fn clone_shutdown(&self) -> Box<dyn Shutdown> {
122        self.shutdown.clone()
123    }
124
125    pub async fn shutdown(&self) -> io::Result<()> {
127        self.shutdown.shutdown().await
128    }
129
130    pub fn will_shutdown_on_drop(&mut self) -> bool {
133        self.shutdown_on_drop
134    }
135
    /// Sets whether dropping this client aborts its connection task.
136    pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) {
139        self.shutdown_on_drop = shutdown_on_drop;
140    }
141
142    pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
144        self.watcher.clone()
145    }
146
147    pub fn on_connection_change<F>(&self, f: F) -> JoinHandle<()>
150    where
151        F: FnMut(ConnectionState) + Send + 'static,
152    {
153        self.watcher.on_change(f)
154    }
155
156    pub fn is_finished(&self) -> bool {
158        self.task.is_none() || self.task.as_ref().unwrap().is_finished()
159    }
160
161    pub fn spawn_inmemory(
170        transport: FramedTransport<InmemoryTransport>,
171        config: ClientConfig,
172    ) -> Self {
173        let connection = Connection::Client {
174            id: rand::random(),
175            reauth_otp: HeapSecretKey::generate(32).unwrap(),
176            transport,
177        };
178        Self::spawn(connection, config)
179    }
180
181    pub(crate) fn spawn<V>(mut connection: Connection<V>, config: ClientConfig) -> Self
183    where
184        V: Transport + 'static,
185    {
186        let post_office = Arc::new(PostOffice::default());
187        let weak_post_office = Arc::downgrade(&post_office);
188        let (tx, mut rx) = mpsc::channel::<UntypedRequest<'static>>(1);
189        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<oneshot::Sender<io::Result<()>>>(1);
190
191        connection.clear();
193
194        let ClientConfig {
195            mut reconnect_strategy,
196            shutdown_on_drop,
197            silence_duration,
198        } = config;
199
200        let shutdown_tx_2 = shutdown_tx.clone();
203        let (watcher_tx, watcher_rx) = watch::channel(ConnectionState::Connected);
204        let task = tokio::spawn(async move {
205            let mut needs_reconnect = false;
206            let mut last_read_frame_time = Instant::now();
207
208            let _shutdown_tx = shutdown_tx_2;
214
215            loop {
216                if needs_reconnect {
218                    info!("Client encountered issue, attempting to reconnect");
219                    debug!("Using strategy {reconnect_strategy:?}");
220                    match reconnect_strategy.reconnect(&mut connection).await {
221                        Ok(()) => {
222                            info!("Client successfully reconnected!");
223                            needs_reconnect = false;
224                            last_read_frame_time = Instant::now();
225                            watcher_tx.send_replace(ConnectionState::Connected);
226                        }
227                        Err(x) => {
228                            error!("Unable to re-establish connection: {x}");
229                            watcher_tx.send_replace(ConnectionState::Disconnected);
230                            break Err(x);
231                        }
232                    }
233                }
234
235                macro_rules! silence_needs_reconnect {
236                    () => {{
237                        info!(
238                            "Client exceeded {}s without server activity, so attempting to reconnect",
239                            silence_duration.as_secs_f32(),
240                        );
241                        needs_reconnect = true;
242                        watcher_tx.send_replace(ConnectionState::Reconnecting);
243                        continue;
244                    }};
245                }
246
247                let silence_time_remaining = silence_duration
248                    .checked_sub(last_read_frame_time.elapsed())
249                    .unwrap_or_default();
250
251                if silence_time_remaining.as_millis() == 0 {
255                    silence_needs_reconnect!();
256                }
257
258                let ready = tokio::select! {
259                    cb = shutdown_rx.recv() => {
261                        info!("Client got shutdown signal, so exiting event loop");
262                        let cb = cb.expect("Impossible: shutdown channel closed!");
263                        let _ = cb.send(Ok(()));
264                        watcher_tx.send_replace(ConnectionState::Disconnected);
265                        break Ok(());
266                    }
267                    _ = tokio::time::sleep(silence_time_remaining) => {
268                        silence_needs_reconnect!();
269                    }
270                    result = connection.ready(Interest::READABLE | Interest::WRITABLE) => {
271                        match result {
272                            Ok(result) => result,
273                            Err(x) => {
274                                error!("Failed to examine ready state: {x}");
275                                needs_reconnect = true;
276                                watcher_tx.send_replace(ConnectionState::Reconnecting);
277                                continue;
278                            }
279                        }
280                    }
281                };
282
283                let mut read_blocked = !ready.is_readable();
284                let mut write_blocked = !ready.is_writable();
285
286                if ready.is_readable() {
287                    match connection.try_read_frame() {
288                        Ok(Some(frame)) if frame.is_empty() => {
291                            trace!("Client received heartbeat");
292                            last_read_frame_time = Instant::now();
293                        }
294
295                        Ok(Some(frame)) => {
297                            last_read_frame_time = Instant::now();
298                            match UntypedResponse::from_slice(frame.as_item()) {
299                                Ok(response) => {
300                                    if log_enabled!(Level::Trace) {
301                                        trace!(
302                                            "Client receiving (id:{} | origin: {}): {}",
303                                            response.id,
304                                            response.origin_id,
305                                            String::from_utf8_lossy(&response.payload).to_string()
306                                        );
307                                    }
308
309                                    let (id, origin_id) = if log_enabled!(Level::Trace) {
313                                        (response.id.to_string(), response.origin_id.to_string())
314                                    } else {
315                                        (String::new(), String::new())
316                                    };
317
318                                    if post_office
321                                        .deliver_untyped_response(response.into_owned())
322                                        .await
323                                    {
324                                        trace!("Client delivered response {id} to {origin_id}");
325                                    } else {
326                                        trace!("Client dropped response {id} to {origin_id}");
327                                    }
328                                }
329                                Err(x) => {
330                                    error!("Invalid response: {x}");
331                                }
332                            }
333                        }
334
335                        Ok(None) => {
336                            info!("Connection closed");
337                            needs_reconnect = true;
338                            watcher_tx.send_replace(ConnectionState::Reconnecting);
339                            continue;
340                        }
341                        Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
342                        Err(x) => {
343                            error!("Failed to read next frame: {x}");
344                            needs_reconnect = true;
345                            watcher_tx.send_replace(ConnectionState::Reconnecting);
346                            continue;
347                        }
348                    }
349                }
350
351                if ready.is_writable() {
352                    if let Ok(request) = rx.try_recv() {
356                        if log_enabled!(Level::Trace) {
357                            trace!(
358                                "Client sending {}",
359                                String::from_utf8_lossy(&request.to_bytes()).to_string()
360                            );
361                        }
362                        match connection.try_write_frame(request.to_bytes()) {
363                            Ok(()) => (),
364                            Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
365                            Err(x) => {
366                                error!("Failed to write frame: {x}");
367                                needs_reconnect = true;
368                                watcher_tx.send_replace(ConnectionState::Reconnecting);
369                                continue;
370                            }
371                        }
372                    } else {
373                        match connection.try_flush() {
380                            Ok(0) => write_blocked = true,
381                            Ok(_) => (),
382                            Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
383                            Err(x) => {
384                                error!("Failed to flush outgoing data: {x}");
385                                needs_reconnect = true;
386                                watcher_tx.send_replace(ConnectionState::Reconnecting);
387                                continue;
388                            }
389                        }
390                    }
391                }
392
393                if read_blocked && write_blocked {
395                    tokio::time::sleep(SLEEP_DURATION).await;
396                }
397            }
398        });
399
400        let channel = UntypedChannel {
401            tx,
402            post_office: weak_post_office,
403        };
404
405        Self {
406            channel,
407            watcher: ConnectionWatcher(watcher_rx),
408            shutdown: Box::new(shutdown_tx),
409            shutdown_on_drop,
410            task: Some(task),
411        }
412    }
413}
414
415impl Deref for UntypedClient {
416    type Target = UntypedChannel;
417
418    fn deref(&self) -> &Self::Target {
419        &self.channel
420    }
421}
422
423impl DerefMut for UntypedClient {
424    fn deref_mut(&mut self) -> &mut Self::Target {
425        &mut self.channel
426    }
427}
428
429impl From<UntypedClient> for UntypedChannel {
430    fn from(client: UntypedClient) -> Self {
431        client.into_channel()
432    }
433}
434
/// Client handle over a typed channel, bundled with the handles needed to observe, shut down,
/// or abort the background task that drives its connection.
435pub struct Client<T, U> {
    /// Typed channel used to queue requests for the connection task.
437    channel: Channel<T, U>,
439
    /// Receives the connection state published by the connection task.
440    watcher: ConnectionWatcher,
442
    /// Handle used to ask the connection task to shut down.
443    shutdown: Box<dyn Shutdown>,
445
    /// When true, dropping the client aborts the connection task.
446    shutdown_on_drop: bool,
448
    /// Join handle of the connection task; `None` once the handle has been taken.
449    task: Option<JoinHandle<io::Result<()>>>,
451}
452
453impl<T, U> fmt::Debug for Client<T, U> {
454    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
455        f.debug_struct("Client")
456            .field("channel", &self.channel)
457            .field("shutdown", &"...")
458            .field("task", &self.task)
459            .field("shutdown_on_drop", &self.shutdown_on_drop)
460            .finish()
461    }
462}
463
464impl<T, U> Drop for Client<T, U> {
465    fn drop(&mut self) {
466        if self.shutdown_on_drop {
467            if let Some(task) = self.task.take() {
469                debug!("Shutdown on drop = true, so aborting client task");
470                task.abort();
471            }
472        }
473    }
474}
475
476impl<T, U> Client<T, U>
477where
478    T: Send + Sync + Serialize + 'static,
479    U: Send + Sync + DeserializeOwned + 'static,
480{
481    pub fn into_untyped_client(mut self) -> UntypedClient {
483        UntypedClient {
484            channel: self.clone_channel().into_untyped_channel(),
485            watcher: self.watcher.clone(),
486            shutdown: self.shutdown.clone(),
487            shutdown_on_drop: self.shutdown_on_drop,
488            task: self.task.take(),
489        }
490    }
491
492    pub fn spawn_inmemory(
501        transport: FramedTransport<InmemoryTransport>,
502        config: ClientConfig,
503    ) -> Self {
504        UntypedClient::spawn_inmemory(transport, config).into_typed_client()
505    }
506}
507
508impl Client<(), ()> {
509    pub fn build() -> ClientBuilder<(), ()> {
511        ClientBuilder::new()
512    }
513
514    pub fn tcp<T>(connector: impl Into<TcpConnector<T>>) -> ClientBuilder<(), TcpConnector<T>> {
516        ClientBuilder::new().connector(connector.into())
517    }
518
519    #[cfg(unix)]
521    pub fn unix_socket(
522        connector: impl Into<UnixSocketConnector>,
523    ) -> ClientBuilder<(), UnixSocketConnector> {
524        ClientBuilder::new().connector(connector.into())
525    }
526
527    #[cfg(windows)]
529    pub fn local_windows_pipe(
530        connector: impl Into<WindowsPipeConnector>,
531    ) -> ClientBuilder<(), WindowsPipeConnector> {
532        let mut connector = connector.into();
533        connector.local = true;
534        ClientBuilder::new().connector(connector)
535    }
536
537    #[cfg(windows)]
539    pub fn windows_pipe(
540        connector: impl Into<WindowsPipeConnector>,
541    ) -> ClientBuilder<(), WindowsPipeConnector> {
542        ClientBuilder::new().connector(connector.into())
543    }
544}
545
546impl<T, U> Client<T, U> {
547    pub fn into_channel(self) -> Channel<T, U> {
549        self.clone_channel()
550    }
551
552    pub fn clone_channel(&self) -> Channel<T, U> {
554        self.channel.clone()
555    }
556
557    pub async fn wait(mut self) -> io::Result<()> {
561        match self.task.take().unwrap().await {
562            Ok(x) => x,
563            Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
564        }
565    }
566
567    pub fn abort(&self) {
569        if let Some(task) = self.task.as_ref() {
570            task.abort();
571        }
572    }
573
574    pub fn clone_shutdown(&self) -> Box<dyn Shutdown> {
577        self.shutdown.clone()
578    }
579
580    pub async fn shutdown(&self) -> io::Result<()> {
582        self.shutdown.shutdown().await
583    }
584
585    pub fn will_shutdown_on_drop(&mut self) -> bool {
588        self.shutdown_on_drop
589    }
590
591    pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) {
594        self.shutdown_on_drop = shutdown_on_drop;
595    }
596
597    pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
599        self.watcher.clone()
600    }
601
602    pub fn on_connection_change<F>(&self, f: F) -> JoinHandle<()>
605    where
606        F: FnMut(ConnectionState) + Send + 'static,
607    {
608        self.watcher.on_change(f)
609    }
610
611    pub fn is_finished(&self) -> bool {
613        self.task.is_none() || self.task.as_ref().unwrap().is_finished()
614    }
615}
616
617impl<T, U> Deref for Client<T, U> {
618    type Target = Channel<T, U>;
619
620    fn deref(&self) -> &Self::Target {
621        &self.channel
622    }
623}
624
625impl<T, U> DerefMut for Client<T, U> {
626    fn deref_mut(&mut self) -> &mut Self::Target {
627        &mut self.channel
628    }
629}
630
631impl<T, U> From<Client<T, U>> for Channel<T, U> {
632    fn from(client: Client<T, U>) -> Self {
633        client.clone_channel()
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use crate::client::ClientConfig;
641    use crate::common::{Ready, Request, Response, TestTransport};
642
643    mod typed {
644        use test_log::test;
645
646        use super::*;
647        type TestClient = Client<u8, u8>;
648
649        fn spawn_test_client<T>(
650            connection: Connection<T>,
651            reconnect_strategy: ReconnectStrategy,
652        ) -> TestClient
653        where
654            T: Transport + 'static,
655        {
656            UntypedClient::spawn(
657                connection,
658                ClientConfig {
659                    reconnect_strategy,
660                    ..Default::default()
661                },
662            )
663            .into_typed_client()
664        }
665
666        #[inline]
668        fn new_test_transport() -> TestTransport {
669            TestTransport {
670                f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
671                f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
672                f_ready: Box::new(|_| Ok(Ready::EMPTY)),
673                f_reconnect: Box::new(|| Ok(())),
674            }
675        }
676
677        #[test(tokio::test)]
678        async fn should_write_queued_requests_as_outgoing_frames() {
679            let (client, mut server) = Connection::pair(100);
680
681            let mut client = spawn_test_client(client, ReconnectStrategy::Fail);
682            client.fire(Request::new(1u8)).await.unwrap();
683            client.fire(Request::new(2u8)).await.unwrap();
684            client.fire(Request::new(3u8)).await.unwrap();
685
686            assert_eq!(
687                server
688                    .read_frame_as::<Request<u8>>()
689                    .await
690                    .unwrap()
691                    .unwrap()
692                    .payload,
693                1
694            );
695            assert_eq!(
696                server
697                    .read_frame_as::<Request<u8>>()
698                    .await
699                    .unwrap()
700                    .unwrap()
701                    .payload,
702                2
703            );
704            assert_eq!(
705                server
706                    .read_frame_as::<Request<u8>>()
707                    .await
708                    .unwrap()
709                    .unwrap()
710                    .payload,
711                3
712            );
713        }
714
715        #[test(tokio::test)]
716        async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() {
717            let (client, mut server) = Connection::pair(100);
718
719            tokio::spawn(async move {
721                let request = server
722                    .read_frame_as::<Request<u8>>()
723                    .await
724                    .unwrap()
725                    .unwrap();
726                server
727                    .write_frame_for(&Response::new(request.id, 2u8))
728                    .await
729                    .unwrap();
730            });
731
732            let mut client = spawn_test_client(client, ReconnectStrategy::Fail);
733            assert_eq!(client.send(Request::new(1u8)).await.unwrap().payload, 2);
734        }
735
736        #[test(tokio::test)]
737        async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() {
738            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
739            spawn_test_client(
740                Connection::test_client({
741                    let mut transport = new_test_transport();
742
743                    transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into()));
744
745                    transport.f_reconnect = Box::new(move || {
747                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
748                        Ok(())
749                    });
750
751                    transport
752                }),
753                ReconnectStrategy::FixedInterval {
754                    interval: Duration::from_millis(50),
755                    max_retries: None,
756                    timeout: None,
757                },
758            );
759
760            reconnect_rx.recv().await.expect("Reconnect did not occur");
761        }
762
763        #[test(tokio::test)]
764        async fn should_attempt_to_reconnect_if_connection_closed_by_server() {
765            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
766            spawn_test_client(
767                Connection::test_client({
768                    let mut transport = new_test_transport();
769
770                    transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
772
773                    transport.f_try_read = Box::new(|_| Ok(0));
775
776                    transport.f_reconnect = Box::new(move || {
778                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
779                        Ok(())
780                    });
781
782                    transport
783                }),
784                ReconnectStrategy::FixedInterval {
785                    interval: Duration::from_millis(50),
786                    max_retries: None,
787                    timeout: None,
788                },
789            );
790
791            reconnect_rx.recv().await.expect("Reconnect did not occur");
792        }
793
794        #[test(tokio::test)]
795        async fn should_attempt_to_reconnect_if_connection_errors_while_reading_data() {
796            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
797            spawn_test_client(
798                Connection::test_client({
799                    let mut transport = new_test_transport();
800
801                    transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
803
804                    transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into()));
806
807                    transport.f_reconnect = Box::new(move || {
809                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
810                        Ok(())
811                    });
812
813                    transport
814                }),
815                ReconnectStrategy::FixedInterval {
816                    interval: Duration::from_millis(50),
817                    max_retries: None,
818                    timeout: None,
819                },
820            );
821
822            reconnect_rx.recv().await.expect("Reconnect did not occur");
823        }
824
825        #[test(tokio::test)]
826        async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() {
827            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
828            let mut client = spawn_test_client(
829                Connection::test_client({
830                    let mut transport = new_test_transport();
831
832                    transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
834
835                    transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into()));
837
838                    transport.f_reconnect = Box::new(move || {
840                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
841                        Ok(())
842                    });
843
844                    transport
845                }),
846                ReconnectStrategy::FixedInterval {
847                    interval: Duration::from_millis(50),
848                    max_retries: None,
849                    timeout: None,
850                },
851            );
852
853            client
855                .fire(Request::new(123u8))
856                .await
857                .expect("Failed to queue request");
858
859            reconnect_rx.recv().await.expect("Reconnect did not occur");
860        }
861
862        #[test(tokio::test)]
863        async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() {
864            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
865            let mut client = spawn_test_client(
866                Connection::test_client({
867                    let mut transport = new_test_transport();
868
869                    transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
871
872                    transport.f_try_write = Box::new(|buf| unsafe {
875                        static mut CNT: u8 = 0;
876                        CNT += 1;
877                        if CNT == 1 {
878                            Ok(buf.len() / 2)
879                        } else if CNT == 2 {
880                            Err(io::ErrorKind::WouldBlock.into())
881                        } else {
882                            Err(io::ErrorKind::Other.into())
883                        }
884                    });
885
886                    transport.f_reconnect = Box::new(move || {
888                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
889                        Ok(())
890                    });
891
892                    transport
893                }),
894                ReconnectStrategy::FixedInterval {
895                    interval: Duration::from_millis(50),
896                    max_retries: None,
897                    timeout: None,
898                },
899            );
900
901            client
903                .fire(Request::new(123u8))
904                .await
905                .expect("Failed to queue request");
906
907            reconnect_rx.recv().await.expect("Reconnect did not occur");
908        }
909
910        #[test(tokio::test)]
911        async fn should_exit_if_reconnect_strategy_has_failed_to_connect() {
912            let (client, server) = Connection::pair(100);
913
914            let client = spawn_test_client(client, ReconnectStrategy::Fail);
917            assert!(!client.is_finished(), "Client unexpectedly died");
918            drop(server);
919            assert_eq!(
920                client.wait().await.unwrap_err().kind(),
921                io::ErrorKind::ConnectionAborted
922            );
923        }
924
925        #[test(tokio::test)]
926        async fn should_exit_if_shutdown_signal_detected() {
927            let (client, _server) = Connection::pair(100);
928
929            let client = spawn_test_client(client, ReconnectStrategy::Fail);
930            client.shutdown().await.unwrap();
931
932            client.wait().await.unwrap();
936        }
937
938        #[test(tokio::test)]
939        async fn should_not_exit_if_shutdown_channel_is_closed() {
940            let (client, mut server) = Connection::pair(100);
941
942            tokio::spawn(async move {
944                let request = server
945                    .read_frame_as::<Request<u8>>()
946                    .await
947                    .unwrap()
948                    .unwrap();
949                server
950                    .write_frame_for(&Response::new(request.id, 2u8))
951                    .await
952                    .unwrap();
953            });
954
955            let mut channel = spawn_test_client(client, ReconnectStrategy::Fail).into_channel();
958            assert_eq!(channel.send(Request::new(1u8)).await.unwrap().payload, 2);
959        }
960    }
961
962    mod untyped {
963        use test_log::test;
964
965        use super::*;
966        type TestClient = UntypedClient;
967
968        #[inline]
970        fn new_test_transport() -> TestTransport {
971            TestTransport {
972                f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
973                f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
974                f_ready: Box::new(|_| Ok(Ready::EMPTY)),
975                f_reconnect: Box::new(|| Ok(())),
976            }
977        }
978
979        #[test(tokio::test)]
980        async fn should_write_queued_requests_as_outgoing_frames() {
981            let (client, mut server) = Connection::pair(100);
982
983            let mut client = TestClient::spawn(client, Default::default());
984            client
985                .fire(Request::new(1u8).to_untyped_request().unwrap())
986                .await
987                .unwrap();
988            client
989                .fire(Request::new(2u8).to_untyped_request().unwrap())
990                .await
991                .unwrap();
992            client
993                .fire(Request::new(3u8).to_untyped_request().unwrap())
994                .await
995                .unwrap();
996
997            assert_eq!(
998                server
999                    .read_frame_as::<Request<u8>>()
1000                    .await
1001                    .unwrap()
1002                    .unwrap()
1003                    .payload,
1004                1
1005            );
1006            assert_eq!(
1007                server
1008                    .read_frame_as::<Request<u8>>()
1009                    .await
1010                    .unwrap()
1011                    .unwrap()
1012                    .payload,
1013                2
1014            );
1015            assert_eq!(
1016                server
1017                    .read_frame_as::<Request<u8>>()
1018                    .await
1019                    .unwrap()
1020                    .unwrap()
1021                    .payload,
1022                3
1023            );
1024        }
1025
1026        #[test(tokio::test)]
1027        async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() {
1028            let (client, mut server) = Connection::pair(100);
1029
1030            tokio::spawn(async move {
1032                let request = server
1033                    .read_frame_as::<Request<u8>>()
1034                    .await
1035                    .unwrap()
1036                    .unwrap();
1037                server
1038                    .write_frame_for(&Response::new(request.id, 2u8))
1039                    .await
1040                    .unwrap();
1041            });
1042
1043            let mut client = TestClient::spawn(client, Default::default());
1044            assert_eq!(
1045                client
1046                    .send(Request::new(1u8).to_untyped_request().unwrap())
1047                    .await
1048                    .unwrap()
1049                    .to_typed_response::<u8>()
1050                    .unwrap()
1051                    .payload,
1052                2
1053            );
1054        }
1055
1056        #[test(tokio::test)]
1057        async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() {
1058            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1059            TestClient::spawn(
1060                Connection::test_client({
1061                    let mut transport = new_test_transport();
1062
1063                    transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into()));
1064
1065                    transport.f_reconnect = Box::new(move || {
1067                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1068                        Ok(())
1069                    });
1070
1071                    transport
1072                }),
1073                ClientConfig {
1074                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1075                        interval: Duration::from_millis(50),
1076                        max_retries: None,
1077                        timeout: None,
1078                    },
1079                    ..Default::default()
1080                },
1081            );
1082
1083            reconnect_rx.recv().await.expect("Reconnect did not occur");
1084        }
1085
1086        #[test(tokio::test)]
1087        async fn should_attempt_to_reconnect_if_connection_closed_by_server() {
1088            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1089            TestClient::spawn(
1090                Connection::test_client({
1091                    let mut transport = new_test_transport();
1092
1093                    transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
1095
1096                    transport.f_try_read = Box::new(|_| Ok(0));
1098
1099                    transport.f_reconnect = Box::new(move || {
1101                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1102                        Ok(())
1103                    });
1104
1105                    transport
1106                }),
1107                ClientConfig {
1108                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1109                        interval: Duration::from_millis(50),
1110                        max_retries: None,
1111                        timeout: None,
1112                    },
1113                    ..Default::default()
1114                },
1115            );
1116
1117            reconnect_rx.recv().await.expect("Reconnect did not occur");
1118        }
1119
1120        #[test(tokio::test)]
1121        async fn should_attempt_to_reconnect_if_connection_errors_while_reading_data() {
1122            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1123            TestClient::spawn(
1124                Connection::test_client({
1125                    let mut transport = new_test_transport();
1126
1127                    transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
1129
1130                    transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into()));
1132
1133                    transport.f_reconnect = Box::new(move || {
1135                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1136                        Ok(())
1137                    });
1138
1139                    transport
1140                }),
1141                ClientConfig {
1142                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1143                        interval: Duration::from_millis(50),
1144                        max_retries: None,
1145                        timeout: None,
1146                    },
1147                    ..Default::default()
1148                },
1149            );
1150
1151            reconnect_rx.recv().await.expect("Reconnect did not occur");
1152        }
1153
1154        #[test(tokio::test)]
1155        async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() {
1156            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1157            let mut client = TestClient::spawn(
1158                Connection::test_client({
1159                    let mut transport = new_test_transport();
1160
1161                    transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
1163
1164                    transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into()));
1166
1167                    transport.f_reconnect = Box::new(move || {
1169                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1170                        Ok(())
1171                    });
1172
1173                    transport
1174                }),
1175                ClientConfig {
1176                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1177                        interval: Duration::from_millis(50),
1178                        max_retries: None,
1179                        timeout: None,
1180                    },
1181                    ..Default::default()
1182                },
1183            );
1184
1185            client
1187                .fire(Request::new(123u8).to_untyped_request().unwrap())
1188                .await
1189                .expect("Failed to queue request");
1190
1191            reconnect_rx.recv().await.expect("Reconnect did not occur");
1192        }
1193
1194        #[test(tokio::test)]
1195        async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() {
1196            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1197            let mut client = TestClient::spawn(
1198                Connection::test_client({
1199                    let mut transport = new_test_transport();
1200
1201                    transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
1203
1204                    transport.f_try_write = Box::new(|buf| unsafe {
1207                        static mut CNT: u8 = 0;
1208                        CNT += 1;
1209                        if CNT == 1 {
1210                            Ok(buf.len() / 2)
1211                        } else if CNT == 2 {
1212                            Err(io::ErrorKind::WouldBlock.into())
1213                        } else {
1214                            Err(io::ErrorKind::Other.into())
1215                        }
1216                    });
1217
1218                    transport.f_reconnect = Box::new(move || {
1220                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1221                        Ok(())
1222                    });
1223
1224                    transport
1225                }),
1226                ClientConfig {
1227                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1228                        interval: Duration::from_millis(50),
1229                        max_retries: None,
1230                        timeout: None,
1231                    },
1232                    ..Default::default()
1233                },
1234            );
1235
1236            client
1238                .fire(Request::new(123u8).to_untyped_request().unwrap())
1239                .await
1240                .expect("Failed to queue request");
1241
1242            reconnect_rx.recv().await.expect("Reconnect did not occur");
1243        }
1244
1245        #[test(tokio::test)]
1246        async fn should_exit_if_reconnect_strategy_has_failed_to_connect() {
1247            let (client, server) = Connection::pair(100);
1248
1249            let client = TestClient::spawn(client, Default::default());
1252            assert!(!client.is_finished(), "Client unexpectedly died");
1253            drop(server);
1254            assert_eq!(
1255                client.wait().await.unwrap_err().kind(),
1256                io::ErrorKind::ConnectionAborted
1257            );
1258        }
1259
1260        #[test(tokio::test)]
1261        async fn should_exit_if_shutdown_signal_detected() {
1262            let (client, _server) = Connection::pair(100);
1263
1264            let client = TestClient::spawn(client, Default::default());
1265            client.shutdown().await.unwrap();
1266
1267            client.wait().await.unwrap();
1271        }
1272
1273        #[test(tokio::test)]
1274        async fn should_not_exit_if_shutdown_channel_is_closed() {
1275            let (client, mut server) = Connection::pair(100);
1276
1277            tokio::spawn(async move {
1279                let request = server
1280                    .read_frame_as::<Request<u8>>()
1281                    .await
1282                    .unwrap()
1283                    .unwrap();
1284                server
1285                    .write_frame_for(&Response::new(request.id, 2u8))
1286                    .await
1287                    .unwrap();
1288            });
1289
1290            let mut channel = TestClient::spawn(client, Default::default()).into_channel();
1293            assert_eq!(
1294                channel
1295                    .send(Request::new(1u8).to_untyped_request().unwrap())
1296                    .await
1297                    .unwrap()
1298                    .to_typed_response::<u8>()
1299                    .unwrap()
1300                    .payload,
1301                2
1302            );
1303        }
1304
1305        #[test(tokio::test)]
1306        async fn should_attempt_to_reconnect_if_no_activity_from_server_within_silence_duration() {
1307            let (client, _) = Connection::pair(100);
1308
1309            let client = TestClient::spawn(
1312                client,
1313                ClientConfig {
1314                    silence_duration: Duration::from_millis(100),
1315                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1316                        interval: Duration::from_millis(50),
1317                        max_retries: Some(3),
1318                        timeout: None,
1319                    },
1320                    ..Default::default()
1321                },
1322            );
1323
1324            let (tx, mut rx) = mpsc::unbounded_channel();
1325            client.on_connection_change(move |state| tx.send(state).unwrap());
1326            assert_eq!(rx.recv().await, Some(ConnectionState::Reconnecting));
1327            assert_eq!(rx.recv().await, Some(ConnectionState::Disconnected));
1328            assert_eq!(rx.recv().await, None);
1329        }
1330    }
1331}