axum_listener/multi.rs
1use crate::dual::{DualAddr, DualListener, DualStream, ToDualAddr};
2
3/// A listener that can accept connections on multiple underlying listeners simultaneously.
4///
5/// This struct allows you to bind to multiple addresses (TCP and/or Unix Domain Sockets)
6/// and accept connections from any of them. When multiple listeners are ready to accept
7/// connections, there is no guarantee which one will be selected first.
8///
9/// # Implementation Details
10///
11/// Internally, this uses [`futures::future::select_all`] to wait on all listeners
12/// simultaneously, ensuring efficient polling of all underlying listeners.
13///
14/// # Examples
15///
16/// ```rust,no_run
17/// # tokio_test::block_on(async {
18/// use axum::{Router, routing::get};
19/// use axum_listener::multi::MultiListener;
20///
21/// let router = Router::new().route("/", get(|| async { "Hello, World!" }));
22///
23/// // Bind to multiple TCP addresses
24/// let addresses = ["127.0.0.1:8080", "127.0.0.1:8081"];
25/// let listener = MultiListener::bind(addresses).await.unwrap();
26/// axum::serve(listener, router).await.unwrap();
27/// # });
28/// ```
29///
30/// ```rust,no_run
31/// # tokio_test::block_on(async {
32/// use axum_listener::multi::MultiListener;
33///
34/// // Mix TCP and Unix Domain Socket addresses (on Unix systems)
35/// # #[cfg(unix)] {
36/// let addresses = ["localhost:8080", "unix:/tmp/app.sock"];
37/// let listener = MultiListener::bind(addresses).await.unwrap();
38/// # }
39/// # });
40/// ```
41pub struct MultiListener {
42 /// The underlying listeners that this multi-listener manages
43 pub listeners: Vec<DualListener>,
44}
45
46/// An address collection representing the local addresses of a [`MultiListener`].
47///
48/// This struct contains all the addresses that the multi-listener is bound to.
49/// It's returned by the [`axum::serve::Listener::local_addr`] method implementation
50/// for [`MultiListener`].
51///
52/// # Examples
53///
54/// ```rust,no_run
55/// # tokio_test::block_on(async {
56/// use axum_listener::multi::MultiListener;
57/// use axum::serve::Listener;
58///
59/// let addresses = ["127.0.0.1:8080", "127.0.0.1:8081"];
60/// let listener = MultiListener::bind(addresses).await.unwrap();
61/// let multi_addr = listener.local_addr().unwrap();
62/// println!("Bound to {} addresses", multi_addr.addrs.len());
63/// # });
64/// ```
65#[derive(Debug, Clone)]
66pub struct MultiAddr {
67 /// The collection of addresses that the multi-listener is bound to
68 pub addrs: Vec<DualAddr>,
69}
70
71/// A stream collection for multi-listener connections.
72///
73/// This struct is currently not actively used in the public API but is provided
74/// for potential future extensions where multiple streams might need to be
75/// handled together.
76pub struct MultiStream {
77 /// A collection of dual streams
78 pub streams: Vec<DualStream>,
79}
80
81impl MultiListener {
82 /// Creates a new [`MultiListener`] bound to multiple addresses.
83 ///
84 /// This method attempts to bind to all provided addresses simultaneously.
85 /// If any of the bindings fail, the entire operation fails and returns an error.
86 /// All addresses must be successfully bound for this method to succeed.
87 ///
88 /// # Arguments
89 ///
90 /// * `addresses` - An iterable collection of addresses that implement [`ToDualAddr`]
91 ///
92 /// # Returns
93 ///
94 /// Returns a [`MultiListener`] bound to all specified addresses, or an error
95 /// if any binding fails.
96 ///
97 /// # Examples
98 ///
99 /// ```rust,no_run
100 /// # tokio_test::block_on(async {
101 /// use axum_listener::multi::MultiListener;
102 ///
103 /// // Bind to multiple TCP ports
104 /// let tcp_addresses = ["127.0.0.1:8080", "127.0.0.1:8081", "127.0.0.1:8082"];
105 /// let listener = MultiListener::bind(tcp_addresses).await.unwrap();
106 /// # });
107 /// ```
108 ///
109 /// ```rust,no_run
110 /// # tokio_test::block_on(async {
111 /// use axum_listener::multi::MultiListener;
112 ///
113 /// // Mix TCP and Unix Domain Socket addresses
114 /// # #[cfg(unix)] {
115 /// let mixed_addresses = ["localhost:8080", "unix:/tmp/app.sock"];
116 /// let listener = MultiListener::bind(mixed_addresses).await.unwrap();
117 /// # }
118 /// # });
119 /// ```
120 ///
121 /// # Errors
122 ///
123 /// This method can fail if:
124 /// - Any address format is invalid
125 /// - Any address is already in use
126 /// - Permission is denied for any requested address
127 /// - Unix Domain Sockets are not supported on the current platform
128 /// - The provided iterator is empty (no addresses to bind to)
129 pub async fn bind<I: IntoIterator<Item = A>, A: ToDualAddr>(
130 addresses: I,
131 ) -> Result<Self, std::io::Error> {
132 let listeners = futures::future::join_all(addresses.into_iter().map(DualListener::bind))
133 .await
134 .into_iter()
135 .map(|f| dbg!(f))
136 .collect::<Result<Vec<_>, _>>()?;
137
138 Ok(MultiListener { listeners })
139 }
140
141 /// Accepts a new incoming connection from any of the underlying listeners.
142 ///
143 /// This method waits for a connection to be established on any of the bound
144 /// listeners and returns the first one that becomes available. The selection
145 /// is non-deterministic when multiple listeners are ready simultaneously.
146 ///
147 /// # Returns
148 ///
149 /// Returns a tuple containing:
150 /// - [`DualStream`]: The stream for communicating with the client
151 /// - [`DualAddr`]: The address of the connected client
152 ///
153 /// # Examples
154 ///
155 /// ```rust,no_run
156 /// # tokio_test::block_on(async {
157 /// use axum_listener::multi::MultiListener;
158 ///
159 /// let addresses = ["127.0.0.1:8080", "127.0.0.1:8081"];
160 /// let listener = MultiListener::bind(addresses).await.unwrap();
161 ///
162 /// // Accept a connection from any of the bound addresses
163 /// let (stream, addr) = listener.accept().await.unwrap();
164 /// println!("Accepted connection from: {:?}", addr);
165 /// # });
166 /// ```
167 ///
168 /// # Errors
169 ///
170 /// This method can fail if there's an I/O error while accepting a connection
171 /// from any of the underlying listeners.
172 pub async fn accept(&self) -> Result<(DualStream, DualAddr), std::io::Error> {
173 let (out, idx, _rest) = futures::future::select_all(
174 self.listeners
175 .iter()
176 .map(|listener| listener._accept_unpin()),
177 )
178 .await;
179 tracing::trace!("Accepted connection on multi-listener from index {}", idx);
180 out
181 }
182}
183
184impl axum::serve::Listener for MultiListener {
185 type Io = DualStream;
186 type Addr = MultiAddr;
187
188 async fn accept(&mut self) -> (Self::Io, Self::Addr) {
189 let (out, _index, _rest) = futures::future::select_all(
190 self.listeners
191 .iter_mut()
192 .map(|listener| listener._accept_axum_unpin()),
193 )
194 .await;
195 tracing::trace!("Accepted connection on multi-listener from {}", _index);
196 (out.0, MultiAddr { addrs: vec![out.1] })
197 }
198
199 fn local_addr(&self) -> std::io::Result<Self::Addr> {
200 self.listeners
201 .iter()
202 .map(|listener| listener.local_addr())
203 .collect::<Result<Vec<_>, _>>()
204 .map(|addrs| MultiAddr { addrs })
205 }
206}
207
208const _: () = {
209 use super::MultiAddr;
210 use axum::extract::connect_info::Connected;
211 impl Connected<MultiAddr> for MultiAddr {
212 fn connect_info(remote_addr: MultiAddr) -> Self {
213 remote_addr
214 }
215 }
216 use axum::serve;
217
218 impl Connected<serve::IncomingStream<'_, MultiListener>> for MultiAddr {
219 fn connect_info(stream: serve::IncomingStream<'_, MultiListener>) -> Self {
220 stream.remote_addr().clone()
221 }
222 }
223};