1use std::{
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use crate::cid_collections::CidHashMap;
13use ahash::{HashSet, HashSetExt};
14use cid::Cid;
15use flume::TryRecvError;
16use futures::StreamExt;
17use libp2p::PeerId;
18use parking_lot::RwLock;
19
20use crate::libp2p_bitswap::{event_handlers::*, *};
21
22const BITSWAP_BLOCK_REQUEST_INTERVAL: Duration = Duration::from_millis(500);
23
24pub type ValidatePeerCallback = dyn Fn(PeerId) -> bool + Send + Sync;
25
26#[derive(Debug, Clone)]
27struct ResponseChannels {
28 block_have: flume::Sender<PeerId>,
29 block_received: flume::Sender<Option<Vec<u8>>>,
30}
31
32pub struct BitswapRequestManager {
35 outbound_have_request_tx: flume::Sender<(PeerId, Cid)>,
37 outbound_have_request_rx: flume::Receiver<(PeerId, Cid)>,
38 outbound_cancel_request_tx: flume::Sender<(PeerId, Cid)>,
40 outbound_cancel_request_rx: flume::Receiver<(PeerId, Cid)>,
41 outbound_block_request_tx: flume::Sender<(PeerId, Cid)>,
43 outbound_block_request_rx: flume::Receiver<(PeerId, Cid)>,
44 peers: RwLock<HashSet<PeerId>>,
45 response_channels: RwLock<CidHashMap<ResponseChannels>>,
46}
47
48impl BitswapRequestManager {
49 pub fn outbound_request_stream(
54 &self,
55 ) -> impl futures::stream::Stream<Item = (PeerId, BitswapRequest)> + '_ {
56 type MapperType = fn((libp2p::PeerId, Cid)) -> (libp2p::PeerId, BitswapRequest);
57
58 fn new_block((peer, cid): (PeerId, Cid)) -> (PeerId, BitswapRequest) {
59 (peer, BitswapRequest::new_block(cid).send_dont_have(false))
60 }
61
62 fn new_have((peer, cid): (PeerId, Cid)) -> (PeerId, BitswapRequest) {
63 (peer, BitswapRequest::new_have(cid).send_dont_have(false))
64 }
65
66 fn new_cancel((peer, cid): (PeerId, Cid)) -> (PeerId, BitswapRequest) {
67 (peer, BitswapRequest::new_cancel(cid).send_dont_have(false))
68 }
69
70 let streams = vec![
72 self.outbound_block_request_rx
73 .stream()
74 .map(new_block as MapperType),
75 self.outbound_have_request_rx
76 .stream()
77 .map(new_have as MapperType),
78 self.outbound_cancel_request_rx
79 .stream()
80 .map(new_cancel as MapperType),
81 ];
82 futures::stream::select_all(streams)
83 }
84}
85
86impl Default for BitswapRequestManager {
87 fn default() -> Self {
88 let (outbound_have_request_tx, outbound_have_request_rx) = flume::unbounded();
89 let (outbound_cancel_request_tx, outbound_cancel_request_rx) = flume::unbounded();
90 let (outbound_block_request_tx, outbound_block_request_rx) = flume::unbounded();
91 Self {
92 outbound_have_request_tx,
93 outbound_have_request_rx,
94 outbound_cancel_request_tx,
95 outbound_cancel_request_rx,
96 outbound_block_request_tx,
97 outbound_block_request_rx,
98 peers: RwLock::new(HashSet::new()),
99 response_channels: RwLock::new(CidHashMap::new()),
100 }
101 }
102}
103
104impl BitswapRequestManager {
105 pub fn handle_event<S: BitswapStoreRead>(
107 self: &Arc<Self>,
108 bitswap: &mut BitswapBehaviour,
109 store: &S,
110 event: BitswapBehaviourEvent,
111 ) -> anyhow::Result<()> {
112 handle_event_impl(self, bitswap, store, event)
113 }
114
115 #[cfg(not(target_arch = "wasm32"))]
119 pub fn get_block(
120 self: Arc<Self>,
121 store: Arc<impl BitswapStoreReadWrite>,
122 cid: Cid,
123 timeout: Duration,
124 responder: Option<flume::Sender<bool>>,
125 validate_peer: Option<Arc<ValidatePeerCallback>>,
126 ) {
127 let start = Instant::now();
128 let store_cloned = store.clone();
129 task::spawn(async move {
130 let mut success = store.contains(&cid).unwrap_or_default();
131 if !success {
132 let deadline = start.checked_add(timeout).expect("Infallible");
133 success = task::spawn_blocking(move || {
134 self.get_block_sync(store_cloned, cid, deadline, validate_peer)
135 })
136 .await
137 .unwrap_or_default();
138 while !success && Instant::now() < deadline {
141 task::sleep(BITSWAP_BLOCK_REQUEST_INTERVAL).await;
142 success = store.contains(&cid).unwrap_or_default();
143 }
144 }
145
146 if success {
147 metrics::message_counter_get_block_success().inc();
148 } else {
149 metrics::message_counter_get_block_failure().inc();
150 }
151
152 if let Some(responder) = responder
153 && let Err(e) = responder.send_async(success).await
154 {
155 debug!("{e}");
156 }
157
158 metrics::GET_BLOCK_TIME.observe((Instant::now() - start).as_secs_f64());
159 });
160 }
161
162 fn get_block_sync(
163 &self,
164 store: Arc<impl BitswapStoreReadWrite>,
165 cid: Cid,
166 deadline: Instant,
167 validate_peer: Option<Arc<ValidatePeerCallback>>,
168 ) -> bool {
169 if self.response_channels.read().contains_key(&cid) {
171 return false;
172 }
173
174 let (block_have_tx, block_have_rx) = flume::unbounded();
175 let (block_saved_tx, block_saved_rx) = flume::unbounded();
176 let channels = ResponseChannels {
177 block_have: block_have_tx,
178 block_received: block_saved_tx,
179 };
180 {
181 self.response_channels.write().insert(cid, channels);
182 }
183
184 let peers: Vec<_> = self.peers.read().iter().cloned().collect();
185 let validated_peers: Vec<_> = peers
186 .iter()
187 .filter(|&&p| validate_peer.as_ref().map(|f| f(p)).unwrap_or(true))
188 .cloned()
189 .collect();
190
191 debug!("Found {} valid peers for {cid}", validated_peers.len());
192 let selected_peers = if validated_peers.is_empty() {
193 peers
195 } else {
196 validated_peers
197 };
198
199 for peer in selected_peers {
200 if let Err(e) = self.outbound_have_request_tx.send((peer, cid)) {
201 debug!("{e}");
202 }
203 }
204
205 let mut success = false;
206 let mut block_data = None;
207 while !success && Instant::now() < deadline {
208 match block_have_rx.try_recv() {
209 Ok(peer) => {
210 _ = self.outbound_block_request_tx.send((peer, cid));
211 }
212 Err(TryRecvError::Empty) => {}
213 Err(TryRecvError::Disconnected) => {
214 break;
215 }
216 }
217
218 if let Ok(data) = block_saved_rx.recv_timeout(BITSWAP_BLOCK_REQUEST_INTERVAL) {
219 success = true;
220 block_data = data;
221 }
222 }
223
224 if !success && let Ok(data) = block_saved_rx.recv_deadline(deadline) {
225 success = true;
226 block_data = data;
227 }
228
229 if let Some(data) = block_data {
230 success = match Block::new(cid, data) {
231 Ok(block) => match store.insert(&block) {
232 Ok(()) => {
233 metrics::message_counter_inbound_response_block_update_db().inc();
234 true
235 }
236 Err(e) => {
237 metrics::message_counter_inbound_response_block_update_db_failure().inc();
238 warn!(
239 "Failed to update db: {e}, cid: {cid}, data: {:?}",
240 block.data()
241 );
242 false
243 }
244 },
245 Err(e) => {
246 warn!("Failed to construct block: {e}, cid: {cid}");
247 false
248 }
249 };
250 }
251
252 {
254 let mut response_channels = self.response_channels.write();
255 response_channels.remove(&cid);
256 metrics::response_channel_container_capacity()
257 .set(response_channels.total_capacity() as _);
258 }
259
260 success
261 }
262
263 pub(in crate::libp2p_bitswap) fn on_inbound_response_event<S: BitswapStoreRead>(
264 &self,
265 store: &S,
266 response: BitswapInboundResponseEvent,
267 ) {
268 use BitswapInboundResponseEvent::*;
269
270 match response {
271 HaveBlock(peer, cid) => {
272 if let Some(chans) = self.response_channels.read().get(&cid) {
273 _ = chans.block_have.send(peer);
274 }
275 }
276 DataBlock(_peer, cid, data) => {
277 if let Some(chans) = self.response_channels.read().get(&cid) {
278 if let Ok(true) = store.contains(&cid) {
279 metrics::message_counter_inbound_response_block_already_exists_in_db()
281 .inc();
282 _ = chans.block_received.send(None);
283 } else {
284 _ = chans.block_received.send(Some(data));
285 }
286
287 for &peer in self.peers.read().iter() {
292 if let Err(e) = self.outbound_cancel_request_tx.send((peer, cid)) {
293 debug!("{e}");
294 }
295 }
296 } else {
297 metrics::message_counter_inbound_response_block_not_requested().inc();
298 }
299 }
300 }
301 }
302
303 pub(in crate::libp2p_bitswap) fn on_peer_connected(&self, peer: PeerId) -> bool {
304 let mut peers = self.peers.write();
305 let success = peers.insert(peer);
306 if success {
307 metrics::peer_container_capacity().set(peers.capacity() as _);
308 }
309 success
310 }
311
312 pub(in crate::libp2p_bitswap) fn on_peer_disconnected(&self, peer: &PeerId) -> bool {
313 let mut peers = self.peers.write();
314 let success = peers.remove(peer);
315 if success {
316 metrics::peer_container_capacity().set(peers.capacity() as _);
317 }
318 success
319 }
320}