tycho_network/util/
traits.rs

1use std::future::Future;
2
3use anyhow::Result;
4
5use crate::network::{Network, Peer};
6use crate::types::{PeerEvent, PeerId, Request, Response};
7
/// Peer-addressed request helpers.
///
/// Both methods take the target [`PeerId`] and a [`Request`], and both return `Send` futures.
pub trait NetworkExt {
    /// Sends `request` to the peer identified by `peer_id` and resolves to a [`Response`].
    ///
    /// The implementation for [`Network`] in this file dispatches through `Peer::rpc`.
    fn query(
        &self,
        peer_id: &PeerId,
        request: Request,
    ) -> impl Future<Output = Result<Response>> + Send;

    /// Sends `request` to the peer identified by `peer_id`; resolves to `()` on success.
    ///
    /// The implementation for [`Network`] in this file dispatches through `Peer::send_message`.
    fn send(&self, peer_id: &PeerId, request: Request) -> impl Future<Output = Result<()>> + Send;
}
17
/// [`NetworkExt`] for [`Network`].
///
/// Both methods delegate to `on_connected_peer` and differ only in the `Peer` method that
/// gets invoked with the request.
impl NetworkExt for Network {
    /// Runs `Peer::rpc` with `request` against the peer identified by `peer_id`.
    async fn query(&self, peer_id: &PeerId, request: Request) -> Result<Response> {
        // `on_connected_peer` initiates a connection first when the peer is known
        // but not connected yet.
        on_connected_peer(self, Peer::rpc, peer_id, request).await
    }

    /// Runs `Peer::send_message` with `request` against the peer identified by `peer_id`.
    async fn send(&self, peer_id: &PeerId, request: Request) -> Result<()> {
        // Same connection handling as `query`; only the invoked `Peer` method differs.
        on_connected_peer(self, Peer::send_message, peer_id, request).await
    }
}
27
28async fn on_connected_peer<T, F>(
29    network: &Network,
30    f: F,
31    peer_id: &PeerId,
32    request: Request,
33) -> Result<T>
34where
35    for<'a> F: PeerTask<'a, T>,
36{
37    use tokio::sync::broadcast::error::RecvError;
38
39    let mut peer_events = network.subscribe()?;
40
41    // Interact if already connected
42    if let Some(peer) = network.peer(peer_id) {
43        return f.call(&peer, request).await;
44    }
45
46    match network.known_peers().get(peer_id) {
47        // Initiate a connection of it is a known peer
48        Some(peer_info) => {
49            // TODO: try multiple addresses
50            let address = peer_info
51                .iter_addresses()
52                .next()
53                .cloned()
54                .expect("address list must have at least one item");
55
56            network.connect(address, peer_id).await?;
57        }
58        // Error otherwise
59        None => anyhow::bail!("trying to interact with an unknown peer: {peer_id}"),
60    }
61
62    loop {
63        match peer_events.recv().await {
64            Ok(PeerEvent::NewPeer(new_peer_id)) if new_peer_id == peer_id => {
65                if let Some(peer) = network.peer(peer_id) {
66                    return f.call(&peer, request).await;
67                }
68            }
69            Ok(_) => {}
70            Err(RecvError::Closed) => anyhow::bail!("network subscription closed"),
71            Err(RecvError::Lagged(_)) => {
72                peer_events = peer_events.resubscribe();
73
74                if let Some(peer) = network.peer(peer_id) {
75                    return f.call(&peer, request).await;
76                }
77            }
78        }
79
80        anyhow::ensure!(
81            network.known_peers().contains(peer_id),
82            "waiting for a connection to an unknown peer: {peer_id}",
83        );
84    }
85}
86
/// A one-shot operation that is run against a borrowed [`Peer`] with a [`Request`].
///
/// The lifetime parameter `'a` is the lifetime of the peer borrow. `on_connected_peer`
/// requires this trait for every lifetime (`for<'a>`), since the peer it borrows is a
/// local value.
trait PeerTask<'a, T> {
    /// Future returned by [`call`](Self::call); it resolves to `Result<T>` and is bounded by `'a`.
    type Output: Future<Output = Result<T>> + 'a;

    /// Consumes the task and returns the future that performs it for `peer` and `request`.
    fn call(self, peer: &'a Peer, request: Request) -> Self::Output;
}
92
/// Blanket implementation: any `FnOnce(&'a Peer, Request) -> Fut` whose future resolves to
/// `Result<T>` is a [`PeerTask`].
///
/// This is what lets method paths such as `Peer::rpc` and `Peer::send_message` be passed
/// as the task, as the `NetworkExt` implementation in this file does.
impl<'a, T, F, Fut> PeerTask<'a, T> for F
where
    F: FnOnce(&'a Peer, Request) -> Fut,
    Fut: Future<Output = Result<T>> + 'a,
{
    type Output = Fut;

    /// Invokes the wrapped callable with `peer` and `request` and returns its future.
    #[inline]
    fn call(self, peer: &'a Peer, request: Request) -> Fut {
        self(peer, request)
    }
}