Skip to main content

forest/libp2p_bitswap/
request_manager.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! Request manager implementation that is optimized for `filecoin` network
5//! usage
6
7use std::{
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use crate::cid_collections::CidHashMap;
13use ahash::{HashSet, HashSetExt};
14use cid::Cid;
15use flume::TryRecvError;
16use futures::StreamExt;
17use libp2p::PeerId;
18use parking_lot::RwLock;
19
20use crate::libp2p_bitswap::{event_handlers::*, *};
21
22const BITSWAP_BLOCK_REQUEST_INTERVAL: Duration = Duration::from_millis(500);
23
24pub type ValidatePeerCallback = dyn Fn(PeerId) -> bool + Send + Sync;
25
26#[derive(Debug, Clone)]
27struct ResponseChannels {
28    block_have: flume::Sender<PeerId>,
29    block_received: flume::Sender<Option<Vec<u8>>>,
30}
31
32/// Request manager implementation that is optimized for Filecoin network
33/// usage
34pub struct BitswapRequestManager {
35    // channel for outbound `have` requests
36    outbound_have_request_tx: flume::Sender<(PeerId, Cid)>,
37    outbound_have_request_rx: flume::Receiver<(PeerId, Cid)>,
38    // channel for outbound `cancel` requests
39    outbound_cancel_request_tx: flume::Sender<(PeerId, Cid)>,
40    outbound_cancel_request_rx: flume::Receiver<(PeerId, Cid)>,
41    // channel for outbound `block` requests
42    outbound_block_request_tx: flume::Sender<(PeerId, Cid)>,
43    outbound_block_request_rx: flume::Receiver<(PeerId, Cid)>,
44    peers: RwLock<HashSet<PeerId>>,
45    response_channels: RwLock<CidHashMap<ResponseChannels>>,
46}
47
48impl BitswapRequestManager {
49    /// A receiver channel of the outbound `bitswap` network requests that the
50    /// [`BitswapRequestManager`] emits. The messages from this channel need
51    /// to be sent with [`BitswapBehaviour::send_request`] to make
52    /// [`BitswapRequestManager::get_block`] work.
53    pub fn outbound_request_stream(
54        &self,
55    ) -> impl futures::stream::Stream<Item = (PeerId, BitswapRequest)> + '_ {
56        type MapperType = fn((libp2p::PeerId, Cid)) -> (libp2p::PeerId, BitswapRequest);
57
58        fn new_block((peer, cid): (PeerId, Cid)) -> (PeerId, BitswapRequest) {
59            (peer, BitswapRequest::new_block(cid).send_dont_have(false))
60        }
61
62        fn new_have((peer, cid): (PeerId, Cid)) -> (PeerId, BitswapRequest) {
63            (peer, BitswapRequest::new_have(cid).send_dont_have(false))
64        }
65
66        fn new_cancel((peer, cid): (PeerId, Cid)) -> (PeerId, BitswapRequest) {
67            (peer, BitswapRequest::new_cancel(cid).send_dont_have(false))
68        }
69
70        // Use separate channels here to not block `block` requests when too many other type of requests are queued.
71        let streams = vec![
72            self.outbound_block_request_rx
73                .stream()
74                .map(new_block as MapperType),
75            self.outbound_have_request_rx
76                .stream()
77                .map(new_have as MapperType),
78            self.outbound_cancel_request_rx
79                .stream()
80                .map(new_cancel as MapperType),
81        ];
82        futures::stream::select_all(streams)
83    }
84}
85
86impl Default for BitswapRequestManager {
87    fn default() -> Self {
88        let (outbound_have_request_tx, outbound_have_request_rx) = flume::unbounded();
89        let (outbound_cancel_request_tx, outbound_cancel_request_rx) = flume::unbounded();
90        let (outbound_block_request_tx, outbound_block_request_rx) = flume::unbounded();
91        Self {
92            outbound_have_request_tx,
93            outbound_have_request_rx,
94            outbound_cancel_request_tx,
95            outbound_cancel_request_rx,
96            outbound_block_request_tx,
97            outbound_block_request_rx,
98            peers: RwLock::new(HashSet::new()),
99            response_channels: RwLock::new(CidHashMap::new()),
100        }
101    }
102}
103
104impl BitswapRequestManager {
105    /// Hook the `bitswap` network event into the [`BitswapRequestManager`]
106    pub fn handle_event<S: BitswapStoreRead>(
107        self: &Arc<Self>,
108        bitswap: &mut BitswapBehaviour,
109        store: &S,
110        event: BitswapBehaviourEvent,
111    ) -> anyhow::Result<()> {
112        handle_event_impl(self, bitswap, store, event)
113    }
114
115    /// Gets a block, writing it to the given block store that implements
116    /// [`BitswapStoreReadWrite`] and respond to the channel. Note: this
117    /// method is a non-blocking, it is intended to return immediately.
118    #[cfg(not(target_arch = "wasm32"))]
119    pub fn get_block(
120        self: Arc<Self>,
121        store: Arc<impl BitswapStoreReadWrite>,
122        cid: Cid,
123        timeout: Duration,
124        responder: Option<flume::Sender<bool>>,
125        validate_peer: Option<Arc<ValidatePeerCallback>>,
126    ) {
127        let start = Instant::now();
128        let store_cloned = store.clone();
129        task::spawn(async move {
130            let mut success = store.contains(&cid).unwrap_or_default();
131            if !success {
132                let deadline = start.checked_add(timeout).expect("Infallible");
133                success = task::spawn_blocking(move || {
134                    self.get_block_sync(store_cloned, cid, deadline, validate_peer)
135                })
136                .await
137                .unwrap_or_default();
138                // Spin check db when `get_block_sync` fails fast,
139                // which means there is other task actually processing the same `cid`
140                while !success && Instant::now() < deadline {
141                    task::sleep(BITSWAP_BLOCK_REQUEST_INTERVAL).await;
142                    success = store.contains(&cid).unwrap_or_default();
143                }
144            }
145
146            if success {
147                metrics::message_counter_get_block_success().inc();
148            } else {
149                metrics::message_counter_get_block_failure().inc();
150            }
151
152            if let Some(responder) = responder
153                && let Err(e) = responder.send_async(success).await
154            {
155                debug!("{e}");
156            }
157
158            metrics::GET_BLOCK_TIME.observe((Instant::now() - start).as_secs_f64());
159        });
160    }
161
162    fn get_block_sync(
163        &self,
164        store: Arc<impl BitswapStoreReadWrite>,
165        cid: Cid,
166        deadline: Instant,
167        validate_peer: Option<Arc<ValidatePeerCallback>>,
168    ) -> bool {
169        // Fail fast here when the given `cid` is being processed by other tasks
170        if self.response_channels.read().contains_key(&cid) {
171            return false;
172        }
173
174        let (block_have_tx, block_have_rx) = flume::unbounded();
175        let (block_saved_tx, block_saved_rx) = flume::unbounded();
176        let channels = ResponseChannels {
177            block_have: block_have_tx,
178            block_received: block_saved_tx,
179        };
180        {
181            self.response_channels.write().insert(cid, channels);
182        }
183
184        let peers: Vec<_> = self.peers.read().iter().cloned().collect();
185        let validated_peers: Vec<_> = peers
186            .iter()
187            .filter(|&&p| validate_peer.as_ref().map(|f| f(p)).unwrap_or(true))
188            .cloned()
189            .collect();
190
191        debug!("Found {} valid peers for {cid}", validated_peers.len());
192        let selected_peers = if validated_peers.is_empty() {
193            // Fallback to all peers
194            peers
195        } else {
196            validated_peers
197        };
198
199        for peer in selected_peers {
200            if let Err(e) = self.outbound_have_request_tx.send((peer, cid)) {
201                debug!("{e}");
202            }
203        }
204
205        let mut success = false;
206        let mut block_data = None;
207        while !success && Instant::now() < deadline {
208            match block_have_rx.try_recv() {
209                Ok(peer) => {
210                    _ = self.outbound_block_request_tx.send((peer, cid));
211                }
212                Err(TryRecvError::Empty) => {}
213                Err(TryRecvError::Disconnected) => {
214                    break;
215                }
216            }
217
218            if let Ok(data) = block_saved_rx.recv_timeout(BITSWAP_BLOCK_REQUEST_INTERVAL) {
219                success = true;
220                block_data = data;
221            }
222        }
223
224        if !success && let Ok(data) = block_saved_rx.recv_deadline(deadline) {
225            success = true;
226            block_data = data;
227        }
228
229        if let Some(data) = block_data {
230            success = match Block::new(cid, data) {
231                Ok(block) => match store.insert(&block) {
232                    Ok(()) => {
233                        metrics::message_counter_inbound_response_block_update_db().inc();
234                        true
235                    }
236                    Err(e) => {
237                        metrics::message_counter_inbound_response_block_update_db_failure().inc();
238                        warn!(
239                            "Failed to update db: {e}, cid: {cid}, data: {:?}",
240                            block.data()
241                        );
242                        false
243                    }
244                },
245                Err(e) => {
246                    warn!("Failed to construct block: {e}, cid: {cid}");
247                    false
248                }
249            };
250        }
251
252        // Cleanup
253        {
254            let mut response_channels = self.response_channels.write();
255            response_channels.remove(&cid);
256            metrics::response_channel_container_capacity()
257                .set(response_channels.total_capacity() as _);
258        }
259
260        success
261    }
262
263    pub(in crate::libp2p_bitswap) fn on_inbound_response_event<S: BitswapStoreRead>(
264        &self,
265        store: &S,
266        response: BitswapInboundResponseEvent,
267    ) {
268        use BitswapInboundResponseEvent::*;
269
270        match response {
271            HaveBlock(peer, cid) => {
272                if let Some(chans) = self.response_channels.read().get(&cid) {
273                    _ = chans.block_have.send(peer);
274                }
275            }
276            DataBlock(_peer, cid, data) => {
277                if let Some(chans) = self.response_channels.read().get(&cid) {
278                    if let Ok(true) = store.contains(&cid) {
279                        // Avoid duplicate writes, still notify the receiver
280                        metrics::message_counter_inbound_response_block_already_exists_in_db()
281                            .inc();
282                        _ = chans.block_received.send(None);
283                    } else {
284                        _ = chans.block_received.send(Some(data));
285                    }
286
287                    // <https://github.com/ipfs/go-libipfs/tree/main/bitswap#background>
288                    // When a node receives blocks that it asked for, the node should send out a
289                    // notification called a 'Cancel' to tell its peers that the
290                    // node no longer wants those blocks.
291                    for &peer in self.peers.read().iter() {
292                        if let Err(e) = self.outbound_cancel_request_tx.send((peer, cid)) {
293                            debug!("{e}");
294                        }
295                    }
296                } else {
297                    metrics::message_counter_inbound_response_block_not_requested().inc();
298                }
299            }
300        }
301    }
302
303    pub(in crate::libp2p_bitswap) fn on_peer_connected(&self, peer: PeerId) -> bool {
304        let mut peers = self.peers.write();
305        let success = peers.insert(peer);
306        if success {
307            metrics::peer_container_capacity().set(peers.capacity() as _);
308        }
309        success
310    }
311
312    pub(in crate::libp2p_bitswap) fn on_peer_disconnected(&self, peer: &PeerId) -> bool {
313        let mut peers = self.peers.write();
314        let success = peers.remove(peer);
315        if success {
316            metrics::peer_container_capacity().set(peers.capacity() as _);
317        }
318        success
319    }
320}