aldrin_test/tokio.rs
1//! Tokio-based utilities for Aldrin server and client tests
2//!
3//! The types in this module are conceptually identical to the ones in the top-level crate, but are
4//! more convenient if you use Tokio, because they all automatically spawn the required tasks.
5
6#[cfg(test)]
7mod test;
8
9use aldrin::error::RunError;
10use aldrin::Handle;
11use aldrin_broker::{BrokerHandle, ConnectionError, ConnectionHandle};
12use aldrin_core::channel::Disconnected;
13use std::ops::{Deref, DerefMut};
14use tokio::task::JoinHandle;
15
16/// Tokio-based broker for use in tests.
17///
18/// This type is a simple wrapper around [`aldrin_broker::Broker`] and
19/// [`aldrin_broker::BrokerHandle`]. All method of [`BrokerHandle`] can be called on this type as
20/// well due to its [`Deref`] implementation.
21///
22/// See the [`tokio` module documentation](self) for usage examples.
23#[derive(Debug)]
24pub struct TestBroker {
25 inner: crate::TestBroker,
26 join: Option<JoinHandle<()>>,
27}
28
29impl TestBroker {
30 /// Creates a new broker.
31 pub fn new() -> Self {
32 let mut inner = crate::TestBroker::new();
33
34 Self {
35 join: Some(tokio::spawn(inner.take_broker().run())),
36 inner,
37 }
38 }
39
40 /// Returns a handle to the broker.
41 pub fn handle(&self) -> &BrokerHandle {
42 &self.inner
43 }
44
45 /// Shuts down the broker and joins its task.
46 ///
47 /// This function cannot be canceled in a meaningful way after it has been polled once, because
48 /// it would panic if called (and polled) again. Ensure that you call it only once and then poll
49 /// it to completion.
50 ///
51 /// # Panics
52 ///
53 /// This function will panic if the [`Broker`](aldrin_broker::Broker) task has already been
54 /// joined or attempted to join (see notes above as well).
55 pub async fn join(&mut self) {
56 self.inner.shutdown().await;
57 self.join.take().expect("already joined").await.unwrap();
58 }
59
60 /// Shuts down the broker when idle and joins its task.
61 ///
62 /// This function cannot be canceled in a meaningful way after it has been polled once, because
63 /// it would panic if called (and polled) again. Ensure that you call it only once and then poll
64 /// it to completion.
65 ///
66 /// # Panics
67 ///
68 /// This function will panic if the [`Broker`](aldrin_broker::Broker) task has already been
69 /// joined or attempted to join (see notes above as well).
70 pub async fn join_idle(&mut self) {
71 self.inner.shutdown_idle().await;
72 self.join.take().expect("already joined").await.unwrap();
73 }
74
75 /// Creates a new `Client`.
76 pub async fn add_client(&mut self) -> TestClient {
77 let inner = self.inner.add_client().await;
78 TestClient::new(inner)
79 }
80}
81
82impl Default for TestBroker {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl Deref for TestBroker {
89 type Target = BrokerHandle;
90
91 fn deref(&self) -> &BrokerHandle {
92 &self.inner
93 }
94}
95
96impl DerefMut for TestBroker {
97 fn deref_mut(&mut self) -> &mut BrokerHandle {
98 &mut self.inner
99 }
100}
101
102/// Tokio-based client for use in tests.
103///
104/// [`TestClient`] dereferences to [`aldrin::Handle`] and thus all methods on [`Handle`] can
105/// be called on [`TestClient`] as well.
106#[derive(Debug)]
107pub struct TestClient {
108 inner: crate::TestClient,
109 client: Option<JoinHandle<Result<(), RunError<Disconnected>>>>,
110 conn: Option<JoinHandle<Result<(), ConnectionError<Disconnected>>>>,
111}
112
113impl TestClient {
114 fn new(mut inner: crate::TestClient) -> Self {
115 Self {
116 client: Some(tokio::spawn(inner.take_client().run())),
117 conn: Some(tokio::spawn(inner.take_connection().run())),
118 inner,
119 }
120 }
121
122 /// Returns a handle to the client.
123 pub fn handle(&self) -> &Handle {
124 &self.inner
125 }
126
127 /// Returns a handle to the connection.
128 pub fn connection(&self) -> &ConnectionHandle {
129 self.inner.connection()
130 }
131
132 /// Shuts down the client and joins the client and connection tasks.
133 ///
134 /// This function cannot be canceled in a meaningful way after it has been polled once, because
135 /// it would panic if called (and polled) again. Ensure you call it only once and then poll it
136 /// to completion.
137 ///
138 /// # Panics
139 ///
140 /// This function will panic if the tasks have already been joined or attempted to join (see
141 /// notes above as well).
142 pub async fn join(&mut self) {
143 self.inner.shutdown();
144 self.join_client().await;
145 self.join_connection().await;
146 }
147
148 async fn join_client(&mut self) {
149 self.client
150 .take()
151 .expect("client already joined")
152 .await
153 .unwrap()
154 .unwrap();
155 }
156
157 async fn join_connection(&mut self) {
158 self.conn
159 .take()
160 .expect("connection already joined")
161 .await
162 .unwrap()
163 .unwrap();
164 }
165}
166
167impl Deref for TestClient {
168 type Target = Handle;
169
170 fn deref(&self) -> &Handle {
171 &self.inner
172 }
173}