1use crate::block::Block;
9use crate::ledger::{Ledger, Message, Priority};
10use crate::protocol::{BitswapConfig, MessageWrapper};
11use fnv::FnvHashSet;
12use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
13use hash_hasher::HashedMap;
14use libipld::Cid;
15use libp2p::core::Multiaddr;
16use libp2p::identity::PeerId;
17use libp2p::swarm::derive_prelude::ConnectionEstablished;
18use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
19use libp2p::swarm::handler::OneShotHandler;
20use libp2p::swarm::{
21 ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler,
22 PollParameters, THandler, ToSwarm as NetworkBehaviourAction,
23};
24use std::task::{Context, Poll};
25use std::{
26 collections::{HashMap, VecDeque},
27 mem,
28 sync::{
29 atomic::{AtomicU64, Ordering},
30 Arc,
31 },
32};
33
34#[derive(Clone, Debug, Eq, PartialEq)]
36pub enum BitswapEvent {
37 ReceivedBlock(PeerId, Block),
38 ReceivedWant(PeerId, Cid, Priority),
39 ReceivedCancel(PeerId, Cid),
40}
41
/// Transfer counters for the bitswap exchange.
///
/// Every counter is an `AtomicU64` updated with relaxed ordering, so all
/// methods take `&self` and the struct can be shared (for example behind an
/// `Arc`) without additional locking.
#[derive(Debug, Default)]
pub struct Stats {
    /// Number of blocks sent.
    pub sent_blocks: AtomicU64,
    /// Number of bytes sent.
    ///
    /// NOTE(review): no method on this type increments this counter directly;
    /// it only changes through [`Stats::add_assign`] or direct field access.
    pub sent_data: AtomicU64,
    /// Number of unique blocks received.
    pub received_blocks: AtomicU64,
    /// Number of bytes received in unique blocks.
    pub received_data: AtomicU64,
    /// Number of duplicate blocks received.
    pub duplicate_blocks: AtomicU64,
    /// Number of bytes received in duplicate blocks.
    pub duplicate_data: AtomicU64,
}

impl Stats {
    /// Adds `amount` to `counter` using relaxed ordering.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Records that `num_blocks` blocks were sent.
    pub fn update_outgoing(&self, num_blocks: u64) {
        Self::bump(&self.sent_blocks, num_blocks);
    }

    /// Records the receipt of one unique block of `bytes` bytes.
    pub fn update_incoming_unique(&self, bytes: u64) {
        Self::bump(&self.received_blocks, 1);
        Self::bump(&self.received_data, bytes);
    }

    /// Records the receipt of one duplicate block of `bytes` bytes.
    pub fn update_incoming_duplicate(&self, bytes: u64) {
        Self::bump(&self.duplicate_blocks, 1);
        Self::bump(&self.duplicate_data, bytes);
    }

