axum_listener/
multi.rs

1use crate::dual::{DualAddr, DualListener, DualStream, ToDualAddr};
2
3/// A listener that can accept connections on multiple underlying listeners simultaneously.
4///
5/// This struct allows you to bind to multiple addresses (TCP and/or Unix Domain Sockets)
6/// and accept connections from any of them. When multiple listeners are ready to accept
7/// connections, there is no guarantee which one will be selected first.
8///
9/// # Implementation Details
10///
11/// Internally, this uses [`futures::future::select_all`] to wait on all listeners
12/// simultaneously, ensuring efficient polling of all underlying listeners.
13///
14/// # Examples
15///
16/// ```rust,no_run
17/// # tokio_test::block_on(async {
18/// use axum::{Router, routing::get};
19/// use axum_listener::multi::MultiListener;
20///
21/// let router = Router::new().route("/", get(|| async { "Hello, World!" }));
22///
23/// // Bind to multiple TCP addresses
24/// let addresses = ["127.0.0.1:8080", "127.0.0.1:8081"];
25/// let listener = MultiListener::bind(addresses).await.unwrap();
26/// axum::serve(listener, router).await.unwrap();
27/// # });
28/// ```
29///
30/// ```rust,no_run
31/// # tokio_test::block_on(async {
32/// use axum_listener::multi::MultiListener;
33///
34/// // Mix TCP and Unix Domain Socket addresses (on Unix systems)
35/// # #[cfg(unix)] {
36/// let addresses = ["localhost:8080", "unix:/tmp/app.sock"];
37/// let listener = MultiListener::bind(addresses).await.unwrap();
38/// # }
39/// # });
40/// ```
41pub struct MultiListener {
42    /// The underlying listeners that this multi-listener manages
43    pub listeners: Vec<DualListener>,
44}
45
46/// An address collection representing the local addresses of a [`MultiListener`].
47///
48/// This struct contains all the addresses that the multi-listener is bound to.
49/// It's returned by the [`axum::serve::Listener::local_addr`] method implementation
50/// for [`MultiListener`].
51///
52/// # Examples
53///
54/// ```rust,no_run
55/// # tokio_test::block_on(async {
56/// use axum_listener::multi::MultiListener;
57/// use axum::serve::Listener;
58///
59/// let addresses = ["127.0.0.1:8080", "127.0.0.1:8081"];
60/// let listener = MultiListener::bind(addresses).await.unwrap();
61/// let multi_addr = listener.local_addr().unwrap();
62/// println!("Bound to {} addresses", multi_addr.addrs.len());
63/// # });
64/// ```
65#[derive(Debug, Clone)]
66pub struct MultiAddr {
67    /// The collection of addresses that the multi-listener is bound to
68    pub addrs: Vec<DualAddr>,
69}
70
71/// A stream collection for multi-listener connections.
72///
73/// This struct is currently not actively used in the public API but is provided
74/// for potential future extensions where multiple streams might need to be
75/// handled together.
76pub struct MultiStream {
77    /// A collection of dual streams
78    pub streams: Vec<DualStream>,
79}
80
81impl MultiListener {
82    /// Creates a new [`MultiListener`] bound to multiple addresses.
83    ///
84    /// This method attempts to bind to all provided addresses simultaneously.
85    /// If any of the bindings fail, the entire operation fails and returns an error.
86    /// All addresses must be successfully bound for this method to succeed.
87    ///
88    /// # Arguments
89    ///
90    /// * `addresses` - An iterable collection of addresses that implement [`ToDualAddr`]
91    ///
92    /// # Returns
93    ///
94    /// Returns a [`MultiListener`] bound to all specified addresses, or an error
95    /// if any binding fails.
96    ///
97    /// # Examples
98    ///
99    /// ```rust,no_run
100    /// # tokio_test::block_on(async {
101    /// use axum_listener::multi::MultiListener;
102    ///
103    /// // Bind to multiple TCP ports
104    /// let tcp_addresses = ["127.0.0.1:8080", "127.0.0.1:8081", "127.0.0.1:8082"];
105    /// let listener = MultiListener::bind(tcp_addresses).await.unwrap();
106    /// # });
107    /// ```
108    ///
109    /// ```rust,no_run
110    /// # tokio_test::block_on(async {
111    /// use axum_listener::multi::MultiListener;
112    ///
113    /// // Mix TCP and Unix Domain Socket addresses
114    /// # #[cfg(unix)] {
115    /// let mixed_addresses = ["localhost:8080", "unix:/tmp/app.sock"];
116    /// let listener = MultiListener::bind(mixed_addresses).await.unwrap();
117    /// # }
118    /// # });
119    /// ```
120    ///
121    /// # Errors
122    ///
123    /// This method can fail if:
124    /// - Any address format is invalid
125    /// - Any address is already in use
126    /// - Permission is denied for any requested address
127    /// - Unix Domain Sockets are not supported on the current platform
128    /// - The provided iterator is empty (no addresses to bind to)
129    pub async fn bind<I: IntoIterator<Item = A>, A: ToDualAddr>(
130        addresses: I,
131    ) -> Result<Self, std::io::Error> {
132        let listeners = futures::future::join_all(addresses.into_iter().map(DualListener::bind))
133            .await
134            .into_iter()
135            .map(|f| dbg!(f))
136            .collect::<Result<Vec<_>, _>>()?;
137
138        Ok(MultiListener { listeners })
139    }
140
141    /// Accepts a new incoming connection from any of the underlying listeners.
142    ///
143    /// This method waits for a connection to be established on any of the bound
144    /// listeners and returns the first one that becomes available. The selection
145    /// is non-deterministic when multiple listeners are ready simultaneously.
146    ///
147    /// # Returns
148    ///
149    /// Returns a tuple containing:
150    /// - [`DualStream`]: The stream for communicating with the client
151    /// - [`DualAddr`]: The address of the connected client
152    ///
153    /// # Examples
154    ///
155    /// ```rust,no_run
156    /// # tokio_test::block_on(async {
157    /// use axum_listener::multi::MultiListener;
158    ///
159    /// let addresses = ["127.0.0.1:8080", "127.0.0.1:8081"];
160    /// let listener = MultiListener::bind(addresses).await.unwrap();
161    ///
162    /// // Accept a connection from any of the bound addresses
163    /// let (stream, addr) = listener.accept().await.unwrap();
164    /// println!("Accepted connection from: {:?}", addr);
165    /// # });
166    /// ```
167    ///
168    /// # Errors
169    ///
170    /// This method can fail if there's an I/O error while accepting a connection
171    /// from any of the underlying listeners.
172    pub async fn accept(&self) -> Result<(DualStream, DualAddr), std::io::Error> {
173        let (out, idx, _rest) = futures::future::select_all(
174            self.listeners
175                .iter()
176                .map(|listener| listener._accept_unpin()),
177        )
178        .await;
179        tracing::trace!("Accepted connection on multi-listener from index {}", idx);
180        out
181    }
182}
183
184impl axum::serve::Listener for MultiListener {
185    type Io = DualStream;
186    type Addr = MultiAddr;
187
188    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
189        let (out, _index, _rest) = futures::future::select_all(
190            self.listeners
191                .iter_mut()
192                .map(|listener| listener._accept_axum_unpin()),
193        )
194        .await;
195        tracing::trace!("Accepted connection on multi-listener from {}", _index);
196        (out.0, MultiAddr { addrs: vec![out.1] })
197    }
198
199    fn local_addr(&self) -> std::io::Result<Self::Addr> {
200        self.listeners
201            .iter()
202            .map(|listener| listener.local_addr())
203            .collect::<Result<Vec<_>, _>>()
204            .map(|addrs| MultiAddr { addrs })
205    }
206}
207
208const _: () = {
209    use super::MultiAddr;
210    use axum::extract::connect_info::Connected;
211    impl Connected<MultiAddr> for MultiAddr {
212        fn connect_info(remote_addr: MultiAddr) -> Self {
213            remote_addr
214        }
215    }
216    use axum::serve;
217
218    impl Connected<serve::IncomingStream<'_, MultiListener>> for MultiAddr {
219        fn connect_info(stream: serve::IncomingStream<'_, MultiListener>) -> Self {
220            stream.remote_addr().clone()
221        }
222    }
223};