rust_ipfs_bitswap/
ledger.rs1use crate::bitswap_pb;
2use crate::block::Block;
3use crate::error::BitswapError;
4use crate::prefix::Prefix;
5use core::convert::TryFrom;
6use hash_hasher::{HashedMap, HashedSet};
7use libipld::Cid;
8use prost::Message as ProstMessage;
9use std::mem;
10
/// Priority value attached to each want-list entry.
pub type Priority = i32;
12
13#[derive(Debug, Default)]
15pub struct Ledger {
16 sent_want_list: HashedMap<Cid, Priority>,
18 pub(crate) received_want_list: HashedMap<Cid, Priority>,
20 message: Message,
22}
23
24impl Ledger {
25 pub fn new() -> Self {
27 Self::default()
28 }
29
30 pub fn add_block(&mut self, block: Block) {
31 self.message.add_block(block);
32 }
33
34 pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
35 self.message.want_block(cid, priority);
36 }
37
38 pub fn cancel_block(&mut self, cid: &Cid) {
39 self.message.cancel_block(cid);
40 }
41
42 pub fn wantlist(&self) -> Vec<(Cid, Priority)> {
44 self.received_want_list
45 .iter()
46 .map(|(cid, prio)| (*cid, *prio))
47 .collect()
48 }
49
50 pub fn send(&mut self) -> Option<Message> {
51 if self.message.is_empty() {
52 return None;
53 }
54 for cid in self.message.cancel() {
56 self.sent_want_list.remove(cid);
57 }
58 for (cid, priority) in self.message.want() {
59 self.sent_want_list.insert(*cid, *priority);
60 }
61
62 Some(mem::take(&mut self.message))
63 }
64}
65
66#[derive(Clone, PartialEq, Eq, Default)]
68pub struct Message {
69 want: HashedMap<Cid, Priority>,
71 cancel: HashedSet<Cid>,
73 full: bool,
75 pub(crate) blocks: Vec<Block>,
77}
78
79impl Message {
80 pub fn is_empty(&self) -> bool {
82 self.want.is_empty() && self.cancel.is_empty() && self.blocks.is_empty()
83 }
84
85 pub fn blocks(&self) -> &[Block] {
87 &self.blocks
88 }
89
90 pub fn want(&self) -> &HashedMap<Cid, Priority> {
92 &self.want
93 }
94
95 pub fn cancel(&self) -> &HashedSet<Cid> {
97 &self.cancel
98 }
99
100 pub fn add_block(&mut self, block: Block) {
102 self.blocks.push(block);
103 }
104
105 pub fn remove_block(&mut self, cid: &Cid) {
107 self.blocks.retain(|block| block.cid() != cid);
108 }
109
110 pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
112 self.want.insert(cid.to_owned(), priority);
113 }
114
115 pub fn cancel_block(&mut self, cid: &Cid) {
117 self.cancel.insert(cid.to_owned());
118 }
119
120 #[allow(unused)]
122 pub fn remove_want_block(&mut self, cid: &Cid) {
123 self.want.remove(cid);
124 }
125}
126
127impl From<&Message> for Vec<u8> {
128 fn from(val: &Message) -> Self {
129 let mut proto = bitswap_pb::Message::default();
130 let mut wantlist = bitswap_pb::message::Wantlist::default();
131 for (cid, priority) in val.want() {
132 let entry = bitswap_pb::message::wantlist::Entry {
133 block: cid.to_bytes(),
134 priority: *priority,
135 ..Default::default()
136 };
137 wantlist.entries.push(entry);
138 }
139 for cid in val.cancel() {
140 let entry = bitswap_pb::message::wantlist::Entry {
141 block: cid.to_bytes(),
142 cancel: true,
143 ..Default::default()
144 };
145 wantlist.entries.push(entry);
146 }
147 for block in val.blocks() {
148 let payload = bitswap_pb::message::Block {
149 prefix: Prefix::from(block.cid()).to_bytes(),
150 data: block.data().to_vec(),
151 };
152 proto.payload.push(payload);
153 }
154 if !wantlist.entries.is_empty() {
155 proto.wantlist = Some(wantlist);
156 }
157 let mut res = Vec::with_capacity(proto.encoded_len());
158 proto
159 .encode(&mut res)
160 .expect("there is no situation in which the protobuf message can be invalid");
161 res
162 }
163}
164
165impl Message {
166 pub fn to_bytes(&self) -> Vec<u8> {
168 self.into()
169 }
170
171 pub fn from_bytes(bytes: &[u8]) -> Result<Self, BitswapError> {
173 Self::try_from(bytes)
174 }
175}
176
177impl From<()> for Message {
178 fn from(_: ()) -> Self {
179 Default::default()
180 }
181}
182
183impl TryFrom<&[u8]> for Message {
184 type Error = BitswapError;
185 fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
186 let proto: bitswap_pb::Message = bitswap_pb::Message::decode(bytes)?;
187 let mut message = Message::default();
188 for entry in proto.wantlist.unwrap_or_default().entries {
189 let cid = Cid::try_from(entry.block)?;
190 if entry.cancel {
191 message.cancel_block(&cid);
192 } else {
193 message.want_block(&cid, entry.priority);
194 }
195 }
196 for payload in proto.payload {
197 let prefix = Prefix::new(&payload.prefix)?;
198 let cid = prefix.to_cid(&payload.data)?;
199 let block = Block::new(cid, payload.data)?;
200 message.add_block(block);
201 }
202 Ok(message)
203 }
204}
205
206impl std::fmt::Debug for Message {
207 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
208 let mut first = true;
209 for (cid, priority) in self.want() {
210 if first {
211 first = false;
212 } else {
213 write!(fmt, ", ")?;
214 }
215 write!(fmt, "want: {cid} {priority}")?;
216 }
217 for cid in self.cancel() {
218 if first {
219 first = false;
220 } else {
221 write!(fmt, ", ")?;
222 }
223 write!(fmt, "cancel: {cid}")?;
224 }
225 for block in self.blocks() {
226 if first {
227 first = false;
228 } else {
229 write!(fmt, ", ")?;
230 }
231 write!(fmt, "block: {}", block.cid())?;
232 }
233
234 if first {
235 write!(fmt, "(empty message)")?;
236 }
237
238 Ok(())
239 }
240}