ipfs_bitswap/
ledger.rs

1use crate::bitswap_pb;
2use crate::block::Block;
3use crate::error::BitswapError;
4use crate::prefix::Prefix;
5use cid::Cid;
6use core::convert::TryFrom;
7use prost::Message as ProstMessage;
8use std::{
9    collections::{HashMap, HashSet},
10    mem,
11};
12
/// Priority attached to a wanted block.
///
/// A plain `i32`; it is the value written to and read from the `priority`
/// field of the protobuf want-list entries when a `Message` is encoded or
/// decoded.
pub type Priority = i32;
14
/// The Ledger contains the history of transactions with a peer.
///
/// It tracks the want lists exchanged in both directions and accumulates the
/// next outgoing `Message` until it is handed out by `Ledger::send`.
#[derive(Debug, Default)]
pub struct Ledger {
    /// The list of wanted blocks sent to the peer.
    ///
    /// Only updated by `send`: cancels queued in the outgoing message are
    /// removed and queued wants are inserted at that point.
    sent_want_list: HashMap<Cid, Priority>,
    /// The list of wanted blocks received from the peer.
    ///
    /// No method of `Ledger` in this file modifies it; presumably it is
    /// maintained by other code in the crate (hence `pub(crate)`) — verify
    /// against the callers.
    pub(crate) received_want_list: HashMap<Cid, Priority>,
    /// Queued message.
    ///
    /// Collects blocks, wants and cancels until the next `send`, which takes
    /// it and leaves a fresh default message behind.
    message: Message,
}
25
26impl Ledger {
27    /// Creates a new `PeerLedger`.
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    pub fn add_block(&mut self, block: Block) {
33        self.message.add_block(block);
34    }
35
36    pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
37        self.message.want_block(cid, priority);
38    }
39
40    pub fn cancel_block(&mut self, cid: &Cid) {
41        self.message.cancel_block(cid);
42    }
43
44    /// Returns the blocks wanted by the peer in unspecified order
45    pub fn wantlist(&self) -> Vec<(Cid, Priority)> {
46        self.received_want_list
47            .iter()
48            .map(|(cid, prio)| (cid.clone(), *prio))
49            .collect()
50    }
51
52    pub fn send(&mut self) -> Option<Message> {
53        if self.message.is_empty() {
54            return None;
55        }
56        // FIXME: this might produce too large message
57        for cid in self.message.cancel() {
58            self.sent_want_list.remove(cid);
59        }
60        for (cid, priority) in self.message.want() {
61            self.sent_want_list.insert(cid.clone(), *priority);
62        }
63
64        Some(mem::take(&mut self.message))
65    }
66}
67
/// A bitswap message.
///
/// Holds the wants, cancels and blocks that are exchanged with a peer in one
/// protobuf message.
#[derive(Clone, PartialEq, Default)]
pub struct Message {
    /// List of wanted blocks.
    want: HashMap<Cid, Priority>,
    /// List of blocks to cancel.
    cancel: HashSet<Cid>,
    /// Whether it is the full list of wanted blocks.
    ///
    /// NOTE(review): apart from the derived impls, nothing in the visible part
    /// of this file reads or writes this flag (it is neither encoded nor
    /// decoded) — confirm whether it should be wired to the protobuf want list.
    full: bool,
    /// List of blocks to send.
    pub(crate) blocks: Vec<Block>,
}
80
81impl Message {
82    /// Checks whether the queued message is empty.
83    pub fn is_empty(&self) -> bool {
84        self.want.is_empty() && self.cancel.is_empty() && self.blocks.is_empty()
85    }
86
87    /// Returns the list of blocks.
88    pub fn blocks(&self) -> &[Block] {
89        &self.blocks
90    }
91
92    /// Returns the list of wanted blocks.
93    pub fn want(&self) -> &HashMap<Cid, Priority> {
94        &self.want
95    }
96
97    /// Returns the list of cancelled blocks.
98    pub fn cancel(&self) -> &HashSet<Cid> {
99        &self.cancel
100    }
101
102    /// Adds a `Block` to the message.
103    pub fn add_block(&mut self, block: Block) {
104        self.blocks.push(block);
105    }
106
107    /// Removes the block from the message.
108    pub fn remove_block(&mut self, cid: &Cid) {
109        self.blocks.retain(|block| block.cid() != cid);
110    }
111
112    /// Adds a block to the want list.
113    pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
114        self.want.insert(cid.to_owned(), priority);
115    }
116
117    /// Adds a block to the cancel list.
118    pub fn cancel_block(&mut self, cid: &Cid) {
119        self.cancel.insert(cid.to_owned());
120    }
121
122    /// Removes the block from the want list.
123    #[allow(unused)]
124    pub fn remove_want_block(&mut self, cid: &Cid) {
125        self.want.remove(cid);
126    }
127}
128
129impl Into<Vec<u8>> for &Message {
130    fn into(self) -> Vec<u8> {
131        let mut proto = bitswap_pb::Message::default();
132        let mut wantlist = bitswap_pb::message::Wantlist::default();
133        for (cid, priority) in self.want() {
134            let mut entry = bitswap_pb::message::wantlist::Entry::default();
135            entry.block = cid.to_bytes();
136            entry.priority = *priority;
137            wantlist.entries.push(entry);
138        }
139        for cid in self.cancel() {
140            let mut entry = bitswap_pb::message::wantlist::Entry::default();
141            entry.block = cid.to_bytes();
142            entry.cancel = true;
143            wantlist.entries.push(entry);
144        }
145        for block in self.blocks() {
146            let mut payload = bitswap_pb::message::Block::default();
147            payload.prefix = Prefix::from(block.cid()).to_bytes();
148            payload.data = block.data().to_vec();
149            proto.payload.push(payload);
150        }
151        if !wantlist.entries.is_empty() {
152            proto.wantlist = Some(wantlist);
153        }
154        let mut res = Vec::with_capacity(proto.encoded_len());
155        proto
156            .encode(&mut res)
157            .expect("there is no situation in which the protobuf message can be invalid");
158        res
159    }
160}
161
162impl Message {
163    /// Turns this `Message` into a message that can be sent to a substream.
164    pub fn to_bytes(&self) -> Vec<u8> {
165        self.into()
166    }
167
168    /// Creates a `Message` from bytes that were received from a substream.
169    pub fn from_bytes(bytes: &[u8]) -> Result<Self, BitswapError> {
170        Self::try_from(bytes)
171    }
172}
173
174impl From<()> for Message {
175    fn from(_: ()) -> Self {
176        Default::default()
177    }
178}
179
180impl TryFrom<&[u8]> for Message {
181    type Error = BitswapError;
182    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
183        let proto: bitswap_pb::Message = bitswap_pb::Message::decode(bytes)?;
184        let mut message = Message::default();
185        for entry in proto.wantlist.unwrap_or_default().entries {
186            let cid = Cid::try_from(entry.block)?;
187            if entry.cancel {
188                message.cancel_block(&cid);
189            } else {
190                message.want_block(&cid, entry.priority);
191            }
192        }
193        for payload in proto.payload {
194            let prefix = Prefix::new(&payload.prefix)?;
195            let cid = prefix.to_cid(&payload.data)?;
196            let block = Block {
197                cid,
198                data: payload.data.into_boxed_slice(),
199            };
200            message.add_block(block);
201        }
202        Ok(message)
203    }
204}
205
206impl std::fmt::Debug for Message {
207    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
208        let mut first = true;
209        for (cid, priority) in self.want() {
210            if first {
211                first = false;
212            } else {
213                write!(fmt, ", ")?;
214            }
215            write!(fmt, "want: {} {}", cid, priority)?;
216        }
217        for cid in self.cancel() {
218            if first {
219                first = false;
220            } else {
221                write!(fmt, ", ")?;
222            }
223            write!(fmt, "cancel: {}", cid)?;
224        }
225        for block in self.blocks() {
226            if first {
227                first = false;
228            } else {
229                write!(fmt, ", ")?;
230            }
231            write!(fmt, "block: {}", block.cid())?;
232        }
233
234        if first {
235            write!(fmt, "(empty message)")?;
236        }
237
238        Ok(())
239    }
240}