1use std::any::Any;
9use std::fmt;
10use std::sync::Arc;
11
12use futures::lock::Mutex;
13use futures::lock::MutexGuard;
14use futures::StreamExt;
15use indexmap::IndexSet;
16
17use super::AsyncSetQuery;
18use super::BoxVertexStream;
19use super::Hints;
20use crate::Result;
21use crate::Vertex;
22
23pub struct LazySet {
25 inner: Arc<Mutex<Inner>>,
26 hints: Hints,
27}
28
29struct Inner {
30 iter: BoxVertexStream,
31 visited: IndexSet<Vertex>,
32 state: State,
33}
34
35impl Inner {
36 async fn load_more(&mut self, n: usize, mut out: Option<&mut Vec<Vertex>>) -> Result<()> {
37 if matches!(self.state, State::Complete | State::Error) {
38 return Ok(());
39 }
40 for _ in 0..n {
41 match self.iter.next().await {
42 Some(Ok(name)) => {
43 if let Some(ref mut out) = out {
44 out.push(name.clone());
45 }
46 self.visited.insert(name);
47 }
48 None => {
49 self.state = State::Complete;
50 break;
51 }
52 Some(Err(err)) => {
53 self.state = State::Error;
54 return Err(err);
55 }
56 }
57 }
58 Ok(())
59 }
60}
61
/// Loading progress of the source stream behind a lazy set.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum State {
    /// The stream has neither ended nor failed; more items may follow.
    Incomplete,
    /// The stream ended; every item it produced has been recorded.
    Complete,
    /// The stream yielded an error; nothing further is pulled from it.
    Error,
}
68
69pub struct Iter {
70 inner: Arc<Mutex<Inner>>,
71 index: usize,
72}
73
74impl Iter {
75 async fn next(&mut self) -> Option<Result<Vertex>> {
76 loop {
77 let mut inner = self.inner.lock().await;
78 match inner.state {
79 State::Error => break None,
80 State::Complete if inner.visited.len() <= self.index => break None,
81 State::Complete | State::Incomplete => {
82 let value = inner.visited.get_index(self.index).cloned();
83 match value {
84 Some(value) => {
85 self.index += 1;
86 break Some(Ok(value));
87 }
88 None => {
89 if let Err(err) = inner.load_more(1, None).await {
91 return Some(Err(err));
92 }
93 continue;
94 }
95 }
96 }
97 }
98 }
99 }
100
101 fn into_stream(self) -> BoxVertexStream {
102 Box::pin(futures::stream::unfold(self, |mut state| async move {
103 let result = state.next().await;
104 result.map(|r| (r, state))
105 }))
106 }
107}
108
109impl fmt::Debug for LazySet {
110 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111 f.write_str("<lazy ")?;
112 if let Some(inner) = self.inner.try_lock() {
113 let limit = f.width().unwrap_or(3);
114 f.debug_list()
115 .entries(inner.visited.iter().take(limit))
116 .finish()?;
117 let remaining = inner.visited.len().max(limit) - limit;
118 match (remaining, inner.state) {
119 (0, State::Incomplete) => f.write_str(" + ? more")?,
120 (n, State::Incomplete) => write!(f, "+ {} + ? more", n)?,
121 (0, _) => {}
122 (n, _) => write!(f, " + {} more", n)?,
123 }
124 } else {
125 f.write_str(" ?")?;
126 }
127 f.write_str(">")?;
128 Ok(())
129 }
130}
131
132impl LazySet {
133 pub fn from_iter<I>(names: I, hints: Hints) -> Self
134 where
135 I: IntoIterator<Item = Result<Vertex>> + 'static,
136 <I as IntoIterator>::IntoIter: Send + Sync,
137 {
138 let stream = futures::stream::iter(names);
139 Self::from_stream(Box::pin(stream), hints)
140 }
141
142 pub fn from_stream(names: BoxVertexStream, hints: Hints) -> Self {
143 let inner = Inner {
144 iter: names,
145 visited: IndexSet::new(),
146 state: State::Incomplete,
147 };
148 Self {
149 inner: Arc::new(Mutex::new(inner)),
150 hints,
151 }
152 }
153
154 async fn load_all(&self) -> Result<MutexGuard<'_, Inner>> {
155 let mut inner = self.inner.lock().await;
156 inner.load_more(usize::max_value(), None).await?;
157 Ok(inner)
158 }
159}
160
161#[async_trait::async_trait]
162impl AsyncSetQuery for LazySet {
163 async fn iter(&self) -> Result<BoxVertexStream> {
164 let inner = self.inner.clone();
165 let iter = Iter { inner, index: 0 };
166 Ok(iter.into_stream())
167 }
168
169 async fn iter_rev(&self) -> Result<BoxVertexStream> {
170 let inner = self.load_all().await?;
171 let iter = inner.visited.clone().into_iter().rev().map(Ok);
172 let stream = futures::stream::iter(iter);
173 Ok(Box::pin(stream))
174 }
175
176 async fn count_slow(&self) -> Result<u64> {
177 let inner = self.load_all().await?;
178 Ok(inner.visited.len().try_into()?)
179 }
180
181 async fn size_hint(&self) -> (u64, Option<u64>) {
182 let inner = self.inner.lock().await;
183 let min = inner.visited.len() as u64;
184 let max = match inner.state {
185 State::Incomplete => None,
186 State::Complete => Some(min as u64),
187 State::Error => None,
188 };
189 (min, max)
190 }
191
192 async fn last(&self) -> Result<Option<Vertex>> {
193 let inner = self.load_all().await?;
194 Ok(inner.visited.iter().rev().nth(0).cloned())
195 }
196
197 async fn contains(&self, name: &Vertex) -> Result<bool> {
198 let mut inner = self.inner.lock().await;
199 if inner.visited.contains(name) {
200 return Ok(true);
201 } else {
202 let mut loaded = Vec::new();
203 loop {
204 loaded.clear();
205 inner.load_more(1, Some(&mut loaded)).await?;
206 debug_assert!(loaded.len() <= 1);
207 if loaded.is_empty() {
208 break;
209 }
210 if loaded.first() == Some(name) {
211 return Ok(true);
212 }
213 }
214 }
215 Ok(false)
216 }
217
218 async fn contains_fast(&self, name: &Vertex) -> Result<Option<bool>> {
219 let inner = self.inner.lock().await;
220 if inner.visited.contains(name) {
221 return Ok(Some(true));
222 } else if inner.state != State::Incomplete {
223 return Ok(Some(false));
224 }
225 Ok(None)
226 }
227
228 fn as_any(&self) -> &dyn Any {
229 self
230 }
231
232 fn hints(&self) -> &Hints {
233 &self.hints
234 }
235}
236
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::super::tests::*;
    use super::*;

    /// Build a `LazySet` over one vertex per input byte (via `to_name`),
    /// with default hints.
    fn lazy_set(a: &[u8]) -> LazySet {
        let names = a.to_vec().into_iter().map(|b| Ok(to_name(b)));
        LazySet::from_iter(names, Hints::default())
    }

    #[test]
    fn test_lazy_basic() -> Result<()> {
        // The input repeats 0x22 and 0x11; only first occurrences are kept.
        let set = lazy_set(b"\x11\x33\x22\x77\x22\x55\x11");
        check_invariants(&set)?;

        let forward = shorten_iter(ni(set.iter()));
        assert_eq!(forward, ["11", "33", "22", "77", "55"]);

        let backward = shorten_iter(ni(set.iter_rev()));
        assert_eq!(backward, ["55", "77", "22", "33", "11"]);

        assert!(!nb(set.is_empty())?);
        assert_eq!(nb(set.count_slow())?, 5);
        assert_eq!(shorten_name(nb(set.first())?.unwrap()), "11");
        assert_eq!(shorten_name(nb(set.last())?.unwrap()), "55");
        Ok(())
    }

    #[test]
    fn test_debug() {
        // Empty source: open-ended until something drains it.
        let set = lazy_set(b"");
        assert_eq!(dbg(&set), "<lazy [] + ? more>");
        nb(set.count_slow()).unwrap();
        assert_eq!(dbg(&set), "<lazy []>");

        // Each `next()` on the iterator makes one more item visible.
        let set = lazy_set(b"\x11\x33\x22");
        assert_eq!(dbg(&set), "<lazy [] + ? more>");
        let mut iter = ni(set.iter()).unwrap();
        iter.next();
        assert_eq!(dbg(&set), "<lazy [1111] + ? more>");
        iter.next();
        assert_eq!(dbg(&set), "<lazy [1111, 3333] + ? more>");
        iter.next();
        // A width of 2 limits the listing to two items.
        assert_eq!(format!("{:2.2?}", &set), "<lazy [11, 33]+ 1 + ? more>");
        iter.next();
        // The fourth `next()` reaches the end of the source, so the trailing
        // "? more" disappears.
        assert_eq!(format!("{:1.3?}", &set), "<lazy [111] + 2 more>");
    }

    #[test]
    fn test_lazy() -> Result<()> {
        let set = lazy_set(b"\x11\x33\x22");
        // Nothing has been pulled yet.
        assert_eq!(nb(set.size_hint()), (0, None));
        assert!(!nb(set.is_empty())?);
        // Checking emptiness left exactly one item cached.
        assert_eq!(nb(set.size_hint()), (1, None));
        assert_eq!(nb(set.count_slow())?, 3);
        // Counting drained the source, so the hint is now exact.
        assert_eq!(nb(set.size_hint()), (3, Some(3)));
        Ok(())
    }

    quickcheck::quickcheck! {
        fn test_lazy_quickcheck(a: Vec<u8>) -> bool {
            let set = lazy_set(&a);
            check_invariants(&set).unwrap();

            let count = nb(set.count_slow()).unwrap() as usize;
            assert!(count <= a.len());

            // The set must hold exactly the distinct input bytes.
            let unique: HashSet<u8> = a.iter().copied().collect();
            assert_eq!(count, unique.len());

            true
        }
    }
}