distant_net/
client.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use std::{fmt, io};
5
6use log::*;
7use serde::de::DeserializeOwned;
8use serde::Serialize;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio::task::JoinHandle;
11
12use crate::common::{
13    Connection, FramedTransport, HeapSecretKey, InmemoryTransport, Interest, Reconnectable,
14    Transport, UntypedRequest, UntypedResponse,
15};
16
17mod builder;
18pub use builder::*;
19
20mod channel;
21pub use channel::*;
22
23mod config;
24pub use config::*;
25
26mod reconnect;
27pub use reconnect::*;
28
29mod shutdown;
30pub use shutdown::*;
31
/// Time to wait in between connection read/write attempts when nothing was read or written on
/// the last pass.
const SLEEP_DURATION: Duration = Duration::from_millis(1);
34
/// Represents a client that can be used to send requests & receive responses from a server.
///
/// ### Note
///
/// This variant does not validate the payload of requests or responses being sent and received.
pub struct UntypedClient {
    /// Used to send requests to a server.
    channel: UntypedChannel,

    /// Used to watch for changes in the connection state.
    watcher: ConnectionWatcher,

    /// Used to send shutdown request to inner task.
    shutdown: Box<dyn Shutdown>,

    /// Indicates whether the client task will be shutdown when the client is dropped.
    shutdown_on_drop: bool,

