1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
use crate::{
cache::Cache,
common::ReceiverState,
dag_walk::{DagWalk, TraversedItem},
error::{Error, IncrementalVerificationError},
};
use bytes::Bytes;
use deterministic_bloom::runtime_size::BloomFilter;
use libipld_core::{
cid::Cid,
multihash::{Code, MultihashDigest},
};
use std::{collections::HashSet, matches};
use wnfs_common::BlockStore;
/// Bookkeeping for verifying a DAG incrementally, one block at a time.
///
/// The state consists of two sets of CIDs. The methods on this type move
/// CIDs between them as blocks are discovered to be missing or present.
#[derive(Debug, Clone)]
pub struct IncrementalDagVerification {
    /// CIDs that were found to be missing from the DAG and are still wanted.
    pub want_cids: HashSet<Cid>,
    /// CIDs of blocks that are available locally.
    pub have_cids: HashSet<Cid>,
}
/// How a given block relates to the current retrieval state.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum BlockState {
    /// The block has already been received / is already stored.
    Have,
    /// The block is known to be needed.
    Want,
    /// It is not known whether the block will be needed.
    Unexpected,
}
impl IncrementalDagVerification {
    /// Initiate incremental DAG verification of given roots.
    ///
    /// All `roots` start out as wanted. A traversal is then run immediately
    /// (see [`Self::update_have_cids`]) to find missing subgraphs and
    /// CIDs that are already present.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the initial traversal.
    pub async fn new(
        roots: impl IntoIterator<Item = Cid>,
        store: &impl BlockStore,
        cache: &impl Cache,
    ) -> Result<Self, Error> {
        let mut this = Self {
            want_cids: roots.into_iter().collect(),
            have_cids: HashSet::new(),
        };
        this.update_have_cids(store, cache).await?;
        Ok(this)
    }

