1use crate::block::Block;
9use crate::ledger::{Ledger, Message, Priority};
10use crate::protocol::{BitswapConfig, MessageWrapper};
11use cid::Cid;
12use fnv::FnvHashSet;
13use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
14use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId};
15use libp2p_swarm::protocols_handler::{IntoProtocolsHandler, OneShotHandler, ProtocolsHandler};
16use libp2p_swarm::{
17 DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
18};
19use std::task::{Context, Poll};
20use std::{
21 collections::{HashMap, VecDeque},
22 mem,
23 sync::{
24 atomic::{AtomicU64, Ordering},
25 Arc,
26 },
27};
28
29#[derive(Clone, Debug, Eq, PartialEq)]
31pub enum BitswapEvent {
32 ReceivedBlock(PeerId, Block),
33 ReceivedWant(PeerId, Cid, Priority),
34 ReceivedCancel(PeerId, Cid),
35}
36
37#[derive(Debug, Default)]
39pub struct Stats {
40 pub sent_blocks: AtomicU64,
41 pub sent_data: AtomicU64,
42 pub received_blocks: AtomicU64,
43 pub received_data: AtomicU64,
44 pub duplicate_blocks: AtomicU64,
45 pub duplicate_data: AtomicU64,
46}
47
48impl Stats {
49 pub fn update_outgoing(&self, num_blocks: u64) {
50 self.sent_blocks.fetch_add(num_blocks, Ordering::Relaxed);
51 }
52
53 pub fn update_incoming_unique(&self, bytes: u64) {
54 self.received_blocks.fetch_add(1, Ordering::Relaxed);
55 self.received_data.fetch_add(bytes, Ordering::Relaxed);
56 }
57
58 pub fn update_incoming_duplicate(&self, bytes: u64) {
59 self.duplicate_blocks.fetch_add(1, Ordering::Relaxed);
60 self.duplicate_data.fetch_add(bytes, Ordering::Relaxed);
61 }
62
63 pub fn add_assign(&self, other: &Stats) {
64 self.sent_blocks
65 .fetch_add(other.sent_blocks.load(Ordering::Relaxed), Ordering::Relaxed);
66 self.sent_data
67 .fetch_add(other.sent_data.load(Ordering::Relaxed), Ordering::Relaxed);
68 self.received_blocks.fetch_add(
69 other.received_blocks.load(Ordering::Relaxed),
70 Ordering::Relaxed,
71 );
72 self.received_data.fetch_add(
73 other.received_data.load(Ordering::Relaxed),
74 Ordering::Relaxed,
75 );
76 self.duplicate_blocks.fetch_add(
77 other.duplicate_blocks.load(Ordering::Relaxed),
78 Ordering::Relaxed,
79 );
80 self.duplicate_data.fetch_add(
81 other.duplicate_data.load(Ordering::Relaxed),
82 Ordering::Relaxed,
83 );
84 }
85}
86
87pub struct Bitswap {
89 events: VecDeque<NetworkBehaviourAction<Message, BitswapEvent>>,
91 target_peers: FnvHashSet<PeerId>,
93 pub connected_peers: HashMap<PeerId, Ledger>,
95 wanted_blocks: HashMap<Cid, Priority>,
97 pub queued_blocks: UnboundedSender<(PeerId, Block)>,
99 ready_blocks: UnboundedReceiver<(PeerId, Block)>,
100 pub stats: HashMap<PeerId, Arc<Stats>>,
102}
103
104impl Default for Bitswap {
105 fn default() -> Self {
106 let (tx, rx) = unbounded();
107
108 Bitswap {
109 events: Default::default(),
110 target_peers: Default::default(),
111 connected_peers: Default::default(),
112 wanted_blocks: Default::default(),
113 queued_blocks: tx,
114 ready_blocks: rx,
115 stats: Default::default(),
116 }
117 }
118}
119
120impl Bitswap {
121 pub fn local_wantlist(&self) -> Vec<(Cid, Priority)> {
123 self.wanted_blocks
124 .iter()
125 .map(|(cid, prio)| (cid.clone(), *prio))
126 .collect()
127 }
128
129 pub fn peer_wantlist(&self, peer: &PeerId) -> Option<Vec<(Cid, Priority)>> {
131 self.connected_peers.get(peer).map(Ledger::wantlist)
132 }
133
134 pub fn stats(&self) -> Stats {
135 self.stats
136 .values()
137 .fold(Stats::default(), |acc, peer_stats| {
138 acc.add_assign(&peer_stats);
139 acc
140 })
141 }
142
143 pub fn peers(&self) -> Vec<PeerId> {
144 self.connected_peers.keys().cloned().collect()
145 }
146
147 pub fn connect(&mut self, peer_id: PeerId) {
151 if self.target_peers.insert(peer_id.clone()) {
152 self.events.push_back(NetworkBehaviourAction::DialPeer {
153 peer_id,
154 condition: DialPeerCondition::Disconnected,
155 });
156 }
157 }
158
159 pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
163 trace!("queueing block to be sent to {}: {}", peer_id, block.cid);
164 if let Some(ledger) = self.connected_peers.get_mut(&peer_id) {
165 ledger.add_block(block);
166 }
167 }
168
169 fn send_want_list(&mut self, peer_id: PeerId) {
171 if !self.wanted_blocks.is_empty() {
172 let mut message = Message::default();
176 for (cid, priority) in &self.wanted_blocks {
177 message.want_block(cid, *priority);
178 }
179 self.events
180 .push_back(NetworkBehaviourAction::NotifyHandler {
181 peer_id,
182 event: message,
183 handler: NotifyHandler::Any,
184 });
185 }
186 }
187
188 pub fn want_block(&mut self, cid: Cid, priority: Priority) {
192 for (_peer_id, ledger) in self.connected_peers.iter_mut() {
193 ledger.want_block(&cid, priority);
194 }
195 self.wanted_blocks.insert(cid, priority);
196 }
197
198 pub fn cancel_block(&mut self, cid: &Cid) {
203 for (_peer_id, ledger) in self.connected_peers.iter_mut() {
204 ledger.cancel_block(cid);
205 }
206 self.wanted_blocks.remove(cid);
207 }
208}
209
210impl NetworkBehaviour for Bitswap {
211 type ProtocolsHandler = OneShotHandler<BitswapConfig, Message, MessageWrapper>;
212 type OutEvent = BitswapEvent;
213
214 fn new_handler(&mut self) -> Self::ProtocolsHandler {
215 debug!("bitswap: new_handler");
216 Default::default()
217 }
218
219 fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
220 debug!("bitswap: addresses_of_peer");
221 Vec::new()
222 }
223
224 fn inject_connected(&mut self, peer_id: &PeerId) {
225 debug!("bitswap: inject_connected {}", peer_id);
226 let ledger = Ledger::new();
227 self.stats.entry(peer_id.clone()).or_default();
228 self.connected_peers.insert(peer_id.clone(), ledger);
229 self.send_want_list(peer_id.clone());
230 }
231
232 fn inject_disconnected(&mut self, peer_id: &PeerId) {
233 debug!("bitswap: inject_disconnected {:?}", peer_id);
234 self.connected_peers.remove(peer_id);
235 }
238
239 fn inject_event(&mut self, source: PeerId, _connection: ConnectionId, message: MessageWrapper) {
240 let mut message = match message {
241 MessageWrapper::Tx => return,
245 MessageWrapper::Rx(msg) => msg,
247 };
248
249 debug!("bitswap: inject_event from {}: {:?}", source, message);
250
251 let current_wantlist = self.local_wantlist();
252
253 let ledger = self
254 .connected_peers
255 .get_mut(&source)
256 .expect("Peer not in ledger?!");
257
258 for cid in message.cancel() {
260 ledger.received_want_list.remove(cid);
261
262 let event = BitswapEvent::ReceivedCancel(source.clone(), cid.clone());
263 self.events
264 .push_back(NetworkBehaviourAction::GenerateEvent(event));
265 }
266
267 for (cid, priority) in message
269 .want()
270 .iter()
271 .filter(|&(cid, _)| !current_wantlist.iter().map(|(c, _)| c).any(|c| c == cid))
272 {
273 ledger.received_want_list.insert(cid.to_owned(), *priority);
274
275 let event = BitswapEvent::ReceivedWant(source.clone(), cid.clone(), *priority);
276 self.events
277 .push_back(NetworkBehaviourAction::GenerateEvent(event));
278 }
279
280 for block in mem::take(&mut message.blocks) {
282 self.cancel_block(&block.cid());
283
284 let event = BitswapEvent::ReceivedBlock(source.clone(), block);
285 self.events
286 .push_back(NetworkBehaviourAction::GenerateEvent(event));
287 }
288 }
289
290 #[allow(clippy::type_complexity)]
291 fn poll(&mut self, ctx: &mut Context, _: &mut impl PollParameters)
292 -> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>
293 {
294 use futures::stream::StreamExt;
295
296 while let Poll::Ready(Some((peer_id, block))) = self.ready_blocks.poll_next_unpin(ctx) {
297 self.send_block(peer_id, block);
298 }
299
300 if let Some(event) = self.events.pop_front() {
301 return Poll::Ready(event);
302 }
303
304 for (peer_id, ledger) in &mut self.connected_peers {
305 if let Some(message) = ledger.send() {
306 if let Some(peer_stats) = self.stats.get_mut(peer_id) {
307 peer_stats.update_outgoing(message.blocks.len() as u64);
308 }
309
310 return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
311 peer_id: peer_id.clone(),
312 handler: NotifyHandler::Any,
313 event: message,
314 });
315 }
316 }
317 Poll::Pending
318 }
319}