    /// Contains the task that is running to send requests and receive responses from a server.
    /// Becomes `None` once the handle has been taken (when converting the client, waiting on it,
    /// or aborting it on drop).
    task: Option<JoinHandle<io::Result<()>>>,
}
56
57impl fmt::Debug for UntypedClient {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_struct("UntypedClient")
60            .field("channel", &self.channel)
61            .field("shutdown", &"...")
62            .field("task", &self.task)
63            .field("shutdown_on_drop", &self.shutdown_on_drop)
64            .finish()
65    }
66}
67
68impl Drop for UntypedClient {
69    fn drop(&mut self) {
70        if self.shutdown_on_drop {
71            // TODO: Shutdown is an async operation, can we use it here?
72            if let Some(task) = self.task.take() {
73                debug!("Shutdown on drop = true, so aborting client task");
74                task.abort();
75            }
76        }
77    }
78}
79
80impl UntypedClient {
81    /// Consumes the client, returning a typed variant.
82    pub fn into_typed_client<T, U>(mut self) -> Client<T, U> {
83        Client {
84            channel: self.clone_channel().into_typed_channel(),
85            watcher: self.watcher.clone(),
86            shutdown: self.shutdown.clone(),
87            shutdown_on_drop: self.shutdown_on_drop,
88            task: self.task.take(),
89        }
90    }
91
92    /// Convert into underlying channel.
93    pub fn into_channel(self) -> UntypedChannel {
94        self.clone_channel()
95    }
96
97    /// Clones the underlying channel for requests and returns the cloned instance.
98    pub fn clone_channel(&self) -> UntypedChannel {
99        self.channel.clone()
100    }
101
102    /// Waits for the client to terminate, which resolves when the receiving end of the network
103    /// connection is closed (or the client is shutdown). Returns whether or not the client exited
104    /// successfully or due to an error.
105    pub async fn wait(mut self) -> io::Result<()> {
106        match self.task.take().unwrap().await {
107            Ok(x) => x,
108            Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
109        }
110    }
111
112    /// Abort the client's current connection by forcing its tasks to abort.
113    pub fn abort(&self) {
114        if let Some(task) = self.task.as_ref() {
115            task.abort();
116        }
117    }
118
119    /// Clones the underlying shutdown signaler for the client. This enables you to wait on the
120    /// client while still having the option to shut it down from somewhere else.
121    pub fn clone_shutdown(&self) -> Box<dyn Shutdown> {
122        self.shutdown.clone()
123    }
124
125    /// Signal for the client to shutdown its connection cleanly.
126    pub async fn shutdown(&self) -> io::Result<()> {
127        self.shutdown.shutdown().await
128    }
129
130    /// Returns whether the client should fully shutdown once it is dropped. If true, this will
131    /// result in all channels tied to the client no longer functioning once the client is dropped.
132    pub fn will_shutdown_on_drop(&mut self) -> bool {
133        self.shutdown_on_drop
134    }
135
136    /// Sets whether the client should fully shutdown once it is dropped. If true, this will result
137    /// in all channels tied to the client no longer functioning once the client is dropped.
138    pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) {
139        self.shutdown_on_drop = shutdown_on_drop;
140    }
141
142    /// Clones the underlying [`ConnectionStateWatcher`] for the client.
143    pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
144        self.watcher.clone()
145    }
146
147    /// Spawns a new task that continually monitors for connection changes and invokes the function
148    /// `f` whenever a new change is detected.
149    pub fn on_connection_change<F>(&self, f: F) -> JoinHandle<()>
150    where
151        F: FnMut(ConnectionState) + Send + 'static,
152    {
153        self.watcher.on_change(f)
154    }
155
156    /// Returns true if client's underlying event processing has finished/terminated.
157    pub fn is_finished(&self) -> bool {
158        self.task.is_none() || self.task.as_ref().unwrap().is_finished()
159    }
160
161    /// Spawns a client using the provided [`FramedTransport`] of [`InmemoryTransport`] and a
162    /// specific [`ReconnectStrategy`].
163    ///
164    /// ### Note
165    ///
166    /// This will NOT perform any handshakes or authentication procedures nor will it replay any
167    /// missing frames. This is to be used when establishing a [`Client`] to be run internally
168    /// within a program.
169    pub fn spawn_inmemory(
170        transport: FramedTransport<InmemoryTransport>,
171        config: ClientConfig,
172    ) -> Self {
173        let connection = Connection::Client {
174            id: rand::random(),
175            reauth_otp: HeapSecretKey::generate(32).unwrap(),
176            transport,
177        };
178        Self::spawn(connection, config)
179    }
180
181    /// Spawns a client using the provided [`Connection`].
182    pub(crate) fn spawn<V>(mut connection: Connection<V>, config: ClientConfig) -> Self
183    where
184        V: Transport + 'static,
185    {
186        let post_office = Arc::new(PostOffice::default());
187        let weak_post_office = Arc::downgrade(&post_office);
188        let (tx, mut rx) = mpsc::channel::<UntypedRequest<'static>>(1);
189        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<oneshot::Sender<io::Result<()>>>(1);
190
191        // Ensure that our transport starts off clean (nothing in buffers or backup)
192        connection.clear();
193
194        let ClientConfig {
195            mut reconnect_strategy,
196            shutdown_on_drop,
197            silence_duration,
198        } = config;
199
200        // Start a task that continually checks for responses and delivers them using the
201        // post office
202        let shutdown_tx_2 = shutdown_tx.clone();
203        let (watcher_tx, watcher_rx) = watch::channel(ConnectionState::Connected);
204        let task = tokio::spawn(async move {
205            let mut needs_reconnect = false;
206            let mut last_read_frame_time = Instant::now();
207
208            // NOTE: We hold onto a copy of the shutdown sender, even though we will never use it,
209            //       to prevent the channel from being closed. This is because we do a check to
210            //       see if we get a shutdown signal or ready state, and closing the channel
211            //       would cause recv() to resolve immediately and result in the task shutting
212            //       down.
213            let _shutdown_tx = shutdown_tx_2;
214
215            loop {
216                // If we have flagged that a reconnect is needed, attempt to do so
217                if needs_reconnect {
218                    info!("Client encountered issue, attempting to reconnect");
219                    debug!("Using strategy {reconnect_strategy:?}");
220                    match reconnect_strategy.reconnect(&mut connection).await {
221                        Ok(()) => {
222                            info!("Client successfully reconnected!");
223                            needs_reconnect = false;
224                            last_read_frame_time = Instant::now();
225                            watcher_tx.send_replace(ConnectionState::Connected);
226                        }
227                        Err(x) => {
228                            error!("Unable to re-establish connection: {x}");
229                            watcher_tx.send_replace(ConnectionState::Disconnected);
230                            break Err(x);
231                        }
232                    }
233                }
234
235                macro_rules! silence_needs_reconnect {
236                    () => {{
237                        info!(
238                            "Client exceeded {}s without server activity, so attempting to reconnect",
239                            silence_duration.as_secs_f32(),
240                        );
241                        needs_reconnect = true;
242                        watcher_tx.send_replace(ConnectionState::Reconnecting);
243                        continue;
244                    }};
245                }
246
247                let silence_time_remaining = silence_duration
248                    .checked_sub(last_read_frame_time.elapsed())
249                    .unwrap_or_default();
250
251                // NOTE: sleep will not trigger if duration is zero/nanosecond scale, so we
252                //       instead will do an early check here in the case that we need to reconnect
253                //       prior to a sleep while polling the ready status
254                if silence_time_remaining.as_millis() == 0 {
255                    silence_needs_reconnect!();
256                }
257
258                let ready = tokio::select! {
259                    // NOTE: This should NEVER return None as we never allow the channel to close.
260                    cb = shutdown_rx.recv() => {
261                        info!("Client got shutdown signal, so exiting event loop");
262                        let cb = cb.expect("Impossible: shutdown channel closed!");
263                        let _ = cb.send(Ok(()));
264                        watcher_tx.send_replace(ConnectionState::Disconnected);
265                        break Ok(());
266                    }
267                    _ = tokio::time::sleep(silence_time_remaining) => {
268                        silence_needs_reconnect!();
269                    }
270                    result = connection.ready(Interest::READABLE | Interest::WRITABLE) => {
271                        match result {
272                            Ok(result) => result,
273                            Err(x) => {
274                                error!("Failed to examine ready state: {x}");
275                                needs_reconnect = true;
276                                watcher_tx.send_replace(ConnectionState::Reconnecting);
277                                continue;
278                            }
279                        }
280                    }
281                };
282
283                let mut read_blocked = !ready.is_readable();
284                let mut write_blocked = !ready.is_writable();
285
286                if ready.is_readable() {
287                    match connection.try_read_frame() {
288                        // If we get an empty frame, we consider this a heartbeat and want
289                        // to adjust our frame read time and discard it from our backup
290                        Ok(Some(frame)) if frame.is_empty() => {
291                            trace!("Client received heartbeat");
292                            last_read_frame_time = Instant::now();
293                        }
294
295                        // Otherwise, we attempt to parse a frame as a response
296                        Ok(Some(frame)) => {
297                            last_read_frame_time = Instant::now();
298                            match UntypedResponse::from_slice(frame.as_item()) {
299                                Ok(response) => {
300                                    if log_enabled!(Level::Trace) {
301                                        trace!(
302                                            "Client receiving (id:{} | origin: {}): {}",
303                                            response.id,
304                                            response.origin_id,
305                                            String::from_utf8_lossy(&response.payload).to_string()
306                                        );
307                                    }
308
309                                    // For trace-level logging, we need to clone the id and
310                                    // origin id before passing the response ownership to
311                                    // be delivered elsewhere
312                                    let (id, origin_id) = if log_enabled!(Level::Trace) {
313                                        (response.id.to_string(), response.origin_id.to_string())
314                                    } else {
315                                        (String::new(), String::new())
316                                    };
317
318                                    // Try to send response to appropriate mailbox
319                                    // TODO: This will block if full... is that a problem?
320                                    if post_office
321                                        .deliver_untyped_response(response.into_owned())
322                                        .await
323                                    {
324                                        trace!("Client delivered response {id} to {origin_id}");
325                                    } else {
326                                        trace!("Client dropped response {id} to {origin_id}");
327                                    }
328                                }
329                                Err(x) => {
330                                    error!("Invalid response: {x}");
331                                }
332                            }
333                        }
334
335                        Ok(None) => {
336                            info!("Connection closed");
337                            needs_reconnect = true;
338                            watcher_tx.send_replace(ConnectionState::Reconnecting);
339                            continue;
340                        }
341                        Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
342                        Err(x) => {
343                            error!("Failed to read next frame: {x}");
344                            needs_reconnect = true;
345                            watcher_tx.send_replace(ConnectionState::Reconnecting);
346                            continue;
347                        }
348                    }
349                }
350
351                if ready.is_writable() {
352                    // If we get more data to write, attempt to write it, which will result in
353                    // writing any queued bytes as well. Othewise, we attempt to flush any pending
354                    // outgoing bytes that weren't sent earlier.
355                    if let Ok(request) = rx.try_recv() {
356                        if log_enabled!(Level::Trace) {
357                            trace!(
358                                "Client sending {}",
359                                String::from_utf8_lossy(&request.to_bytes()).to_string()
360                            );
361                        }
362                        match connection.try_write_frame(request.to_bytes()) {
363                            Ok(()) => (),
364                            Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
365                            Err(x) => {
366                                error!("Failed to write frame: {x}");
367                                needs_reconnect = true;
368                                watcher_tx.send_replace(ConnectionState::Reconnecting);
369                                continue;
370                            }
371                        }
372                    } else {
373                        // In the case of flushing, there are two scenarios in which we want to
374                        // mark no write occurring:
375                        //
376                        // 1. When flush did not write any bytes, which can happen when the buffer
377                        //    is empty
378                        // 2. When the call to write bytes blocks
379                        match connection.try_flush() {
380                            Ok(0) => write_blocked = true,
381                            Ok(_) => (),
382                            Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
383                            Err(x) => {
384                                error!("Failed to flush outgoing data: {x}");
385                                needs_reconnect = true;
386                                watcher_tx.send_replace(ConnectionState::Reconnecting);
387                                continue;
388                            }
389                        }
390                    }
391                }
392
393                // If we did not read or write anything, sleep a bit to offload CPU usage
394                if read_blocked && write_blocked {
395                    tokio::time::sleep(SLEEP_DURATION).await;
396                }
397            }
398        });
399
400        let channel = UntypedChannel {
401            tx,
402            post_office: weak_post_office,
403        };
404
405        Self {
406            channel,
407            watcher: ConnectionWatcher(watcher_rx),
408            shutdown: Box::new(shutdown_tx),
409            shutdown_on_drop,
410            task: Some(task),
411        }
412    }
413}
414
415impl Deref for UntypedClient {
416    type Target = UntypedChannel;
417
418    fn deref(&self) -> &Self::Target {
419        &self.channel
420    }
421}
422
423impl DerefMut for UntypedClient {
424    fn deref_mut(&mut self) -> &mut Self::Target {
425        &mut self.channel
426    }
427}
428
429impl From<UntypedClient> for UntypedChannel {
430    fn from(client: UntypedClient) -> Self {
431        client.into_channel()
432    }
433}
434
/// Represents a client that can be used to send requests & receive responses from a server.
///
/// Requests carry a payload of type `T` and responses carry a payload of type `U`.
pub struct Client<T, U> {
    /// Used to send requests to a server.
    channel: Channel<T, U>,

