1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
use crate::{cache::Cache, common::references, error::Error};
use bytes::Bytes;
use futures::{stream::try_unfold, Stream};
use libipld_core::cid::Cid;
use std::collections::{HashSet, VecDeque};
use wnfs_common::{BlockStore, BlockStoreError};
/// A struct that represents an ongoing walk through the Dag.
///
/// The walk keeps a `frontier` of CIDs that still have to be looked at and a
/// `visited` set that is used to make sure no CID is returned twice.
#[derive(Clone, Debug)]
pub struct DagWalk {
/// A queue of CIDs to visit next.
///
/// The frontier may contain CIDs that are already in `visited`; those are
/// skipped when the next CID is taken from the frontier.
pub frontier: VecDeque<Cid>,
/// The set of already visited CIDs. This prevents re-visiting.
pub visited: HashSet<Cid>,
/// Whether to do a breadth-first (`true`) or depth-first (`false`) traversal.
///
/// Newly discovered links are always pushed to the front of the frontier.
/// This flag controls which end the next CID is taken from: the back for a
/// breadth-first traversal, the front for a depth-first traversal.
pub breadth_first: bool,
}
/// Represents the state that a traversed block was found in.
///
/// If it's `Have`, then the block was available locally and the links it
/// contains were queued for traversal. If it's `Missing`, the block was not
/// available locally, so the traversal could not look at its links.
#[derive(Debug, Clone, Copy)]
pub enum TraversedItem {
/// The block is available locally, and further
/// links from this block will be traversed.
Have(Cid),
/// The block is not available locally, so its links
/// can't be followed.
Missing(Cid),
}
impl TraversedItem {
/// Return the CID of this traversed item. If the block for this CID
/// is missing, turn this item into the appropriate error.
pub fn to_cid(self) -> Result<Cid, Error> {
match self {
Self::Have(cid) => Ok(cid),
Self::Missing(cid) => Err(Error::BlockStoreError(BlockStoreError::CIDNotFound(cid))),
}
}
}
impl DagWalk {
/// Create a walk that traverses the DAG below `roots` breadth-first.
///
/// Breadth-first is easiest to picture on a tree (which is a DAG): the walk
/// visits the tree layer by layer, so all roots are visited before any of the
/// blocks they link to.
///
/// Note that the roots are collected into the frontier in iteration order and a
/// breadth-first walk takes CIDs from the back of the frontier, so when several
/// roots are given, the last root is the first one visited.
pub fn breadth_first(roots: impl IntoIterator<Item = Cid>) -> Self {
    let breadth_first = true;
    Self::new(roots, breadth_first)
}
/// Create a walk that traverses the DAG below `roots` depth-first.
///
/// In a depth-first walk, links discovered in a block are taken from the
/// frontier before any CID that was queued earlier, so the walk heads towards
/// the leaves right away.
///
/// The first node visited is the first root. Subsequent nodes are not
/// necessarily roots: links of an already visited block come first.
pub fn depth_first(roots: impl IntoIterator<Item = Cid>) -> Self {
    let breadth_first = false;
    Self::new(roots, breadth_first)
}
/// Start a DAG traversal of given roots. See also `breadth_first` and `depth_first`.
pub fn new(roots: impl IntoIterator<Item = Cid>, breadth_first: bool) -> Self {
let frontier = roots.into_iter().collect();
let visited = HashSet::new();
Self {
frontier,
visited,
breadth_first,
}
}
/// Take the next not-yet-visited CID off the frontier and mark it as visited.
///
/// CIDs are taken from the back of the frontier for a breadth-first walk and
/// from the front for a depth-first walk. CIDs that were already visited are
/// dropped along the way. Returns `None` once the frontier is exhausted.
fn frontier_next(&mut self) -> Option<Cid> {
    // The traversal order only decides which end of the queue we take from.
    let take: fn(&mut VecDeque<Cid>) -> Option<Cid> = if self.breadth_first {
        VecDeque::pop_back
    } else {
        VecDeque::pop_front
    };

    while let Some(candidate) = take(&mut self.frontier) {
        // `insert` returns `true` only for CIDs we have not seen before.
        if self.visited.insert(candidate) {
            return Some(candidate);
        }
    }

    None
}
/// Advance the traversal by one node.
///
/// Returns `Ok(None)` when there are no nodes left to visit.
///
/// Otherwise returns `Ok(Some(item))`, where `item` carries the CID of the
/// next block in the traversal and tells you whether that block is available
/// in the local blockstore (`TraversedItem::Have`) or not
/// (`TraversedItem::Missing`). The links of a missing block can't be followed,
/// so the traversal will be incomplete in that case.
/// A missing block is not an error! If you want it to be one, consider using
/// `TraversedItem::to_cid`.
///
/// # Errors
///
/// Failures of `store.has_block` or `cache.references` are returned as
/// `Error::BlockStoreError`.
pub async fn next(
    &mut self,
    store: &impl BlockStore,
    cache: &impl Cache,
) -> Result<Option<TraversedItem>, Error> {
    let cid = match self.frontier_next() {
        Some(cid) => cid,
        None => return Ok(None),
    };

    let available = store
        .has_block(&cid)
        .await
        .map_err(Error::BlockStoreError)?;

    // Without the block there are no links to look at.
    if !available {
        return Ok(Some(TraversedItem::Missing(cid)));
    }

    let links = cache
        .references(cid, store)
        .await
        .map_err(Error::BlockStoreError)?;

    for link in links {
        if self.visited.contains(&link) {
            continue;
        }
        self.frontier.push_front(link);
    }

    Ok(Some(TraversedItem::Have(cid)))
}
/// Turn this traversal into a stream that borrows the store and cache.
///
/// Every stream item is the outcome of one call to `next`; the stream ends
/// when `next` reports that no nodes are left. Errors from `next` are yielded
/// as `Err` items.
pub fn stream<'a>(
    self,
    store: &'a impl BlockStore,
    cache: &'a impl Cache,
) -> impl Stream<Item = Result<TraversedItem, Error>> + Unpin + 'a {
    Box::pin(try_unfold(self, move |mut walk| async move {
        let step = walk.next(store, cache).await?;
        match step {
            // Hand the walk back so it becomes the state for the next step.
            Some(item) => Ok(Some((item, walk))),
            None => Ok(None),
        }
    }))
}
/// Turn this traversal into a stream that owns its store & cache.
///
/// `store` and `cache` are usually cheap to clone, so handing the traversal
/// its own copies shouldn't be a big deal.
///
/// Owning them is what makes it possible to build streams that are `: 'static`,
/// which is useful for anything that ends up inside e.g. a tokio task.
///
/// Every stream item is the outcome of one call to `next`; the stream ends
/// when `next` reports that no nodes are left.
pub fn stream_owned(
    self,
    store: impl BlockStore,
    cache: impl Cache,
) -> impl Stream<Item = Result<TraversedItem, Error>> + Unpin {
    let state = (self, store, cache);
    Box::pin(try_unfold(
        state,
        move |(mut walk, store, cache)| async move {
            let step = walk.next(&store, &cache).await?;
            match step {
                // Thread the walk, store and cache through to the next step.
                Some(item) => Ok(Some((item, (walk, store, cache)))),
                None => Ok(None),
            }
        },
    ))
}
/// Check whether the traversal has nothing left to visit.
///
/// If this returns `true`, the next call to `next` would result in `None`.
pub fn is_finished(&self) -> bool {
    // The frontier may still hold CIDs that were visited in the meantime.
    // Those don't count: the walk is done once every CID left in the
    // frontier has already been visited (which includes an empty frontier).
    self.frontier
        .iter()
        .all(|queued| self.visited.contains(queued))
}
/// Skip a node from the traversal for now.
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<(), Error> {
let (cid, bytes) = block;
let refs = references(cid, bytes, HashSet::new()).map_err(Error::ParsingError)?;
self.visited.insert(cid);
self.frontier
.retain(|frontier_cid| !refs.contains(frontier_cid));
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cache::NoCache;
use futures::TryStreamExt;
use libipld::{cbor::DagCborCodec, Ipld};
use testresult::TestResult;
use wnfs_common::{encode, MemoryBlockStore};
#[test_log::test(async_std::test)]
async fn test_walk_dag_breadth_first() -> TestResult {
let store = &MemoryBlockStore::new();
// cid_root ---> cid_1_wrap ---> cid_1
// -> cid_2
// -> cid_3
let cid_1 = store
.put_block(
encode(&Ipld::String("1".into()), DagCborCodec)?,
DagCborCodec.into(),
)
.await?;
let cid_2 = store
.put_block(
encode(&Ipld::String("2".into()), DagCborCodec)?,
DagCborCodec.into(),
)
.await?;
let cid_3 = store
.put_block(
encode(&Ipld::String("3".into()), DagCborCodec)?,
DagCborCodec.into(),
)
.await?;
let cid_1_wrap = store
.put_block(
encode(&Ipld::List(vec![Ipld::Link(cid_1)]), DagCborCodec)?,
DagCborCodec.into(),
)
.await?;
let cid_root = store
.put_block(
encode(
&Ipld::List(vec![
Ipld::Link(cid_1_wrap),
Ipld::Link(cid_2),
Ipld::Link(cid_3),
]),
DagCborCodec,
)?,
DagCborCodec.into(),
)
.await?;
let cids = DagWalk::breadth_first([cid_root])
.stream(store, &NoCache)
.and_then(|item| async move { item.to_cid() })
.try_collect::<Vec<_>>()
.await?
.into_iter()
.collect::<Vec<_>>();
assert_eq!(cids, vec![cid_root, cid_1_wrap, cid_2, cid_3, cid_1]);
Ok(())
}
}
#[cfg(test)]
mod proptests {
use super::*;
use crate::{cache::NoCache, test_utils::arb_ipld_dag};
use futures::TryStreamExt;
use libipld::{
multihash::{Code, MultihashDigest},
Ipld, IpldCodec,
};
use proptest::strategy::Strategy;
use std::collections::BTreeSet;
use test_strategy::proptest;
use wnfs_common::{encode, MemoryBlockStore};
/// Strategy producing DAGs via `arb_ipld_dag`, where each node is an IPLD list
/// of links to the CIDs handed to the closure, and the node's CID is a CIDv1
/// over the Blake3-256 digest of its DAG-CBOR encoding.
fn ipld_dags() -> impl Strategy<Value = (Vec<(Cid, Ipld)>, Cid)> {
    arb_ipld_dag(1..256, 0.5, |cids, _| {
        let links = cids.into_iter().map(Ipld::Link).collect();
        let ipld = Ipld::List(links);

        let encoded = encode(&ipld, IpldCodec::DagCbor).unwrap();
        let digest = Code::Blake3_256.digest(&encoded);
        let cid = Cid::new_v1(IpldCodec::DagCbor.into(), digest);

        (cid, ipld)
    })
}
/// Walking a generated DAG breadth-first from its root never yields the same
/// CID more than once.
#[proptest(max_shrink_iters = 100_000)]
fn walk_dag_never_iterates_block_twice(#[strategy(ipld_dags())] dag: (Vec<(Cid, Ipld)>, Cid)) {
    async_std::task::block_on(async {
        let (blocks, root) = dag;
        let store = &MemoryBlockStore::new();

        // Store every block and make sure the store agrees on its CID.
        for (expected_cid, ipld) in &blocks {
            let bytes: Bytes = encode(ipld, IpldCodec::DagCbor).unwrap().into();
            let stored_cid = store
                .put_block(bytes, IpldCodec::DagCbor.into())
                .await
                .unwrap();
            assert_eq!(*expected_cid, stored_cid);
        }

        let walk = DagWalk::breadth_first([root]);
        let mut cids = walk
            .stream(store, &NoCache)
            .and_then(|item| async move { item.to_cid() })
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        cids.sort();

        // A set iterates in sorted order without duplicates, so it only
        // matches the sorted walk if the walk had no duplicates.
        let deduplicated = cids.iter().copied().collect::<BTreeSet<_>>();
        let unique_cids = deduplicated.into_iter().collect::<Vec<_>>();

        assert_eq!(cids, unique_cids);
    });
}
}