    /// Adds every counter of `other` onto the matching counter of `self`.
    ///
    /// Each counter is loaded and added independently, so the operation as a
    /// whole is not an atomic snapshot of `other`.
    pub fn add_assign(&self, other: &Stats) {
        let pairs = [
            (&self.sent_blocks, &other.sent_blocks),
            (&self.sent_data, &other.sent_data),
            (&self.received_blocks, &other.received_blocks),
            (&self.received_data, &other.received_data),
            (&self.duplicate_blocks, &other.duplicate_blocks),
            (&self.duplicate_data, &other.duplicate_data),
        ];

        for (target, source) in pairs.iter() {
            Self::bump(target, source.load(Ordering::Relaxed));
        }
    }
}
91
92pub struct Bitswap {
94 events: VecDeque<NetworkBehaviourAction<BitswapEvent, Message>>,
96 target_peers: FnvHashSet<PeerId>,
98 pub connected_peers: HashMap<PeerId, Ledger>,
100 wanted_blocks: HashedMap<Cid, Priority>,
102 pub queued_blocks: UnboundedSender<(PeerId, Block)>,
104 ready_blocks: UnboundedReceiver<(PeerId, Block)>,
105
106 pub dont_have_tx: UnboundedSender<(PeerId, Cid)>,
107 dont_have_rx: UnboundedReceiver<(PeerId, Cid)>,
108 pub stats: HashMap<PeerId, Arc<Stats>>,
110}
111
112impl Default for Bitswap {
113 fn default() -> Self {
114 let (tx, rx) = unbounded();
115 let (dtx, drx) = unbounded();
116
117 Bitswap {
118 events: Default::default(),
119 target_peers: Default::default(),
120 connected_peers: Default::default(),
121 wanted_blocks: Default::default(),
122 queued_blocks: tx,
123 dont_have_rx: drx,
124 dont_have_tx: dtx,
125 ready_blocks: rx,
126 stats: Default::default(),
127 }
128 }
129}
130
131impl Bitswap {
132 pub fn local_wantlist(&self) -> Vec<(Cid, Priority)> {
134 self.wanted_blocks
135 .iter()
136 .map(|(cid, prio)| (*cid, *prio))
137 .collect()
138 }
139
140 pub fn peer_wantlist(&self, peer: &PeerId) -> Option<Vec<(Cid, Priority)>> {
142 self.connected_peers.get(peer).map(Ledger::wantlist)
143 }
144
145 pub fn stats(&self) -> Stats {
146 self.stats
147 .values()
148 .fold(Stats::default(), |acc, peer_stats| {
149 acc.add_assign(peer_stats);
150 acc
151 })
152 }
153
154 pub fn peers(&self) -> Vec<PeerId> {
155 self.connected_peers.keys().cloned().collect()
156 }
157
158 pub fn connect(&mut self, peer_id: PeerId) {
162 if self.target_peers.insert(peer_id) {
163 self.events.push_back(NetworkBehaviourAction::Dial {
164 opts: DialOpts::peer_id(peer_id)
165 .condition(PeerCondition::Disconnected)
166 .build(),
167 });
168 }
169 }
170
171 pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
175 trace!("queueing block to be sent to {}: {}", peer_id, block.cid());
176 if let Some(ledger) = self.connected_peers.get_mut(&peer_id) {
177 ledger.add_block(block);
178 }
179 }
180
181 fn send_want_list(&mut self, peer_id: PeerId) {
183 if !self.wanted_blocks.is_empty() {
184 let mut message = Message::default();
188 for (cid, priority) in &self.wanted_blocks {
189 message.want_block(cid, *priority);
190 }
191 self.events
192 .push_back(NetworkBehaviourAction::NotifyHandler {
193 peer_id,
194 event: message,
195 handler: NotifyHandler::Any,
196 });
197 }
198 }
199
200 pub fn want_block(&mut self, cid: Cid, priority: Priority) {
204 for (_peer_id, ledger) in self.connected_peers.iter_mut() {
205 ledger.want_block(&cid, priority);
206 }
207 self.wanted_blocks.insert(cid, priority);
208 }
209
210 pub fn want_block_from_peers(&mut self, cid: Cid, priority: Priority, peers: &[PeerId]) {
214 for peer in peers {
215 if let Some(ledger) = self.connected_peers.get_mut(peer) {
216 ledger.want_block(&cid, priority);
217 }
218 }
219 self.wanted_blocks.insert(cid, priority);
220 }
221
222 pub fn dont_have(&mut self, cid: Cid) {
224 for (_peer_id, ledger) in self.connected_peers.iter_mut() {
225 ledger.received_want_list.remove(&cid);
226 }
228 }
229
230 pub fn dont_have_for_peer(&mut self, peer_id: PeerId, cid: Cid) {
231 if let Some(ledger) = self.connected_peers.get_mut(&peer_id) {
232 ledger.received_want_list.remove(&cid);
233 ledger.received_want_list.shrink_to_fit();
234 }
236 }
237
238 pub fn cancel_block(&mut self, cid: &Cid) {
243 for (_peer_id, ledger) in self.connected_peers.iter_mut() {
244 ledger.cancel_block(cid);
245 }
246 self.wanted_blocks.remove(cid);
247 }
248}
249
250impl NetworkBehaviour for Bitswap {
251 type ConnectionHandler = OneShotHandler<BitswapConfig, Message, MessageWrapper>;
252 type OutEvent = BitswapEvent;
253
254 fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
255 debug!("bitswap: addresses_of_peer");
256 Vec::new()
257 }
258
259 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
260 match event {
261 FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
262 debug!("bitswap: inject_connected {}", peer_id);
263 self.target_peers.remove(&peer_id);
264 let ledger = Ledger::new();
265 self.stats.entry(peer_id).or_default();
266 self.connected_peers.insert(peer_id, ledger);
267 self.send_want_list(peer_id);
268 }
269 FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => {
270 debug!("bitswap: inject_disconnected {:?}", peer_id);
271 self.connected_peers.remove(&peer_id);
272 }
273 _ => {}
274 }
275 }
276
277 fn handle_established_inbound_connection(
278 &mut self,
279 _connection_id: ConnectionId,
280 _peer: PeerId,
281 _local_addr: &Multiaddr,
282 _remote_addr: &Multiaddr,
283 ) -> Result<THandler<Self>, ConnectionDenied> {
284 Ok(Self::ConnectionHandler::default())
285 }
286
287 fn handle_established_outbound_connection(
288 &mut self,
289 _connection_id: ConnectionId,
290 _peer: PeerId,
291 _addr: &Multiaddr,
292 _role_override: libp2p::core::Endpoint,
293 ) -> Result<THandler<Self>, ConnectionDenied> {
294 Ok(Self::ConnectionHandler::default())
295 }
296
297 fn on_connection_handler_event(
298 &mut self,
299 source: PeerId,
300 _connection: ConnectionId,
301 message: MessageWrapper,
302 ) {
303 let mut message = match message {
304 MessageWrapper::Tx => return,
308 MessageWrapper::Rx(msg) => msg,
310 };
311
312 debug!("bitswap: inject_event from {}: {:?}", source, message);
313
314 let current_wantlist = self.local_wantlist();
315
316 let ledger = match self.connected_peers.get_mut(&source) {
323 Some(ledger) => ledger,
324 None => {
325 debug!("bitswap: Peer {} is not in ledger", source);
326 return;
327 }
328 };
329
330 for cid in message.cancel() {
332 ledger.received_want_list.remove(cid);
333
334 let event = BitswapEvent::ReceivedCancel(source, *cid);
335 self.events
336 .push_back(NetworkBehaviourAction::GenerateEvent(event));
337 }
338
339 for (cid, priority) in message
341 .want()
342 .iter()
343 .filter(|&(cid, _)| !current_wantlist.iter().map(|(c, _)| c).any(|c| c == cid))
344 {
345 ledger.received_want_list.insert(cid.to_owned(), *priority);
346
347 let event = BitswapEvent::ReceivedWant(source, *cid, *priority);
348 self.events
349 .push_back(NetworkBehaviourAction::GenerateEvent(event));
350 }
351
352 for block in mem::take(&mut message.blocks) {
354 self.cancel_block(block.cid());
355
356 let event = BitswapEvent::ReceivedBlock(source, block);
357 self.events
358 .push_back(NetworkBehaviourAction::GenerateEvent(event));
359 }
360 }
361
362 #[allow(clippy::type_complexity)]
363 fn poll(
364 &mut self,
365 ctx: &mut Context,
366 _: &mut impl PollParameters,
367 ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Message>> {
368 use futures::stream::StreamExt;
369
370 if let Some(event) = self.events.pop_front() {
371 return Poll::Ready(event);
372 }
373
374 while let Poll::Ready(Some((peer_id, block))) = self.dont_have_rx.poll_next_unpin(ctx) {
375 self.dont_have_for_peer(peer_id, block);
376 }
377
378 while let Poll::Ready(Some((peer_id, block))) = self.ready_blocks.poll_next_unpin(ctx) {
379 self.send_block(peer_id, block);
380 }
381
382 for (peer_id, ledger) in &mut self.connected_peers {
383 if let Some(message) = ledger.send() {
384 if let Some(peer_stats) = self.stats.get_mut(peer_id) {
385 peer_stats.update_outgoing(message.blocks.len() as u64);
386 }
387
388 return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
389 peer_id: *peer_id,
390 handler: NotifyHandler::Any,
391 event: message,
392 });
393 }
394 }
395 Poll::Pending
396 }
397}