    /// Used to watch for changes in the connection state.
    watcher: ConnectionWatcher,

    /// Used to send shutdown request to inner task.
    shutdown: Box<dyn Shutdown>,

    /// Indicates whether the client task will be shutdown when the client is dropped.
    shutdown_on_drop: bool,

    /// Contains the task that is running to send requests and receive responses from a server.
    /// Becomes `None` once the handle has been taken (when converting the client, waiting on it,
    /// or aborting it on drop).
    task: Option<JoinHandle<io::Result<()>>>,
}
452
453impl<T, U> fmt::Debug for Client<T, U> {
454    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
455        f.debug_struct("Client")
456            .field("channel", &self.channel)
457            .field("shutdown", &"...")
458            .field("task", &self.task)
459            .field("shutdown_on_drop", &self.shutdown_on_drop)
460            .finish()
461    }
462}
463
464impl<T, U> Drop for Client<T, U> {
465    fn drop(&mut self) {
466        if self.shutdown_on_drop {
467            // TODO: Shutdown is an async operation, can we use it here?
468            if let Some(task) = self.task.take() {
469                debug!("Shutdown on drop = true, so aborting client task");
470                task.abort();
471            }
472        }
473    }
474}
475
476impl<T, U> Client<T, U>
477where
478    T: Send + Sync + Serialize + 'static,
479    U: Send + Sync + DeserializeOwned + 'static,
480{
481    /// Consumes the client, returning an untyped variant.
482    pub fn into_untyped_client(mut self) -> UntypedClient {
483        UntypedClient {
484            channel: self.clone_channel().into_untyped_channel(),
485            watcher: self.watcher.clone(),
486            shutdown: self.shutdown.clone(),
487            shutdown_on_drop: self.shutdown_on_drop,
488            task: self.task.take(),
489        }
490    }
491
492    /// Spawns a client using the provided [`FramedTransport`] of [`InmemoryTransport`] and a
493    /// specific [`ReconnectStrategy`].
494    ///
495    /// ### Note
496    ///
497    /// This will NOT perform any handshakes or authentication procedures nor will it replay any
498    /// missing frames. This is to be used when establishing a [`Client`] to be run internally
499    /// within a program.
500    pub fn spawn_inmemory(
501        transport: FramedTransport<InmemoryTransport>,
502        config: ClientConfig,
503    ) -> Self {
504        UntypedClient::spawn_inmemory(transport, config).into_typed_client()
505    }
506}
507
508impl Client<(), ()> {
509    /// Creates a new [`ClientBuilder`].
510    pub fn build() -> ClientBuilder<(), ()> {
511        ClientBuilder::new()
512    }
513
514    /// Creates a new [`ClientBuilder`] configured to use a [`TcpConnector`].
515    pub fn tcp<T>(connector: impl Into<TcpConnector<T>>) -> ClientBuilder<(), TcpConnector<T>> {
516        ClientBuilder::new().connector(connector.into())
517    }
518
519    /// Creates a new [`ClientBuilder`] configured to use a [`UnixSocketConnector`].
520    #[cfg(unix)]
521    pub fn unix_socket(
522        connector: impl Into<UnixSocketConnector>,
523    ) -> ClientBuilder<(), UnixSocketConnector> {
524        ClientBuilder::new().connector(connector.into())
525    }
526
527    /// Creates a new [`ClientBuilder`] configured to use a local [`WindowsPipeConnector`].
528    #[cfg(windows)]
529    pub fn local_windows_pipe(
530        connector: impl Into<WindowsPipeConnector>,
531    ) -> ClientBuilder<(), WindowsPipeConnector> {
532        let mut connector = connector.into();
533        connector.local = true;
534        ClientBuilder::new().connector(connector)
535    }
536
537    /// Creates a new [`ClientBuilder`] configured to use a [`WindowsPipeConnector`].
538    #[cfg(windows)]
539    pub fn windows_pipe(
540        connector: impl Into<WindowsPipeConnector>,
541    ) -> ClientBuilder<(), WindowsPipeConnector> {
542        ClientBuilder::new().connector(connector.into())
543    }
544}
545
546impl<T, U> Client<T, U> {
547    /// Convert into underlying channel.
548    pub fn into_channel(self) -> Channel<T, U> {
549        self.clone_channel()
550    }
551
552    /// Clones the underlying channel for requests and returns the cloned instance.
553    pub fn clone_channel(&self) -> Channel<T, U> {
554        self.channel.clone()
555    }
556
557    /// Waits for the client to terminate, which resolves when the receiving end of the network
558    /// connection is closed (or the client is shutdown). Returns whether or not the client exited
559    /// successfully or due to an error.
560    pub async fn wait(mut self) -> io::Result<()> {
561        match self.task.take().unwrap().await {
562            Ok(x) => x,
563            Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)),
564        }
565    }
566
567    /// Abort the client's current connection by forcing its tasks to abort.
568    pub fn abort(&self) {
569        if let Some(task) = self.task.as_ref() {
570            task.abort();
571        }
572    }
573
574    /// Clones the underlying shutdown signaler for the client. This enables you to wait on the
575    /// client while still having the option to shut it down from somewhere else.
576    pub fn clone_shutdown(&self) -> Box<dyn Shutdown> {
577        self.shutdown.clone()
578    }
579
580    /// Signal for the client to shutdown its connection cleanly.
581    pub async fn shutdown(&self) -> io::Result<()> {
582        self.shutdown.shutdown().await
583    }
584
585    /// Returns whether the client should fully shutdown once it is dropped. If true, this will
586    /// result in all channels tied to the client no longer functioning once the client is dropped.
587    pub fn will_shutdown_on_drop(&mut self) -> bool {
588        self.shutdown_on_drop
589    }
590
591    /// Sets whether the client should fully shutdown once it is dropped. If true, this will result
592    /// in all channels tied to the client no longer functioning once the client is dropped.
593    pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) {
594        self.shutdown_on_drop = shutdown_on_drop;
595    }
596
597    /// Clones the underlying [`ConnectionStateWatcher`] for the client.
598    pub fn clone_connection_watcher(&self) -> ConnectionWatcher {
599        self.watcher.clone()
600    }
601
602    /// Spawns a new task that continually monitors for connection changes and invokes the function
603    /// `f` whenever a new change is detected.
604    pub fn on_connection_change<F>(&self, f: F) -> JoinHandle<()>
605    where
606        F: FnMut(ConnectionState) + Send + 'static,
607    {
608        self.watcher.on_change(f)
609    }
610
611    /// Returns true if client's underlying event processing has finished/terminated.
612    pub fn is_finished(&self) -> bool {
613        self.task.is_none() || self.task.as_ref().unwrap().is_finished()
614    }
615}
616
617impl<T, U> Deref for Client<T, U> {
618    type Target = Channel<T, U>;
619
620    fn deref(&self) -> &Self::Target {
621        &self.channel
622    }
623}
624
625impl<T, U> DerefMut for Client<T, U> {
626    fn deref_mut(&mut self) -> &mut Self::Target {
627        &mut self.channel
628    }
629}
630
631impl<T, U> From<Client<T, U>> for Channel<T, U> {
632    fn from(client: Client<T, U>) -> Self {
633        client.clone_channel()
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use crate::client::ClientConfig;
641    use crate::common::{Ready, Request, Response, TestTransport};
642
643    mod typed {
644        use test_log::test;
645
646        use super::*;
647        type TestClient = Client<u8, u8>;
648
649        fn spawn_test_client<T>(
650            connection: Connection<T>,
651            reconnect_strategy: ReconnectStrategy,
652        ) -> TestClient
653        where
654            T: Transport + 'static,
655        {
656            UntypedClient::spawn(
657                connection,
658                ClientConfig {
659                    reconnect_strategy,
660                    ..Default::default()
661                },
662            )
663            .into_typed_client()
664        }
665
666        /// Creates a new test transport whose operations do not panic, but do nothing.
667        #[inline]
668        fn new_test_transport() -> TestTransport {
669            TestTransport {
670                f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
671                f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
672                f_ready: Box::new(|_| Ok(Ready::EMPTY)),
673                f_reconnect: Box::new(|| Ok(())),
674            }
675        }
676
677        #[test(tokio::test)]
678        async fn should_write_queued_requests_as_outgoing_frames() {
679            let (client, mut server) = Connection::pair(100);
680
681            let mut client = spawn_test_client(client, ReconnectStrategy::Fail);
682            client.fire(Request::new(1u8)).await.unwrap();
683            client.fire(Request::new(2u8)).await.unwrap();
684            client.fire(Request::new(3u8)).await.unwrap();
685
686            assert_eq!(
687                server
688                    .read_frame_as::<Request<u8>>()
689                    .await
690                    .unwrap()
691                    .unwrap()
692                    .payload,
693                1
694            );
695            assert_eq!(
696                server
697                    .read_frame_as::<Request<u8>>()
698                    .await
699                    .unwrap()
700                    .unwrap()
701                    .payload,
702                2
703            );
704            assert_eq!(
705                server
706                    .read_frame_as::<Request<u8>>()
707                    .await
708                    .unwrap()
709                    .unwrap()
710                    .payload,
711                3
712            );
713        }
714
715        #[test(tokio::test)]
716        async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() {
717            let (client, mut server) = Connection::pair(100);
718
719            // NOTE: Spawn a separate task to handle the response so we do not deadlock
720            tokio::spawn(async move {
721                let request = server
722                    .read_frame_as::<Request<u8>>()
723                    .await
724                    .unwrap()
725                    .unwrap();
726                server
727                    .write_frame_for(&Response::new(request.id, 2u8))
728                    .await
729                    .unwrap();
730            });
731
732            let mut client = spawn_test_client(client, ReconnectStrategy::Fail);
733            assert_eq!(client.send(Request::new(1u8)).await.unwrap().payload, 2);
734        }
735
736        #[test(tokio::test)]
737        async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() {
738            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
739            spawn_test_client(
740                Connection::test_client({
741                    let mut transport = new_test_transport();
742
743                    transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into()));
744
745                    // Send a signal that the reconnect happened while marking it successful
746                    transport.f_reconnect = Box::new(move || {
747                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
748                        Ok(())
749                    });
750
751                    transport
752                }),
753                ReconnectStrategy::FixedInterval {
754                    interval: Duration::from_millis(50),
755                    max_retries: None,
756                    timeout: None,
757                },
758            );
759
760            reconnect_rx.recv().await.expect("Reconnect did not occur");
761        }
762
763        #[test(tokio::test)]
764        async fn should_attempt_to_reconnect_if_connection_closed_by_server() {
765            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
766            spawn_test_client(
767                Connection::test_client({
768                    let mut transport = new_test_transport();
769
770                    // Report back that we're readable to trigger try_read
771                    transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
772
                    // Report that zero bytes were read, indicating the connection was closed
774                    transport.f_try_read = Box::new(|_| Ok(0));
775
776                    // Send a signal that the reconnect happened while marking it successful
777                    transport.f_reconnect = Box::new(move || {
778                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
779                        Ok(())
780                    });
781
782                    transport
783                }),
784                ReconnectStrategy::FixedInterval {
785                    interval: Duration::from_millis(50),
786                    max_retries: None,
787                    timeout: None,
788                },
789            );
790
791            reconnect_rx.recv().await.expect("Reconnect did not occur");
792        }
793
794        #[test(tokio::test)]
795        async fn should_attempt_to_reconnect_if_connection_errors_while_reading_data() {
796            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
797            spawn_test_client(
798                Connection::test_client({
799                    let mut transport = new_test_transport();
800
801                    // Report back that we're readable to trigger try_read
802                    transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
803
804                    // Fail the read
805                    transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into()));
806
807                    // Send a signal that the reconnect happened while marking it successful
808                    transport.f_reconnect = Box::new(move || {
809                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
810                        Ok(())
811                    });
812
813                    transport
814                }),
815                ReconnectStrategy::FixedInterval {
816                    interval: Duration::from_millis(50),
817                    max_retries: None,
818                    timeout: None,
819                },
820            );
821
822            reconnect_rx.recv().await.expect("Reconnect did not occur");
823        }
824
825        #[test(tokio::test)]
826        async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() {
827            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
828            let mut client = spawn_test_client(
829                Connection::test_client({
830                    let mut transport = new_test_transport();
831
832                    // Report back that we're readable to trigger try_read
833                    transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
834
835                    // Fail the write
836                    transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into()));
837
838                    // Send a signal that the reconnect happened while marking it successful
839                    transport.f_reconnect = Box::new(move || {
840                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
841                        Ok(())
842                    });
843
844                    transport
845                }),
846                ReconnectStrategy::FixedInterval {
847                    interval: Duration::from_millis(50),
848                    max_retries: None,
849                    timeout: None,
850                },
851            );
852
853            // Queue up a request to fail to send
854            client
855                .fire(Request::new(123u8))
856                .await
857                .expect("Failed to queue request");
858
859            reconnect_rx.recv().await.expect("Reconnect did not occur");
860        }
861
862        #[test(tokio::test)]
863        async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() {
864            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
865            let mut client = spawn_test_client(
866                Connection::test_client({
867                    let mut transport = new_test_transport();
868
869                    // Report back that we're readable to trigger try_read
870                    transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
871
872                    // Succeed partially with initial try_write, block on second call, and then
873                    // fail during a try_flush
874                    transport.f_try_write = Box::new(|buf| unsafe {
875                        static mut CNT: u8 = 0;
876                        CNT += 1;
877                        if CNT == 1 {
878                            Ok(buf.len() / 2)
879                        } else if CNT == 2 {
880                            Err(io::ErrorKind::WouldBlock.into())
881                        } else {
882                            Err(io::ErrorKind::Other.into())
883                        }
884                    });
885
886                    // Send a signal that the reconnect happened while marking it successful
887                    transport.f_reconnect = Box::new(move || {
888                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
889                        Ok(())
890                    });
891
892                    transport
893                }),
894                ReconnectStrategy::FixedInterval {
895                    interval: Duration::from_millis(50),
896                    max_retries: None,
897                    timeout: None,
898                },
899            );
900
901            // Queue up a request to fail to send
902            client
903                .fire(Request::new(123u8))
904                .await
905                .expect("Failed to queue request");
906
907            reconnect_rx.recv().await.expect("Reconnect did not occur");
908        }
909
910        #[test(tokio::test)]
911        async fn should_exit_if_reconnect_strategy_has_failed_to_connect() {
912            let (client, server) = Connection::pair(100);
913
914            // Spawn the client, verify the task is running, kill our server, and verify that the
915            // client does not block trying to reconnect
916            let client = spawn_test_client(client, ReconnectStrategy::Fail);
917            assert!(!client.is_finished(), "Client unexpectedly died");
918            drop(server);
919            assert_eq!(
920                client.wait().await.unwrap_err().kind(),
921                io::ErrorKind::ConnectionAborted
922            );
923        }
924
925        #[test(tokio::test)]
926        async fn should_exit_if_shutdown_signal_detected() {
927            let (client, _server) = Connection::pair(100);
928
929            let client = spawn_test_client(client, ReconnectStrategy::Fail);
930            client.shutdown().await.unwrap();
931
932            // NOTE: We wait for the client's task to conclude by using `wait` to ensure we do not
933            //       have a race condition testing the task finished state. This will also verify
934            //       that the task exited cleanly, rather than panicking.
935            client.wait().await.unwrap();
936        }
937
938        #[test(tokio::test)]
939        async fn should_not_exit_if_shutdown_channel_is_closed() {
940            let (client, mut server) = Connection::pair(100);
941
942            // NOTE: Spawn a separate task to handle the response so we do not deadlock
943            tokio::spawn(async move {
944                let request = server
945                    .read_frame_as::<Request<u8>>()
946                    .await
947                    .unwrap()
948                    .unwrap();
949                server
950                    .write_frame_for(&Response::new(request.id, 2u8))
951                    .await
952                    .unwrap();
953            });
954
955            // NOTE: We consume the client to produce a channel without maintaining the shutdown
956            //       channel in order to ensure that dropping the client does not kill the task.
957            let mut channel = spawn_test_client(client, ReconnectStrategy::Fail).into_channel();
958            assert_eq!(channel.send(Request::new(1u8)).await.unwrap().payload, 2);
959        }
960    }
961
962    mod untyped {
963        use test_log::test;
964
965        use super::*;
966        type TestClient = UntypedClient;
967
968        /// Creates a new test transport whose operations do not panic, but do nothing.
969        #[inline]
970        fn new_test_transport() -> TestTransport {
971            TestTransport {
972                f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
973                f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())),
974                f_ready: Box::new(|_| Ok(Ready::EMPTY)),
975                f_reconnect: Box::new(|| Ok(())),
976            }
977        }
978
979        #[test(tokio::test)]
980        async fn should_write_queued_requests_as_outgoing_frames() {
981            let (client, mut server) = Connection::pair(100);
982
983            let mut client = TestClient::spawn(client, Default::default());
984            client
985                .fire(Request::new(1u8).to_untyped_request().unwrap())
986                .await
987                .unwrap();
988            client
989                .fire(Request::new(2u8).to_untyped_request().unwrap())
990                .await
991                .unwrap();
992            client
993                .fire(Request::new(3u8).to_untyped_request().unwrap())
994                .await
995                .unwrap();
996
997            assert_eq!(
998                server
999                    .read_frame_as::<Request<u8>>()
1000                    .await
1001                    .unwrap()
1002                    .unwrap()
1003                    .payload,
1004                1
1005            );
1006            assert_eq!(
1007                server
1008                    .read_frame_as::<Request<u8>>()
1009                    .await
1010                    .unwrap()
1011                    .unwrap()
1012                    .payload,
1013                2
1014            );
1015            assert_eq!(
1016                server
1017                    .read_frame_as::<Request<u8>>()
1018                    .await
1019                    .unwrap()
1020                    .unwrap()
1021                    .payload,
1022                3
1023            );
1024        }
1025
1026        #[test(tokio::test)]
1027        async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() {
1028            let (client, mut server) = Connection::pair(100);
1029
1030            // NOTE: Spawn a separate task to handle the response so we do not deadlock
1031            tokio::spawn(async move {
1032                let request = server
1033                    .read_frame_as::<Request<u8>>()
1034                    .await
1035                    .unwrap()
1036                    .unwrap();
1037                server
1038                    .write_frame_for(&Response::new(request.id, 2u8))
1039                    .await
1040                    .unwrap();
1041            });
1042
1043            let mut client = TestClient::spawn(client, Default::default());
1044            assert_eq!(
1045                client
1046                    .send(Request::new(1u8).to_untyped_request().unwrap())
1047                    .await
1048                    .unwrap()
1049                    .to_typed_response::<u8>()
1050                    .unwrap()
1051                    .payload,
1052                2
1053            );
1054        }
1055
1056        #[test(tokio::test)]
1057        async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() {
1058            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1059            TestClient::spawn(
1060                Connection::test_client({
1061                    let mut transport = new_test_transport();
1062
1063                    transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into()));
1064
1065                    // Send a signal that the reconnect happened while marking it successful
1066                    transport.f_reconnect = Box::new(move || {
1067                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1068                        Ok(())
1069                    });
1070
1071                    transport
1072                }),
1073                ClientConfig {
1074                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1075                        interval: Duration::from_millis(50),
1076                        max_retries: None,
1077                        timeout: None,
1078                    },
1079                    ..Default::default()
1080                },
1081            );
1082
1083            reconnect_rx.recv().await.expect("Reconnect did not occur");
1084        }
1085
1086        #[test(tokio::test)]
1087        async fn should_attempt_to_reconnect_if_connection_closed_by_server() {
1088            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1089            TestClient::spawn(
1090                Connection::test_client({
1091                    let mut transport = new_test_transport();
1092
1093                    // Report back that we're readable to trigger try_read
1094                    transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
1095
1096                    // Report that no bytes were written, indicting the channel was closed
1097                    transport.f_try_read = Box::new(|_| Ok(0));
1098
1099                    // Send a signal that the reconnect happened while marking it successful
1100                    transport.f_reconnect = Box::new(move || {
1101                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1102                        Ok(())
1103                    });
1104
1105                    transport
1106                }),
1107                ClientConfig {
1108                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1109                        interval: Duration::from_millis(50),
1110                        max_retries: None,
1111                        timeout: None,
1112                    },
1113                    ..Default::default()
1114                },
1115            );
1116
1117            reconnect_rx.recv().await.expect("Reconnect did not occur");
1118        }
1119
1120        #[test(tokio::test)]
1121        async fn should_attempt_to_reconnect_if_connection_errors_while_reading_data() {
1122            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1123            TestClient::spawn(
1124                Connection::test_client({
1125                    let mut transport = new_test_transport();
1126
1127                    // Report back that we're readable to trigger try_read
1128                    transport.f_ready = Box::new(|_| Ok(Ready::READABLE));
1129
1130                    // Fail the read
1131                    transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into()));
1132
1133                    // Send a signal that the reconnect happened while marking it successful
1134                    transport.f_reconnect = Box::new(move || {
1135                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1136                        Ok(())
1137                    });
1138
1139                    transport
1140                }),
1141                ClientConfig {
1142                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1143                        interval: Duration::from_millis(50),
1144                        max_retries: None,
1145                        timeout: None,
1146                    },
1147                    ..Default::default()
1148                },
1149            );
1150
1151            reconnect_rx.recv().await.expect("Reconnect did not occur");
1152        }
1153
1154        #[test(tokio::test)]
1155        async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() {
1156            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1157            let mut client = TestClient::spawn(
1158                Connection::test_client({
1159                    let mut transport = new_test_transport();
1160
1161                    // Report back that we're readable to trigger try_read
1162                    transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
1163
1164                    // Fail the write
1165                    transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into()));
1166
1167                    // Send a signal that the reconnect happened while marking it successful
1168                    transport.f_reconnect = Box::new(move || {
1169                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1170                        Ok(())
1171                    });
1172
1173                    transport
1174                }),
1175                ClientConfig {
1176                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1177                        interval: Duration::from_millis(50),
1178                        max_retries: None,
1179                        timeout: None,
1180                    },
1181                    ..Default::default()
1182                },
1183            );
1184
1185            // Queue up a request to fail to send
1186            client
1187                .fire(Request::new(123u8).to_untyped_request().unwrap())
1188                .await
1189                .expect("Failed to queue request");
1190
1191            reconnect_rx.recv().await.expect("Reconnect did not occur");
1192        }
1193
1194        #[test(tokio::test)]
1195        async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() {
1196            let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
1197            let mut client = TestClient::spawn(
1198                Connection::test_client({
1199                    let mut transport = new_test_transport();
1200
1201                    // Report back that we're readable to trigger try_read
1202                    transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE));
1203
1204                    // Succeed partially with initial try_write, block on second call, and then
1205                    // fail during a try_flush
1206                    transport.f_try_write = Box::new(|buf| unsafe {
1207                        static mut CNT: u8 = 0;
1208                        CNT += 1;
1209                        if CNT == 1 {
1210                            Ok(buf.len() / 2)
1211                        } else if CNT == 2 {
1212                            Err(io::ErrorKind::WouldBlock.into())
1213                        } else {
1214                            Err(io::ErrorKind::Other.into())
1215                        }
1216                    });
1217
1218                    // Send a signal that the reconnect happened while marking it successful
1219                    transport.f_reconnect = Box::new(move || {
1220                        reconnect_tx.try_send(()).expect("reconnect tx blocked");
1221                        Ok(())
1222                    });
1223
1224                    transport
1225                }),
1226                ClientConfig {
1227                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1228                        interval: Duration::from_millis(50),
1229                        max_retries: None,
1230                        timeout: None,
1231                    },
1232                    ..Default::default()
1233                },
1234            );
1235
1236            // Queue up a request to fail to send
1237            client
1238                .fire(Request::new(123u8).to_untyped_request().unwrap())
1239                .await
1240                .expect("Failed to queue request");
1241
1242            reconnect_rx.recv().await.expect("Reconnect did not occur");
1243        }
1244
1245        #[test(tokio::test)]
1246        async fn should_exit_if_reconnect_strategy_has_failed_to_connect() {
1247            let (client, server) = Connection::pair(100);
1248
1249            // Spawn the client, verify the task is running, kill our server, and verify that the
1250            // client does not block trying to reconnect
1251            let client = TestClient::spawn(client, Default::default());
1252            assert!(!client.is_finished(), "Client unexpectedly died");
1253            drop(server);
1254            assert_eq!(
1255                client.wait().await.unwrap_err().kind(),
1256                io::ErrorKind::ConnectionAborted
1257            );
1258        }
1259
1260        #[test(tokio::test)]
1261        async fn should_exit_if_shutdown_signal_detected() {
1262            let (client, _server) = Connection::pair(100);
1263
1264            let client = TestClient::spawn(client, Default::default());
1265            client.shutdown().await.unwrap();
1266
1267            // NOTE: We wait for the client's task to conclude by using `wait` to ensure we do not
1268            //       have a race condition testing the task finished state. This will also verify
1269            //       that the task exited cleanly, rather than panicking.
1270            client.wait().await.unwrap();
1271        }
1272
1273        #[test(tokio::test)]
1274        async fn should_not_exit_if_shutdown_channel_is_closed() {
1275            let (client, mut server) = Connection::pair(100);
1276
1277            // NOTE: Spawn a separate task to handle the response so we do not deadlock
1278            tokio::spawn(async move {
1279                let request = server
1280                    .read_frame_as::<Request<u8>>()
1281                    .await
1282                    .unwrap()
1283                    .unwrap();
1284                server
1285                    .write_frame_for(&Response::new(request.id, 2u8))
1286                    .await
1287                    .unwrap();
1288            });
1289
1290            // NOTE: We consume the client to produce a channel without maintaining the shutdown
1291            //       channel in order to ensure that dropping the client does not kill the task.
1292            let mut channel = TestClient::spawn(client, Default::default()).into_channel();
1293            assert_eq!(
1294                channel
1295                    .send(Request::new(1u8).to_untyped_request().unwrap())
1296                    .await
1297                    .unwrap()
1298                    .to_typed_response::<u8>()
1299                    .unwrap()
1300                    .payload,
1301                2
1302            );
1303        }
1304
1305        #[test(tokio::test)]
1306        async fn should_attempt_to_reconnect_if_no_activity_from_server_within_silence_duration() {
1307            let (client, _) = Connection::pair(100);
1308
1309            // NOTE: We consume the client to produce a channel without maintaining the shutdown
1310            //       channel in order to ensure that dropping the client does not kill the task.
1311            let client = TestClient::spawn(
1312                client,
1313                ClientConfig {
1314                    silence_duration: Duration::from_millis(100),
1315                    reconnect_strategy: ReconnectStrategy::FixedInterval {
1316                        interval: Duration::from_millis(50),
1317                        max_retries: Some(3),
1318                        timeout: None,
1319                    },
1320                    ..Default::default()
1321                },
1322            );
1323
1324            let (tx, mut rx) = mpsc::unbounded_channel();
1325            client.on_connection_change(move |state| tx.send(state).unwrap());
1326            assert_eq!(rx.recv().await, Some(ConnectionState::Reconnecting));
1327            assert_eq!(rx.recv().await, Some(ConnectionState::Disconnected));
1328            assert_eq!(rx.recv().await, None);
1329        }
1330    }
1331}