aldrin_test/
tokio.rs

1//! Tokio-based utilities for Aldrin server and client tests
2//!
3//! The types in this module are conceptually identical to the ones in the top-level crate, but are
4//! more convenient if you use Tokio, because they all automatically spawn the required tasks.
5
6#[cfg(test)]
7mod test;
8
9use aldrin::error::RunError;
10use aldrin::Handle;
11use aldrin_broker::{BrokerHandle, ConnectionError, ConnectionHandle};
12use aldrin_core::channel::Disconnected;
13use std::ops::{Deref, DerefMut};
14use tokio::task::JoinHandle;
15
/// Tokio-based broker for use in tests.
///
/// This type is a simple wrapper around [`aldrin_broker::Broker`] and
/// [`aldrin_broker::BrokerHandle`]. All methods of [`BrokerHandle`] can be called on this type
/// as well due to its [`Deref`] implementation.
///
/// The broker is spawned as a Tokio task by [`TestBroker::new`]. Use [`join`](Self::join) or
/// [`join_idle`](Self::join_idle) to shut it down and wait for that task to finish.
///
/// See the [`tokio` module documentation](self) for usage examples.
#[derive(Debug)]
pub struct TestBroker {
    /// The top-level crate's `TestBroker`, which this type wraps.
    inner: crate::TestBroker,
    /// Join handle of the spawned broker task; `None` once `join` or `join_idle` has taken it.
    join: Option<JoinHandle<()>>,
}
28
29impl TestBroker {
30    /// Creates a new broker.
31    pub fn new() -> Self {
32        let mut inner = crate::TestBroker::new();
33
34        Self {
35            join: Some(tokio::spawn(inner.take_broker().run())),
36            inner,
37        }
38    }
39
40    /// Returns a handle to the broker.
41    pub fn handle(&self) -> &BrokerHandle {
42        &self.inner
43    }
44
45    /// Shuts down the broker and joins its task.
46    ///
47    /// This function cannot be canceled in a meaningful way after it has been polled once, because
48    /// it would panic if called (and polled) again. Ensure that you call it only once and then poll
49    /// it to completion.
50    ///
51    /// # Panics
52    ///
53    /// This function will panic if the [`Broker`](aldrin_broker::Broker) task has already been
54    /// joined or attempted to join (see notes above as well).
55    pub async fn join(&mut self) {
56        self.inner.shutdown().await;
57        self.join.take().expect("already joined").await.unwrap();
58    }
59
60    /// Shuts down the broker when idle and joins its task.
61    ///
62    /// This function cannot be canceled in a meaningful way after it has been polled once, because
63    /// it would panic if called (and polled) again. Ensure that you call it only once and then poll
64    /// it to completion.
65    ///
66    /// # Panics
67    ///
68    /// This function will panic if the [`Broker`](aldrin_broker::Broker) task has already been
69    /// joined or attempted to join (see notes above as well).
70    pub async fn join_idle(&mut self) {
71        self.inner.shutdown_idle().await;
72        self.join.take().expect("already joined").await.unwrap();
73    }
74
75    /// Creates a new `Client`.
76    pub async fn add_client(&mut self) -> TestClient {
77        let inner = self.inner.add_client().await;
78        TestClient::new(inner)
79    }
80}
81
82impl Default for TestBroker {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl Deref for TestBroker {
89    type Target = BrokerHandle;
90
91    fn deref(&self) -> &BrokerHandle {
92        &self.inner
93    }
94}
95
96impl DerefMut for TestBroker {
97    fn deref_mut(&mut self) -> &mut BrokerHandle {
98        &mut self.inner
99    }
100}
101
/// Tokio-based client for use in tests.
///
/// [`TestClient`] dereferences to [`aldrin::Handle`] and thus all methods on [`Handle`] can
/// be called on [`TestClient`] as well.
///
/// The client and its connection each run in their own spawned Tokio task. Use
/// [`join`](Self::join) to shut the client down and wait for both tasks.
#[derive(Debug)]
pub struct TestClient {
    /// The top-level crate's `TestClient`, which this type wraps.
    inner: crate::TestClient,
    /// Join handle of the spawned client task; `None` once it has been taken for joining.
    client: Option<JoinHandle<Result<(), RunError<Disconnected>>>>,
    /// Join handle of the spawned connection task; `None` once it has been taken for joining.
    conn: Option<JoinHandle<Result<(), ConnectionError<Disconnected>>>>,
}
112
113impl TestClient {
114    fn new(mut inner: crate::TestClient) -> Self {
115        Self {
116            client: Some(tokio::spawn(inner.take_client().run())),
117            conn: Some(tokio::spawn(inner.take_connection().run())),
118            inner,
119        }
120    }
121
122    /// Returns a handle to the client.
123    pub fn handle(&self) -> &Handle {
124        &self.inner
125    }
126
127    /// Returns a handle to the connection.
128    pub fn connection(&self) -> &ConnectionHandle {
129        self.inner.connection()
130    }
131
132    /// Shuts down the client and joins the client and connection tasks.
133    ///
134    /// This function cannot be canceled in a meaningful way after it has been polled once, because
135    /// it would panic if called (and polled) again. Ensure you call it only once and then poll it
136    /// to completion.
137    ///
138    /// # Panics
139    ///
140    /// This function will panic if the tasks have already been joined or attempted to join (see
141    /// notes above as well).
142    pub async fn join(&mut self) {
143        self.inner.shutdown();
144        self.join_client().await;
145        self.join_connection().await;
146    }
147
148    async fn join_client(&mut self) {
149        self.client
150            .take()
151            .expect("client already joined")
152            .await
153            .unwrap()
154            .unwrap();
155    }
156
157    async fn join_connection(&mut self) {
158        self.conn
159            .take()
160            .expect("connection already joined")
161            .await
162            .unwrap()
163            .unwrap();
164    }
165}
166
167impl Deref for TestClient {
168    type Target = Handle;
169
170    fn deref(&self) -> &Handle {
171        &self.inner
172    }
173}