ant_libp2p_swarm_test/
lib.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use ant_libp2p_core as libp2p_core;
22use ant_libp2p_plaintext as libp2p_plaintext;
23use ant_libp2p_swarm as libp2p_swarm;
24use ant_libp2p_tcp as libp2p_tcp;
25use ant_libp2p_yamux as libp2p_yamux;
26
27use std::{fmt::Debug, future::IntoFuture, time::Duration};
28
29use async_trait::async_trait;
30use futures::{
31    future::{BoxFuture, Either},
32    FutureExt, StreamExt,
33};
34use libp2p_core::{multiaddr::Protocol, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37    dial_opts::{DialOpts, PeerCondition},
38    NetworkBehaviour, Swarm, SwarmEvent,
39};
40
41/// An extension trait for [`Swarm`] that makes it
42/// easier to set up a network of [`Swarm`]s for tests.
43#[async_trait]
44pub trait SwarmExt {
45    type NB: NetworkBehaviour;
46
47    /// Create a new [`Swarm`] with an ephemeral identity and the `async-std` runtime.
48    ///
49    /// The swarm will use a [`libp2p_core::transport::MemoryTransport`] together with a
50    /// [`libp2p_plaintext::Config`] authentication layer and [`libp2p_yamux::Config`] as the
51    /// multiplexer. However, these details should not be relied
52    /// upon by the test and may change at any time.
53    #[cfg(feature = "async-std")]
54    fn new_ephemeral(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
55    where
56        Self: Sized;
57
58    /// Create a new [`Swarm`] with an ephemeral identity and the `tokio` runtime.
59    ///
60    /// The swarm will use a [`libp2p_core::transport::MemoryTransport`] together with a
61    /// [`libp2p_plaintext::Config`] authentication layer and [`libp2p_yamux::Config`] as the
62    /// multiplexer. However, these details should not be relied
63    /// upon by the test and may change at any time.
64    #[cfg(feature = "tokio")]
65    fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
66    where
67        Self: Sized;
68
69    /// Establishes a connection to the given [`Swarm`], polling both of them until the connection
70    /// is established.
71    ///
72    /// This will take addresses from the `other` [`Swarm`] via [`Swarm::external_addresses`].
73    /// By default, this iterator will not yield any addresses.
74    /// To add listen addresses as external addresses, use
75    /// [`ListenFuture::with_memory_addr_external`] or [`ListenFuture::with_tcp_addr_external`].
76    async fn connect<T>(&mut self, other: &mut Swarm<T>)
77    where
78        T: NetworkBehaviour + Send,
79        <T as NetworkBehaviour>::ToSwarm: Debug;
80
81    /// Dial the provided address and wait until a connection has been established.
82    ///
83    /// In a normal test scenario, you should prefer [`SwarmExt::connect`] but that is not always
84    /// possible. This function only abstracts away the "dial and wait for
85    /// `ConnectionEstablished` event" part.
86    ///
87    /// Because we don't have access to the other [`Swarm`],
88    /// we can't guarantee that it makes progress.
89    async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId;
90
91    /// Wait for specified condition to return `Some`.
92    async fn wait<E, P>(&mut self, predicate: P) -> E
93    where
94        P: Fn(SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>) -> Option<E>,
95        P: Send;
96
97    /// Listens for incoming connections, polling the [`Swarm`] until the
98    /// transport is ready to accept connections.
99    ///
100    /// The first address is for the memory transport, the second one for the TCP transport.
101    fn listen(&mut self) -> ListenFuture<&mut Self>;
102
103    /// Returns the next [`SwarmEvent`] or times out after 10 seconds.
104    ///
105    /// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`.
106    async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>;
107
108    /// Returns the next behaviour event or times out after 10 seconds.
109    ///
110    /// If the 10s timeout does not fit your usecase, please fall back to `StreamExt::next`.
111    async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm;
112
113    async fn loop_on_next(self);
114}
115
116/// Drives two [`Swarm`]s until a certain number of events are emitted.
117///
118/// # Usage
119///
120/// ## Number of events
121///
122/// The number of events is configured via const generics based on the array size of the return
123/// type. This allows the compiler to infer how many events you are expecting based on how you use
124/// this function. For example, if you expect the first [`Swarm`] to emit 2 events, you should
125/// assign the first variable of the returned tuple value to an array of size 2. This works
126/// especially well if you directly pattern-match on the return value.
127///
128/// ## Type of event
129///
130/// This function utilizes the [`TryIntoOutput`] trait.
131/// Similar as to the number of expected events, the type of event is inferred based on your usage.
132/// If you match against a [`SwarmEvent`], the first [`SwarmEvent`] will be returned.
133/// If you match against your [`NetworkBehaviour::ToSwarm`] type, [`SwarmEvent`]s which are not
134/// [`SwarmEvent::Behaviour`] will be skipped until the [`Swarm`] returns a behaviour event.
135///
136/// You can implement the [`TryIntoOutput`] for any other type to further customize this behaviour.
137///
138/// # Difference to [`futures::future::join`]
139///
140/// This function is similar to joining two futures with two crucial differences:
141/// 1. As described above, it allows you to obtain more than a single event.
142/// 2. More importantly, it will continue to poll the [`Swarm`]s **even if they already has emitted
143///    all expected events**.
144///
145/// Especially (2) is crucial for our usage of this function.
146/// If a [`Swarm`] is not polled, nothing within it makes progress.
147/// This can "starve" the other swarm which for example may wait for another message to be sent on a
148/// connection.
149///
150/// Using [`drive`] instead of [`futures::future::join`] ensures that a [`Swarm`] continues to be
151/// polled, even after it emitted its events.
152pub async fn drive<
153    TBehaviour1,
154    const NUM_EVENTS_SWARM_1: usize,
155    Out1,
156    TBehaviour2,
157    const NUM_EVENTS_SWARM_2: usize,
158    Out2,
159>(
160    swarm1: &mut Swarm<TBehaviour2>,
161    swarm2: &mut Swarm<TBehaviour1>,
162) -> ([Out1; NUM_EVENTS_SWARM_1], [Out2; NUM_EVENTS_SWARM_2])
163where
164    TBehaviour2: NetworkBehaviour + Send,
165    TBehaviour2::ToSwarm: Debug,
166    TBehaviour1: NetworkBehaviour + Send,
167    TBehaviour1::ToSwarm: Debug,
168    SwarmEvent<TBehaviour2::ToSwarm>: TryIntoOutput<Out1>,
169    SwarmEvent<TBehaviour1::ToSwarm>: TryIntoOutput<Out2>,
170    Out1: Debug,
171    Out2: Debug,
172{
173    let mut res1 = Vec::<Out1>::with_capacity(NUM_EVENTS_SWARM_1);
174    let mut res2 = Vec::<Out2>::with_capacity(NUM_EVENTS_SWARM_2);
175
176    while res1.len() < NUM_EVENTS_SWARM_1 || res2.len() < NUM_EVENTS_SWARM_2 {
177        match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await {
178            Either::Left((o1, _)) => {
179                if let Ok(o1) = o1.try_into_output() {
180                    res1.push(o1);
181                }
182            }
183            Either::Right((o2, _)) => {
184                if let Ok(o2) = o2.try_into_output() {
185                    res2.push(o2);
186                }
187            }
188        }
189    }
190
191    (
192        res1.try_into().unwrap_or_else(|res1: Vec<_>| {
193            panic!(
194                "expected {NUM_EVENTS_SWARM_1} items from first swarm but got {}",
195                res1.len()
196            )
197        }),
198        res2.try_into().unwrap_or_else(|res2: Vec<_>| {
199            panic!(
200                "expected {NUM_EVENTS_SWARM_2} items from second swarm but got {}",
201                res2.len()
202            )
203        }),
204    )
205}
206
/// Fallible, consuming conversion of an event into the output type `O`.
///
/// On failure the unconverted value is handed back in the `Err` variant.
pub trait TryIntoOutput<O>
where
    Self: Sized,
{
    /// Converts `self` into `O`, or returns `self` unchanged if that is not possible.
    fn try_into_output(self) -> Result<O, Self>;
}
210
211impl<O> TryIntoOutput<O> for SwarmEvent<O> {
212    fn try_into_output(self) -> Result<O, Self> {
213        self.try_into_behaviour_event()
214    }
215}
216impl<TBehaviourOutEvent> TryIntoOutput<SwarmEvent<TBehaviourOutEvent>>
217    for SwarmEvent<TBehaviourOutEvent>
218{
219    fn try_into_output(self) -> Result<SwarmEvent<TBehaviourOutEvent>, Self> {
220        Ok(self)
221    }
222}
223
224#[async_trait]
225impl<B> SwarmExt for Swarm<B>
226where
227    B: NetworkBehaviour + Send,
228    <B as NetworkBehaviour>::ToSwarm: Debug,
229{
230    type NB = B;
231
232    #[cfg(feature = "async-std")]
233    fn new_ephemeral(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
234    where
235        Self: Sized,
236    {
237        use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
238        use libp2p_identity::Keypair;
239
240        let identity = Keypair::generate_ed25519();
241        let peer_id = PeerId::from(identity.public());
242
243        let transport = MemoryTransport::default()
244            .or_transport(libp2p_tcp::async_io::Transport::default())
245            .upgrade(Version::V1)
246            .authenticate(libp2p_plaintext::Config::new(&identity))
247            .multiplex(libp2p_yamux::Config::default())
248            .timeout(Duration::from_secs(20))
249            .boxed();
250
251        Swarm::new(
252            transport,
253            behaviour_fn(identity),
254            peer_id,
255            libp2p_swarm::Config::with_async_std_executor()
256                // Some tests need
257                // connections to be kept
258                // alive beyond what the
259                // individual behaviour
260                // configures.,
261                .with_idle_connection_timeout(Duration::from_secs(5)),
262        )
263    }
264
265    #[cfg(feature = "tokio")]
266    fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
267    where
268        Self: Sized,
269    {
270        use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
271        use libp2p_identity::Keypair;
272
273        let identity = Keypair::generate_ed25519();
274        let peer_id = PeerId::from(identity.public());
275
276        let transport = MemoryTransport::default()
277            .or_transport(libp2p_tcp::tokio::Transport::default())
278            .upgrade(Version::V1)
279            .authenticate(libp2p_plaintext::Config::new(&identity))
280            .multiplex(libp2p_yamux::Config::default())
281            .timeout(Duration::from_secs(20))
282            .boxed();
283
284        Swarm::new(
285            transport,
286            behaviour_fn(identity),
287            peer_id,
288            libp2p_swarm::Config::with_tokio_executor()
289                .with_idle_connection_timeout(Duration::from_secs(5)), /* Some tests need
290                                                                        * connections to be kept
291                                                                        * alive beyond what the
292                                                                        * individual behaviour
293                                                                        * configures., */
294        )
295    }
296
297    async fn connect<T>(&mut self, other: &mut Swarm<T>)
298    where
299        T: NetworkBehaviour + Send,
300        <T as NetworkBehaviour>::ToSwarm: Debug,
301    {
302        let external_addresses = other.external_addresses().cloned().collect();
303
304        let dial_opts = DialOpts::peer_id(*other.local_peer_id())
305            .addresses(external_addresses)
306            .condition(PeerCondition::Always)
307            .build();
308
309        self.dial(dial_opts).unwrap();
310
311        let mut dialer_done = false;
312        let mut listener_done = false;
313
314        loop {
315            match futures::future::select(self.next_swarm_event(), other.next_swarm_event()).await {
316                Either::Left((SwarmEvent::ConnectionEstablished { .. }, _)) => {
317                    dialer_done = true;
318                }
319                Either::Right((SwarmEvent::ConnectionEstablished { .. }, _)) => {
320                    listener_done = true;
321                }
322                Either::Left((other, _)) => {
323                    tracing::debug!(
324                        dialer=?other,
325                        "Ignoring event from dialer"
326                    );
327                }
328                Either::Right((other, _)) => {
329                    tracing::debug!(
330                        listener=?other,
331                        "Ignoring event from listener"
332                    );
333                }
334            }
335
336            if dialer_done && listener_done {
337                return;
338            }
339        }
340    }
341
342    async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId {
343        self.dial(addr.clone()).unwrap();
344
345        self.wait(|e| match e {
346            SwarmEvent::ConnectionEstablished {
347                endpoint, peer_id, ..
348            } => (endpoint.get_remote_address() == &addr).then_some(peer_id),
349            other => {
350                tracing::debug!(
351                    dialer=?other,
352                    "Ignoring event from dialer"
353                );
354                None
355            }
356        })
357        .await
358    }
359
360    async fn wait<E, P>(&mut self, predicate: P) -> E
361    where
362        P: Fn(SwarmEvent<<B as NetworkBehaviour>::ToSwarm>) -> Option<E>,
363        P: Send,
364    {
365        loop {
366            let event = self.next_swarm_event().await;
367            if let Some(e) = predicate(event) {
368                break e;
369            }
370        }
371    }
372
373    fn listen(&mut self) -> ListenFuture<&mut Self> {
374        ListenFuture {
375            add_memory_external: false,
376            add_tcp_external: false,
377            swarm: self,
378        }
379    }
380
381    async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm> {
382        match futures::future::select(
383            futures_timer::Delay::new(Duration::from_secs(10)),
384            self.select_next_some(),
385        )
386        .await
387        {
388            Either::Left(((), _)) => panic!("Swarm did not emit an event within 10s"),
389            Either::Right((event, _)) => {
390                tracing::trace!(?event);
391
392                event
393            }
394        }
395    }
396
397    async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm {
398        loop {
399            if let Ok(event) = self.next_swarm_event().await.try_into_behaviour_event() {
400                return event;
401            }
402        }
403    }
404
405    async fn loop_on_next(mut self) {
406        while let Some(event) = self.next().await {
407            tracing::trace!(?event);
408        }
409    }
410}
411
/// Pending "start listening" operation, created by [`SwarmExt::listen`].
///
/// Configure it with the `with_*` methods and `.await` it to start listening.
pub struct ListenFuture<S> {
    /// The swarm (or a reference to it) that is going to listen.
    swarm: S,
    /// Whether the memory listen address is also added as an external address.
    add_memory_external: bool,
    /// Whether the TCP listen address is also added as an external address.
    add_tcp_external: bool,
}
417
418impl<S> ListenFuture<S> {
419    /// Adds the memory address we are starting to listen on as an external address using
420    /// [`Swarm::add_external_address`].
421    ///
422    /// This is typically "safe" for tests because within a process, memory addresses are "globally"
423    /// reachable. However, some tests depend on which addresses are external and need this to
424    /// be configurable so it is not a good default.
425    pub fn with_memory_addr_external(mut self) -> Self {
426        self.add_memory_external = true;
427
428        self
429    }
430
431    /// Adds the TCP address we are starting to listen on as an external address using
432    /// [`Swarm::add_external_address`].
433    ///
434    /// This is typically "safe" for tests because on the same machine, 127.0.0.1 is reachable for
435    /// other [`Swarm`]s. However, some tests depend on which addresses are external and need
436    /// this to be configurable so it is not a good default.
437    pub fn with_tcp_addr_external(mut self) -> Self {
438        self.add_tcp_external = true;
439
440        self
441    }
442}
443
444impl<'s, B> IntoFuture for ListenFuture<&'s mut Swarm<B>>
445where
446    B: NetworkBehaviour + Send,
447    <B as NetworkBehaviour>::ToSwarm: Debug,
448{
449    type Output = (Multiaddr, Multiaddr);
450    type IntoFuture = BoxFuture<'s, Self::Output>;
451
452    fn into_future(self) -> Self::IntoFuture {
453        async move {
454            let swarm = self.swarm;
455
456            let memory_addr_listener_id = swarm.listen_on(Protocol::Memory(0).into()).unwrap();
457
458            // block until we are actually listening
459            let memory_multiaddr = swarm
460                .wait(|e| match e {
461                    SwarmEvent::NewListenAddr {
462                        address,
463                        listener_id,
464                    } => (listener_id == memory_addr_listener_id).then_some(address),
465                    other => {
466                        panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
467                    }
468                })
469                .await;
470
471            let tcp_addr_listener_id = swarm
472                .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
473                .unwrap();
474
475            let tcp_multiaddr = swarm
476                .wait(|e| match e {
477                    SwarmEvent::NewListenAddr {
478                        address,
479                        listener_id,
480                    } => (listener_id == tcp_addr_listener_id).then_some(address),
481                    other => {
482                        panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
483                    }
484                })
485                .await;
486
487            if self.add_memory_external {
488                swarm.add_external_address(memory_multiaddr.clone());
489            }
490            if self.add_tcp_external {
491                swarm.add_external_address(tcp_multiaddr.clone());
492            }
493
494            (memory_multiaddr, tcp_multiaddr)
495        }
496        .boxed()
497    }
498}