1use crate::bitswap_pb;
2use crate::block::Block;
3use crate::error::BitswapError;
4use crate::prefix::Prefix;
5use cid::Cid;
6use core::convert::TryFrom;
7use prost::Message as ProstMessage;
8use std::{
9 collections::{HashMap, HashSet},
10 mem,
11};
12
/// Priority value stored next to each wanted CID and copied into the
/// `priority` field of want-list entries when a message is serialized.
pub type Priority = i32;
14
15#[derive(Debug, Default)]
17pub struct Ledger {
18 sent_want_list: HashMap<Cid, Priority>,
20 pub(crate) received_want_list: HashMap<Cid, Priority>,
22 message: Message,
24}
25
26impl Ledger {
27 pub fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn add_block(&mut self, block: Block) {
33 self.message.add_block(block);
34 }
35
36 pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
37 self.message.want_block(cid, priority);
38 }
39
40 pub fn cancel_block(&mut self, cid: &Cid) {
41 self.message.cancel_block(cid);
42 }
43
44 pub fn wantlist(&self) -> Vec<(Cid, Priority)> {
46 self.received_want_list
47 .iter()
48 .map(|(cid, prio)| (cid.clone(), *prio))
49 .collect()
50 }
51
52 pub fn send(&mut self) -> Option<Message> {
53 if self.message.is_empty() {
54 return None;
55 }
56 for cid in self.message.cancel() {
58 self.sent_want_list.remove(cid);
59 }
60 for (cid, priority) in self.message.want() {
61 self.sent_want_list.insert(cid.clone(), *priority);
62 }
63
64 Some(mem::take(&mut self.message))
65 }
66}
67
68#[derive(Clone, PartialEq, Default)]
70pub struct Message {
71 want: HashMap<Cid, Priority>,
73 cancel: HashSet<Cid>,
75 full: bool,
77 pub(crate) blocks: Vec<Block>,
79}
80
81impl Message {
82 pub fn is_empty(&self) -> bool {
84 self.want.is_empty() && self.cancel.is_empty() && self.blocks.is_empty()
85 }
86
87 pub fn blocks(&self) -> &[Block] {
89 &self.blocks
90 }
91
92 pub fn want(&self) -> &HashMap<Cid, Priority> {
94 &self.want
95 }
96
97 pub fn cancel(&self) -> &HashSet<Cid> {
99 &self.cancel
100 }
101
102 pub fn add_block(&mut self, block: Block) {
104 self.blocks.push(block);
105 }
106
107 pub fn remove_block(&mut self, cid: &Cid) {
109 self.blocks.retain(|block| block.cid() != cid);
110 }
111
112 pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
114 self.want.insert(cid.to_owned(), priority);
115 }
116
117 pub fn cancel_block(&mut self, cid: &Cid) {
119 self.cancel.insert(cid.to_owned());
120 }
121
122 #[allow(unused)]
124 pub fn remove_want_block(&mut self, cid: &Cid) {
125 self.want.remove(cid);
126 }
127}
128
129impl Into<Vec<u8>> for &Message {
130 fn into(self) -> Vec<u8> {
131 let mut proto = bitswap_pb::Message::default();
132 let mut wantlist = bitswap_pb::message::Wantlist::default();
133 for (cid, priority) in self.want() {
134 let mut entry = bitswap_pb::message::wantlist::Entry::default();
135 entry.block = cid.to_bytes();
136 entry.priority = *priority;
137 wantlist.entries.push(entry);
138 }
139 for cid in self.cancel() {
140 let mut entry = bitswap_pb::message::wantlist::Entry::default();
141 entry.block = cid.to_bytes();
142 entry.cancel = true;
143 wantlist.entries.push(entry);
144 }
145 for block in self.blocks() {
146 let mut payload = bitswap_pb::message::Block::default();
147 payload.prefix = Prefix::from(block.cid()).to_bytes();
148 payload.data = block.data().to_vec();
149 proto.payload.push(payload);
150 }
151 if !wantlist.entries.is_empty() {
152 proto.wantlist = Some(wantlist);
153 }
154 let mut res = Vec::with_capacity(proto.encoded_len());
155 proto
156 .encode(&mut res)
157 .expect("there is no situation in which the protobuf message can be invalid");
158 res
159 }
160}
161
162impl Message {
163 pub fn to_bytes(&self) -> Vec<u8> {
165 self.into()
166 }
167
168 pub fn from_bytes(bytes: &[u8]) -> Result<Self, BitswapError> {
170 Self::try_from(bytes)
171 }
172}
173
174impl From<()> for Message {
175 fn from(_: ()) -> Self {
176 Default::default()
177 }
178}
179
180impl TryFrom<&[u8]> for Message {
181 type Error = BitswapError;
182 fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
183 let proto: bitswap_pb::Message = bitswap_pb::Message::decode(bytes)?;
184 let mut message = Message::default();
185 for entry in proto.wantlist.unwrap_or_default().entries {
186 let cid = Cid::try_from(entry.block)?;
187 if entry.cancel {
188 message.cancel_block(&cid);
189 } else {
190 message.want_block(&cid, entry.priority);
191 }
192 }
193 for payload in proto.payload {
194 let prefix = Prefix::new(&payload.prefix)?;
195 let cid = prefix.to_cid(&payload.data)?;
196 let block = Block {
197 cid,
198 data: payload.data.into_boxed_slice(),
199 };
200 message.add_block(block);
201 }
202 Ok(message)
203 }
204}
205
206impl std::fmt::Debug for Message {
207 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
208 let mut first = true;
209 for (cid, priority) in self.want() {
210 if first {
211 first = false;
212 } else {
213 write!(fmt, ", ")?;
214 }
215 write!(fmt, "want: {} {}", cid, priority)?;
216 }
217 for cid in self.cancel() {
218 if first {
219 first = false;
220 } else {
221 write!(fmt, ", ")?;
222 }
223 write!(fmt, "cancel: {}", cid)?;
224 }
225 for block in self.blocks() {
226 if first {
227 first = false;
228 } else {
229 write!(fmt, ", ")?;
230 }
231 write!(fmt, "block: {}", block.cid())?;
232 }
233
234 if first {
235 write!(fmt, "(empty message)")?;
236 }
237
238 Ok(())
239 }
240}