    /// Updates the state of incremental dag verification.
    ///
    /// Starts a breadth-first [`DagWalk`] from the current "want" CIDs.
    /// Every item the walk reports as [`TraversedItem::Have`] is moved to
    /// the "have" set, every [`TraversedItem::Missing`] item is recorded in
    /// the "want" set.
    ///
    /// # Errors
    ///
    /// Returns the first error the walk yields. State changes applied before
    /// that point are kept.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn update_have_cids(
        &mut self,
        store: &impl BlockStore,
        cache: &impl Cache,
    ) -> Result<(), Error> {
        let mut dag_walk = DagWalk::breadth_first(self.want_cids.iter().cloned());

        while let Some(item) = dag_walk.next(store, cache).await? {
            match item {
                TraversedItem::Have(cid) => {
                    self.mark_as_have(cid);
                }
                TraversedItem::Missing(cid) => {
                    tracing::trace!(%cid, "Missing block, adding to want list");
                    self.mark_as_want(cid);
                }
            }
        }

        tracing::debug!(
            num_want = self.want_cids.len(),
            num_have = self.have_cids.len(),
            "Finished dag verification"
        );

        Ok(())
    }

    /// Records `want` as wanted, removing it from the "have" set if present.
    ///
    /// A CID moving from "have" back to "want" is unexpected, so it is
    /// logged as a warning.
    fn mark_as_want(&mut self, want: Cid) {
        // `HashSet::remove` returns whether the value was present, so a
        // separate `contains` lookup is not needed.
        if self.have_cids.remove(&want) {
            tracing::warn!(%want, "Marking a CID as wanted, that we have previously marked as having!");
        }
        self.want_cids.insert(want);
    }

    /// Records `have` as locally available and drops it from the "want" set.
    fn mark_as_have(&mut self, have: Cid) {
        self.want_cids.remove(&have);
        self.have_cids.insert(have);
    }

    /// Check the state of a CID to find out whether
    /// - we expect it as one of the next possible blocks to receive (Want)
    /// - we have already stored it (Have)
    /// - we don't know whether we need it (Unexpected)
    ///
    /// The "want" set is consulted before the "have" set.
    pub fn block_state(&self, cid: Cid) -> BlockState {
        if self.want_cids.contains(&cid) {
            BlockState::Want
        } else if self.have_cids.contains(&cid) {
            BlockState::Have
        } else {
            BlockState::Unexpected
        }
    }

    /// Verifies a received block and stores it in the blockstore.
    ///
    /// The following checks run, in this order, before anything is stored:
    /// - the block is currently wanted, i.e. it was discovered as missing
    ///   in the graph below the roots and hasn't been received before,
    /// - the hash function of the CID is supported,
    /// - the block's bytes actually hash to the hash from the given CID.
    ///
    /// Only then is the block written to the blockstore, followed by a
    /// re-run of [`Self::update_have_cids`].
    ///
    /// This *may* fail, even if the block is part of the graph below the roots,
    /// if intermediate blocks between the roots and this block are missing.
    ///
    /// This *may* add the block to the blockstore and still return an error,
    /// specifically if the traversal after storing the block fails.
    ///
    /// # Errors
    ///
    /// - [`IncrementalVerificationError::ExpectedWantedBlock`] if the block
    ///   is not in the `Want` state.
    /// - [`Error::UnsupportedHashCode`] if the CID's hash code can't be
    ///   converted into a [`Code`].
    /// - [`IncrementalVerificationError::DigestMismatch`] if the bytes don't
    ///   hash to the CID's hash.
    /// - [`Error::BlockStoreError`] if writing to the blockstore fails.
    /// - Any error from [`Self::update_have_cids`].
    pub async fn verify_and_store_block(
        &mut self,
        block: (Cid, Bytes),
        store: &impl BlockStore,
        cache: &impl Cache,
    ) -> Result<(), Error> {
        let (cid, bytes) = block;

        let block_state = self.block_state(cid);
        if !matches!(block_state, BlockState::Want) {
            return Err(IncrementalVerificationError::ExpectedWantedBlock {
                cid: Box::new(cid),
                block_state,
            }
            .into());
        }

        let hash_func: Code = cid
            .hash()
            .code()
            .try_into()
            .map_err(|_| Error::UnsupportedHashCode { cid })?;

        // Check the digest *before* touching the store, so that blocks with
        // mismatching bytes are never persisted.
        let hash = hash_func.digest(bytes.as_ref());
        if &hash != cid.hash() {
            // Report the CID these bytes would actually have had.
            let actual_cid = Cid::new_v1(cid.codec(), hash);
            return Err(IncrementalVerificationError::DigestMismatch {
                cid: Box::new(cid),
                actual_cid: Box::new(actual_cid),
            }
            .into());
        }

        store
            .put_block_keyed(cid, bytes)
            .await
            .map_err(Error::BlockStoreError)?;

        self.update_have_cids(store, cache).await
    }

    /// Computes the receiver state for the current incremental dag verification state.
    ///
    /// The "want" CIDs become the missing subgraph roots. The "have" CIDs
    /// are inserted into a bloom filter, unless there are no "have" CIDs or
    /// nothing is missing anymore, in which case no bloom filter is built.
    ///
    /// `bloom_fpr` is called with the number of "have" CIDs and returns the
    /// target false positive rate used to size the bloom filter.
    pub fn into_receiver_state(self, bloom_fpr: fn(u64) -> f64) -> ReceiverState {
        // Check emptiness on the set itself, before it is consumed below.
        let nothing_missing = self.want_cids.is_empty();
        let missing_subgraph_roots = self.want_cids.into_iter().collect();
        let bloom_capacity = self.have_cids.len() as u64;

        // No bloom is needed if there is nothing to put into it, or if we're
        // done and nothing is missing anymore.
        if bloom_capacity == 0 || nothing_missing {
            return ReceiverState {
                missing_subgraph_roots,
                have_cids_bloom: None,
            };
        }

        let target_fpr = bloom_fpr(bloom_capacity);
        let mut bloom = BloomFilter::new_from_fpr_po2(bloom_capacity, target_fpr);

        for cid in self.have_cids {
            bloom.insert(&cid.to_bytes());
        }

        tracing::debug!(
            inserted_elements = bloom_capacity,
            size_bits = bloom.as_bytes().len() * 8,
            hash_count = bloom.hash_count(),
            ones_count = bloom.count_ones(),
            target_fpr,
            estimated_fpr = bloom.current_false_positive_rate(),
            "built 'have cids' bloom",
        );

        ReceiverState {
            missing_subgraph_roots,
            have_cids_bloom: Some(bloom),
        }
    }
}