1use ant_libp2p_core as libp2p_core;
22use ant_libp2p_plaintext as libp2p_plaintext;
23use ant_libp2p_swarm as libp2p_swarm;
24use ant_libp2p_tcp as libp2p_tcp;
25use ant_libp2p_yamux as libp2p_yamux;
26
27use std::{fmt::Debug, future::IntoFuture, time::Duration};
28
29use async_trait::async_trait;
30use futures::{
31 future::{BoxFuture, Either},
32 FutureExt, StreamExt,
33};
34use libp2p_core::{multiaddr::Protocol, Multiaddr};
35use libp2p_identity::PeerId;
36use libp2p_swarm::{
37 dial_opts::{DialOpts, PeerCondition},
38 NetworkBehaviour, Swarm, SwarmEvent,
39};
40
41#[async_trait]
44pub trait SwarmExt {
45 type NB: NetworkBehaviour;
46
47 #[cfg(feature = "async-std")]
54 fn new_ephemeral(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
55 where
56 Self: Sized;
57
58 #[cfg(feature = "tokio")]
65 fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
66 where
67 Self: Sized;
68
69 async fn connect<T>(&mut self, other: &mut Swarm<T>)
77 where
78 T: NetworkBehaviour + Send,
79 <T as NetworkBehaviour>::ToSwarm: Debug;
80
81 async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId;
90
91 async fn wait<E, P>(&mut self, predicate: P) -> E
93 where
94 P: Fn(SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>) -> Option<E>,
95 P: Send;
96
97 fn listen(&mut self) -> ListenFuture<&mut Self>;
102
103 async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm>;
107
108 async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm;
112
113 async fn loop_on_next(self);
114}
115
116pub async fn drive<
153 TBehaviour1,
154 const NUM_EVENTS_SWARM_1: usize,
155 Out1,
156 TBehaviour2,
157 const NUM_EVENTS_SWARM_2: usize,
158 Out2,
159>(
160 swarm1: &mut Swarm<TBehaviour2>,
161 swarm2: &mut Swarm<TBehaviour1>,
162) -> ([Out1; NUM_EVENTS_SWARM_1], [Out2; NUM_EVENTS_SWARM_2])
163where
164 TBehaviour2: NetworkBehaviour + Send,
165 TBehaviour2::ToSwarm: Debug,
166 TBehaviour1: NetworkBehaviour + Send,
167 TBehaviour1::ToSwarm: Debug,
168 SwarmEvent<TBehaviour2::ToSwarm>: TryIntoOutput<Out1>,
169 SwarmEvent<TBehaviour1::ToSwarm>: TryIntoOutput<Out2>,
170 Out1: Debug,
171 Out2: Debug,
172{
173 let mut res1 = Vec::<Out1>::with_capacity(NUM_EVENTS_SWARM_1);
174 let mut res2 = Vec::<Out2>::with_capacity(NUM_EVENTS_SWARM_2);
175
176 while res1.len() < NUM_EVENTS_SWARM_1 || res2.len() < NUM_EVENTS_SWARM_2 {
177 match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await {
178 Either::Left((o1, _)) => {
179 if let Ok(o1) = o1.try_into_output() {
180 res1.push(o1);
181 }
182 }
183 Either::Right((o2, _)) => {
184 if let Ok(o2) = o2.try_into_output() {
185 res2.push(o2);
186 }
187 }
188 }
189 }
190
191 (
192 res1.try_into().unwrap_or_else(|res1: Vec<_>| {
193 panic!(
194 "expected {NUM_EVENTS_SWARM_1} items from first swarm but got {}",
195 res1.len()
196 )
197 }),
198 res2.try_into().unwrap_or_else(|res2: Vec<_>| {
199 panic!(
200 "expected {NUM_EVENTS_SWARM_2} items from second swarm but got {}",
201 res2.len()
202 )
203 }),
204 )
205}
206
207pub trait TryIntoOutput<O>: Sized {
208 fn try_into_output(self) -> Result<O, Self>;
209}
210
211impl<O> TryIntoOutput<O> for SwarmEvent<O> {
212 fn try_into_output(self) -> Result<O, Self> {
213 self.try_into_behaviour_event()
214 }
215}
216impl<TBehaviourOutEvent> TryIntoOutput<SwarmEvent<TBehaviourOutEvent>>
217 for SwarmEvent<TBehaviourOutEvent>
218{
219 fn try_into_output(self) -> Result<SwarmEvent<TBehaviourOutEvent>, Self> {
220 Ok(self)
221 }
222}
223
224#[async_trait]
225impl<B> SwarmExt for Swarm<B>
226where
227 B: NetworkBehaviour + Send,
228 <B as NetworkBehaviour>::ToSwarm: Debug,
229{
230 type NB = B;
231
232 #[cfg(feature = "async-std")]
233 fn new_ephemeral(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
234 where
235 Self: Sized,
236 {
237 use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
238 use libp2p_identity::Keypair;
239
240 let identity = Keypair::generate_ed25519();
241 let peer_id = PeerId::from(identity.public());
242
243 let transport = MemoryTransport::default()
244 .or_transport(libp2p_tcp::async_io::Transport::default())
245 .upgrade(Version::V1)
246 .authenticate(libp2p_plaintext::Config::new(&identity))
247 .multiplex(libp2p_yamux::Config::default())
248 .timeout(Duration::from_secs(20))
249 .boxed();
250
251 Swarm::new(
252 transport,
253 behaviour_fn(identity),
254 peer_id,
255 libp2p_swarm::Config::with_async_std_executor()
256 .with_idle_connection_timeout(Duration::from_secs(5)),
262 )
263 }
264
265 #[cfg(feature = "tokio")]
266 fn new_ephemeral_tokio(behaviour_fn: impl FnOnce(libp2p_identity::Keypair) -> Self::NB) -> Self
267 where
268 Self: Sized,
269 {
270 use libp2p_core::{transport::MemoryTransport, upgrade::Version, Transport as _};
271 use libp2p_identity::Keypair;
272
273 let identity = Keypair::generate_ed25519();
274 let peer_id = PeerId::from(identity.public());
275
276 let transport = MemoryTransport::default()
277 .or_transport(libp2p_tcp::tokio::Transport::default())
278 .upgrade(Version::V1)
279 .authenticate(libp2p_plaintext::Config::new(&identity))
280 .multiplex(libp2p_yamux::Config::default())
281 .timeout(Duration::from_secs(20))
282 .boxed();
283
284 Swarm::new(
285 transport,
286 behaviour_fn(identity),
287 peer_id,
288 libp2p_swarm::Config::with_tokio_executor()
289 .with_idle_connection_timeout(Duration::from_secs(5)), )
295 }
296
297 async fn connect<T>(&mut self, other: &mut Swarm<T>)
298 where
299 T: NetworkBehaviour + Send,
300 <T as NetworkBehaviour>::ToSwarm: Debug,
301 {
302 let external_addresses = other.external_addresses().cloned().collect();
303
304 let dial_opts = DialOpts::peer_id(*other.local_peer_id())
305 .addresses(external_addresses)
306 .condition(PeerCondition::Always)
307 .build();
308
309 self.dial(dial_opts).unwrap();
310
311 let mut dialer_done = false;
312 let mut listener_done = false;
313
314 loop {
315 match futures::future::select(self.next_swarm_event(), other.next_swarm_event()).await {
316 Either::Left((SwarmEvent::ConnectionEstablished { .. }, _)) => {
317 dialer_done = true;
318 }
319 Either::Right((SwarmEvent::ConnectionEstablished { .. }, _)) => {
320 listener_done = true;
321 }
322 Either::Left((other, _)) => {
323 tracing::debug!(
324 dialer=?other,
325 "Ignoring event from dialer"
326 );
327 }
328 Either::Right((other, _)) => {
329 tracing::debug!(
330 listener=?other,
331 "Ignoring event from listener"
332 );
333 }
334 }
335
336 if dialer_done && listener_done {
337 return;
338 }
339 }
340 }
341
342 async fn dial_and_wait(&mut self, addr: Multiaddr) -> PeerId {
343 self.dial(addr.clone()).unwrap();
344
345 self.wait(|e| match e {
346 SwarmEvent::ConnectionEstablished {
347 endpoint, peer_id, ..
348 } => (endpoint.get_remote_address() == &addr).then_some(peer_id),
349 other => {
350 tracing::debug!(
351 dialer=?other,
352 "Ignoring event from dialer"
353 );
354 None
355 }
356 })
357 .await
358 }
359
360 async fn wait<E, P>(&mut self, predicate: P) -> E
361 where
362 P: Fn(SwarmEvent<<B as NetworkBehaviour>::ToSwarm>) -> Option<E>,
363 P: Send,
364 {
365 loop {
366 let event = self.next_swarm_event().await;
367 if let Some(e) = predicate(event) {
368 break e;
369 }
370 }
371 }
372
373 fn listen(&mut self) -> ListenFuture<&mut Self> {
374 ListenFuture {
375 add_memory_external: false,
376 add_tcp_external: false,
377 swarm: self,
378 }
379 }
380
381 async fn next_swarm_event(&mut self) -> SwarmEvent<<Self::NB as NetworkBehaviour>::ToSwarm> {
382 match futures::future::select(
383 futures_timer::Delay::new(Duration::from_secs(10)),
384 self.select_next_some(),
385 )
386 .await
387 {
388 Either::Left(((), _)) => panic!("Swarm did not emit an event within 10s"),
389 Either::Right((event, _)) => {
390 tracing::trace!(?event);
391
392 event
393 }
394 }
395 }
396
397 async fn next_behaviour_event(&mut self) -> <Self::NB as NetworkBehaviour>::ToSwarm {
398 loop {
399 if let Ok(event) = self.next_swarm_event().await.try_into_behaviour_event() {
400 return event;
401 }
402 }
403 }
404
405 async fn loop_on_next(mut self) {
406 while let Some(event) = self.next().await {
407 tracing::trace!(?event);
408 }
409 }
410}
411
412pub struct ListenFuture<S> {
413 add_memory_external: bool,
414 add_tcp_external: bool,
415 swarm: S,
416}
417
418impl<S> ListenFuture<S> {
419 pub fn with_memory_addr_external(mut self) -> Self {
426 self.add_memory_external = true;
427
428 self
429 }
430
431 pub fn with_tcp_addr_external(mut self) -> Self {
438 self.add_tcp_external = true;
439
440 self
441 }
442}
443
444impl<'s, B> IntoFuture for ListenFuture<&'s mut Swarm<B>>
445where
446 B: NetworkBehaviour + Send,
447 <B as NetworkBehaviour>::ToSwarm: Debug,
448{
449 type Output = (Multiaddr, Multiaddr);
450 type IntoFuture = BoxFuture<'s, Self::Output>;
451
452 fn into_future(self) -> Self::IntoFuture {
453 async move {
454 let swarm = self.swarm;
455
456 let memory_addr_listener_id = swarm.listen_on(Protocol::Memory(0).into()).unwrap();
457
458 let memory_multiaddr = swarm
460 .wait(|e| match e {
461 SwarmEvent::NewListenAddr {
462 address,
463 listener_id,
464 } => (listener_id == memory_addr_listener_id).then_some(address),
465 other => {
466 panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
467 }
468 })
469 .await;
470
471 let tcp_addr_listener_id = swarm
472 .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
473 .unwrap();
474
475 let tcp_multiaddr = swarm
476 .wait(|e| match e {
477 SwarmEvent::NewListenAddr {
478 address,
479 listener_id,
480 } => (listener_id == tcp_addr_listener_id).then_some(address),
481 other => {
482 panic!("Unexpected event while waiting for `NewListenAddr`: {other:?}")
483 }
484 })
485 .await;
486
487 if self.add_memory_external {
488 swarm.add_external_address(memory_multiaddr.clone());
489 }
490 if self.add_tcp_external {
491 swarm.add_external_address(tcp_multiaddr.clone());
492 }
493
494 (memory_multiaddr, tcp_multiaddr)
495 }
496 .boxed()
497 }
498}