1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//! Helper methods for streaming the contents of a sequence of trees.
use crate::{
index::NodeInfo,
store::{BanyanValue, ReadOnlyStore},
tree::Tree,
util::{take_until_condition, ToStreamExt},
};
use super::{FilteredChunk, Forest, TreeTypes};
use crate::query::*;
use futures::executor::ThreadPool;
use futures::prelude::*;
use std::sync::atomic::Ordering;
use std::{ops::RangeInclusive, sync::atomic::AtomicU64, sync::Arc};
impl<T: TreeTypes, R: ReadOnlyStore<T::Link>> Forest<T, R> {
/// Given a sequence of trees, will stream matching events in ascending order indefinitely.
///
/// This is implemented by calling [`Self::stream_trees_chunked`] with the full offset range
/// `0..=u64::MAX` and an `mk_extra` that produces `()`, and then flattening the `data` of
/// every chunk into individual `Ok` items.
///
/// Because the upper bound is `u64::MAX`, the end-of-range cut-off used by
/// `stream_trees_chunked` (`chunk.range.end > end`) can never become true.
/// - query: the query
/// - trees: the stream of trees. It is assumed that trees later in this stream will be bigger
///
/// Each item of the returned stream is an `anyhow::Result` wrapping a `(u64, T::Key, V)` triple.
pub fn stream_trees<Q, S, V>(
    &self,
    query: Q,
    trees: S,
) -> impl Stream<Item = anyhow::Result<(u64, T::Key, V)>> + Send
where
    Q: Query<T> + Clone,
    S: Stream<Item = Tree<T, V>> + Send + 'static,
    V: BanyanValue,
{
    // `u64::MAX` replaces the legacy `u64::max_value()`; both denote the same value.
    self.stream_trees_chunked(query, trees, 0..=u64::MAX, &|_| ())
        // turn every chunk into a stream of its elements, each wrapped in `Ok` ...
        .map_ok(|chunk| stream::iter(chunk.data.into_iter().map(Ok)))
        // ... and splice those per-chunk streams into one flat stream
        .try_flatten()
}
/// Given a sequence of trees, will stream chunks in ascending order until it arrives at `range.end()`.
///
/// For every tree that `into_inner()` yields data for, the caller's query is intersected with
/// the offset range `progress..=range.end()`. `progress` starts at `range.start()` and is
/// raised (via `fetch_max`) to `chunk.range.end` for every `Ok` chunk that passes through, so a
/// later tree is only asked for offsets beyond what has already been emitted.
/// - query: the query
/// - trees: the stream of trees. It is assumed that trees later in this stream will be bigger
/// - range: the range which to stream. It is up to the caller to ensure that we have events for this range.
/// - mk_extra: a fn that allows to compute extra info from indices.
///   this can be useful to get progress info even if the query does not match any events
///
/// NOTE(review): the per-tree stream is cut with `take_while` at its first `Err`, and
/// `take_while` does not yield the element that fails its predicate. The error therefore
/// appears to be dropped rather than forwarded, which would make the `Err(_)` arm of the final
/// predicate unreachable — confirm this is intended.
pub fn stream_trees_chunked<S, Q, V, E, F>(
    &self,
    query: Q,
    trees: S,
    range: RangeInclusive<u64>,
    mk_extra: &'static F,
) -> impl Stream<Item = anyhow::Result<FilteredChunk<(u64, T::Key, V), E>>> + Send + 'static
where
    S: Stream<Item = Tree<T, V>> + Send + 'static,
    Q: Query<T> + Clone,
    V: BanyanValue,
    E: Send + 'static,
    F: Send + Sync + 'static + Fn(&NodeInfo<T, R>) -> E,
{
    // copy both bounds out once; they are plain `u64`s and cheap to move into closures
    let first = *range.start();
    let last = *range.end();
    // lowest offset that still has to be delivered, shared between all per-tree streams
    let progress = Arc::new(AtomicU64::new(first));
    let forest = self.clone();
    let chunks = trees
        .filter_map(move |tree| future::ready(tree.into_inner()))
        .flat_map(move |(index, secrets, _)| {
            // restrict the caller's query to the offsets that are still outstanding
            let outstanding = progress.load(Ordering::SeqCst)..=last;
            let bounded = AndQuery(OffsetRangeQuery::from(outstanding), query.clone()).boxed();
            let progress = Arc::clone(&progress);
            forest
                .stream_filtered_chunked0(secrets, bounded, index, mk_extra)
                .take_while(move |item| {
                    let keep_going = match item {
                        Ok(chunk) => {
                            // remember how far we got, so the next tree resumes from here
                            progress.fetch_max(chunk.range.end, Ordering::SeqCst);
                            true
                        }
                        // stop reading this tree at the first error
                        Err(_) => false,
                    };
                    future::ready(keep_going)
                })
        });
    // make sure we terminate when `last` is reached
    take_until_condition(chunks, move |item| match item {
        Err(_) => true,
        // `last` is inclusive, whereas `chunk.range.end` is exclusive
        Ok(chunk) => chunk.range.end > last,
    })
}
/// Given a sequence of trees, will stream chunks in ascending order indefinitely.
///
/// Note that this method has no way to know when the query is done. So ending this stream,
/// if desired, will have to be done by the caller using e.g. `take_while(...)`.
///
/// For every tree that `into_inner()` yields data for, `traverse0` is run with the caller's
/// query intersected with the offset range `cursor..=range.end()`, and the result is turned
/// into a stream via `into_stream(1, thread_pool.clone())`. `cursor` starts at `range.start()`
/// and is raised (via `fetch_max`) to `chunk.range.end` for every `Ok` chunk.
/// - query: the query
/// - trees: the stream of trees. It is assumed that trees later in this stream will be bigger
/// - range: the range which to stream. It is up to the caller to ensure that we have events for this range.
/// - mk_extra: a fn that allows to compute extra info from indices.
///   this can be useful to get progress info even if the query does not match any events
/// - thread_pool: the pool handed to `into_stream`; presumably the traversal runs on it — TODO confirm
pub fn stream_trees_chunked_threaded<S, Q, V, E, F>(
    &self,
    query: Q,
    trees: S,
    range: RangeInclusive<u64>,
    mk_extra: &'static F,
    thread_pool: ThreadPool,
) -> impl Stream<Item = anyhow::Result<FilteredChunk<(u64, T::Key, V), E>>> + Send + 'static
where
    S: Stream<Item = Tree<T, V>> + Send + 'static,
    Q: Query<T> + Clone,
    V: BanyanValue,
    E: Send + 'static,
    F: Send + Sync + 'static + Fn(&NodeInfo<T, R>) -> E,
{
    let last = *range.end();
    // lowest offset that still has to be delivered, shared between all per-tree traversals
    let cursor = Arc::new(AtomicU64::new(*range.start()));
    let forest = self.clone();
    trees
        .filter_map(move |tree| future::ready(tree.into_inner()))
        .flat_map(move |(index, secrets, _)| {
            // restrict the caller's query to the offsets that are still outstanding
            let outstanding = cursor.load(Ordering::SeqCst)..=last;
            let bounded = AndQuery(OffsetRangeQuery::from(outstanding), query.clone()).boxed();
            let cursor = Arc::clone(&cursor);
            forest
                .clone()
                .traverse0(secrets, bounded, index, mk_extra)
                .take_while(move |item| match item {
                    Ok(chunk) => {
                        // remember how far we got, so the next tree resumes from here
                        cursor.fetch_max(chunk.range.end, Ordering::SeqCst);
                        true
                    }
                    // stop traversing this tree at the first error
                    Err(_) => false,
                })
                .into_stream(1, thread_pool.clone())
        })
}
/// Given a sequence of trees, will stream chunks in reverse order until it arrives at `range.start()`.
///
/// Values within chunks are in ascending offset order, so if you flatten them you have to reverse them first.
///
/// For every tree that `into_inner()` yields data for, the caller's query is intersected with
/// the offset range `range.start()..=ceiling`. `ceiling` starts at `range.end()` and is lowered
/// (via `fetch_min`) to `chunk.range.start` for every `Ok` chunk that passes through.
/// - query: the query
/// - trees: the stream of trees. It is assumed that trees later in this stream will be bigger
/// - range: the range which to stream. It is up to the caller to ensure that we have events for this range.
/// - mk_extra: a fn that allows to compute extra info from indices.
///   this can be useful to get progress info even if the query does not match any events
///
/// NOTE(review): `ceiling` is used as an *inclusive* upper bound for the next tree, but it is set
/// to the start of a chunk that was already emitted. If `chunk.range` is half-open, the offset at
/// `chunk.range.start` may be delivered twice across trees — confirm against `OffsetRangeQuery`.
pub fn stream_trees_chunked_reverse<S, Q, V, E, F>(
    &self,
    query: Q,
    trees: S,
    range: RangeInclusive<u64>,
    mk_extra: &'static F,
) -> impl Stream<Item = anyhow::Result<FilteredChunk<(u64, T::Key, V), E>>> + Send + 'static
where
    S: Stream<Item = Tree<T, V>> + Send + 'static,
    Q: Query<T> + Clone,
    V: BanyanValue,
    E: Send + 'static,
    F: Send + Sync + 'static + Fn(&NodeInfo<T, R>) -> E,
{
    let first = *range.start();
    // highest offset that still has to be delivered, shared between all per-tree streams
    let ceiling = Arc::new(AtomicU64::new(*range.end()));
    let forest = self.clone();
    let chunks = trees
        .filter_map(move |tree| future::ready(tree.into_inner()))
        .flat_map(move |(index, secrets, _)| {
            // restrict the caller's query to the offsets that are still outstanding
            let outstanding = first..=ceiling.load(Ordering::SeqCst);
            let bounded = AndQuery(OffsetRangeQuery::from(outstanding), query.clone()).boxed();
            let ceiling = Arc::clone(&ceiling);
            forest
                .stream_filtered_chunked_reverse0(&secrets, bounded, index, mk_extra)
                .take_while(move |item| {
                    let keep_going = match item {
                        Ok(chunk) => {
                            // lower the ceiling to the start of what we just got
                            ceiling.fetch_min(chunk.range.start, Ordering::SeqCst);
                            true
                        }
                        // stop reading this tree at the first error
                        Err(_) => false,
                    };
                    future::ready(keep_going)
                })
        });
    // make sure we terminate when `first` is reached
    take_until_condition(chunks, move |item| match item {
        Err(_) => true,
        Ok(chunk) => chunk.range.start <= first,
    })
}
}