rust_ipfs_bitswap/
ledger.rs

1use crate::bitswap_pb;
2use crate::block::Block;
3use crate::error::BitswapError;
4use crate::prefix::Prefix;
5use core::convert::TryFrom;
6use hash_hasher::{HashedMap, HashedSet};
7use libipld::Cid;
8use prost::Message as ProstMessage;
9use std::mem;
10
11pub type Priority = i32;
12
13/// The Ledger contains the history of transactions with a peer.
14#[derive(Debug, Default)]
15pub struct Ledger {
16    /// The list of wanted blocks sent to the peer.
17    sent_want_list: HashedMap<Cid, Priority>,
18    /// The list of wanted blocks received from the peer.
19    pub(crate) received_want_list: HashedMap<Cid, Priority>,
20    /// Queued message.
21    message: Message,
22}
23
24impl Ledger {
25    /// Creates a new `PeerLedger`.
26    pub fn new() -> Self {
27        Self::default()
28    }
29
30    pub fn add_block(&mut self, block: Block) {
31        self.message.add_block(block);
32    }
33
34    pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
35        self.message.want_block(cid, priority);
36    }
37
38    pub fn cancel_block(&mut self, cid: &Cid) {
39        self.message.cancel_block(cid);
40    }
41
42    /// Returns the blocks wanted by the peer in unspecified order
43    pub fn wantlist(&self) -> Vec<(Cid, Priority)> {
44        self.received_want_list
45            .iter()
46            .map(|(cid, prio)| (*cid, *prio))
47            .collect()
48    }
49
50    pub fn send(&mut self) -> Option<Message> {
51        if self.message.is_empty() {
52            return None;
53        }
54        // FIXME: this might produce too large message
55        for cid in self.message.cancel() {
56            self.sent_want_list.remove(cid);
57        }
58        for (cid, priority) in self.message.want() {
59            self.sent_want_list.insert(*cid, *priority);
60        }
61
62        Some(mem::take(&mut self.message))
63    }
64}
65
66/// A bitswap message.
67#[derive(Clone, PartialEq, Eq, Default)]
68pub struct Message {
69    /// List of wanted blocks.
70    want: HashedMap<Cid, Priority>,
71    /// List of blocks to cancel.
72    cancel: HashedSet<Cid>,
73    /// Wheather it is the full list of wanted blocks.
74    full: bool,
75    /// List of blocks to send.
76    pub(crate) blocks: Vec<Block>,
77}
78
79impl Message {
80    /// Checks whether the queued message is empty.
81    pub fn is_empty(&self) -> bool {
82        self.want.is_empty() && self.cancel.is_empty() && self.blocks.is_empty()
83    }
84
85    /// Returns the list of blocks.
86    pub fn blocks(&self) -> &[Block] {
87        &self.blocks
88    }
89
90    /// Returns the list of wanted blocks.
91    pub fn want(&self) -> &HashedMap<Cid, Priority> {
92        &self.want
93    }
94
95    /// Returns the list of cancelled blocks.
96    pub fn cancel(&self) -> &HashedSet<Cid> {
97        &self.cancel
98    }
99
100    /// Adds a `Block` to the message.
101    pub fn add_block(&mut self, block: Block) {
102        self.blocks.push(block);
103    }
104
105    /// Removes the block from the message.
106    pub fn remove_block(&mut self, cid: &Cid) {
107        self.blocks.retain(|block| block.cid() != cid);
108    }
109
110    /// Adds a block to the want list.
111    pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
112        self.want.insert(cid.to_owned(), priority);
113    }
114
115    /// Adds a block to the cancel list.
116    pub fn cancel_block(&mut self, cid: &Cid) {
117        self.cancel.insert(cid.to_owned());
118    }
119
120    /// Removes the block from the want list.
121    #[allow(unused)]
122    pub fn remove_want_block(&mut self, cid: &Cid) {
123        self.want.remove(cid);
124    }
125}
126
127impl From<&Message> for Vec<u8> {
128    fn from(val: &Message) -> Self {
129        let mut proto = bitswap_pb::Message::default();
130        let mut wantlist = bitswap_pb::message::Wantlist::default();
131        for (cid, priority) in val.want() {
132            let entry = bitswap_pb::message::wantlist::Entry {
133                block: cid.to_bytes(),
134                priority: *priority,
135                ..Default::default()
136            };
137            wantlist.entries.push(entry);
138        }
139        for cid in val.cancel() {
140            let entry = bitswap_pb::message::wantlist::Entry {
141                block: cid.to_bytes(),
142                cancel: true,
143                ..Default::default()
144            };
145            wantlist.entries.push(entry);
146        }
147        for block in val.blocks() {
148            let payload = bitswap_pb::message::Block {
149                prefix: Prefix::from(block.cid()).to_bytes(),
150                data: block.data().to_vec(),
151            };
152            proto.payload.push(payload);
153        }
154        if !wantlist.entries.is_empty() {
155            proto.wantlist = Some(wantlist);
156        }
157        let mut res = Vec::with_capacity(proto.encoded_len());
158        proto
159            .encode(&mut res)
160            .expect("there is no situation in which the protobuf message can be invalid");
161        res
162    }
163}
164
165impl Message {
166    /// Turns this `Message` into a message that can be sent to a substream.
167    pub fn to_bytes(&self) -> Vec<u8> {
168        self.into()
169    }
170
171    /// Creates a `Message` from bytes that were received from a substream.
172    pub fn from_bytes(bytes: &[u8]) -> Result<Self, BitswapError> {
173        Self::try_from(bytes)
174    }
175}
176
177impl From<()> for Message {
178    fn from(_: ()) -> Self {
179        Default::default()
180    }
181}
182
183impl TryFrom<&[u8]> for Message {
184    type Error = BitswapError;
185    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
186        let proto: bitswap_pb::Message = bitswap_pb::Message::decode(bytes)?;
187        let mut message = Message::default();
188        for entry in proto.wantlist.unwrap_or_default().entries {
189            let cid = Cid::try_from(entry.block)?;
190            if entry.cancel {
191                message.cancel_block(&cid);
192            } else {
193                message.want_block(&cid, entry.priority);
194            }
195        }
196        for payload in proto.payload {
197            let prefix = Prefix::new(&payload.prefix)?;
198            let cid = prefix.to_cid(&payload.data)?;
199            let block = Block::new(cid, payload.data)?;
200            message.add_block(block);
201        }
202        Ok(message)
203    }
204}
205
206impl std::fmt::Debug for Message {
207    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
208        let mut first = true;
209        for (cid, priority) in self.want() {
210            if first {
211                first = false;
212            } else {
213                write!(fmt, ", ")?;
214            }
215            write!(fmt, "want: {cid} {priority}")?;
216        }
217        for cid in self.cancel() {
218            if first {
219                first = false;
220            } else {
221                write!(fmt, ", ")?;
222            }
223            write!(fmt, "cancel: {cid}")?;
224        }
225        for block in self.blocks() {
226            if first {
227                first = false;
228            } else {
229                write!(fmt, ", ")?;
230            }
231            write!(fmt, "block: {}", block.cid())?;
232        }
233
234        if first {
235            write!(fmt, "(empty message)")?;
236        }
237
238        Ok(())
239    }
240}