Skip to main content

dag/set/
lazy.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::any::Any;
9use std::fmt;
10use std::sync::Arc;
11
12use futures::lock::Mutex;
13use futures::lock::MutexGuard;
14use futures::StreamExt;
15use indexmap::IndexSet;
16
17use super::AsyncSetQuery;
18use super::BoxVertexStream;
19use super::Hints;
20use crate::Result;
21use crate::Vertex;
22
23/// A set backed by a lazy iterator of names.
24pub struct LazySet {
25    inner: Arc<Mutex<Inner>>,
26    hints: Hints,
27}
28
29struct Inner {
30    iter: BoxVertexStream,
31    visited: IndexSet<Vertex>,
32    state: State,
33}
34
35impl Inner {
36    async fn load_more(&mut self, n: usize, mut out: Option<&mut Vec<Vertex>>) -> Result<()> {
37        if matches!(self.state, State::Complete | State::Error) {
38            return Ok(());
39        }
40        for _ in 0..n {
41            match self.iter.next().await {
42                Some(Ok(name)) => {
43                    if let Some(ref mut out) = out {
44                        out.push(name.clone());
45                    }
46                    self.visited.insert(name);
47                }
48                None => {
49                    self.state = State::Complete;
50                    break;
51                }
52                Some(Err(err)) => {
53                    self.state = State::Error;
54                    return Err(err);
55                }
56            }
57        }
58        Ok(())
59    }
60}
61
/// Loading progress of the stream behind a [`LazySet`].
#[derive(Copy, Clone, Debug, PartialEq)]
enum State {
    /// The stream may still yield more names.
    Incomplete,
    /// The stream has ended; every name it produced is cached.
    Complete,
    /// The stream returned an error; it is not polled again.
    Error,
}
68
69pub struct Iter {
70    inner: Arc<Mutex<Inner>>,
71    index: usize,
72}
73
74impl Iter {
75    async fn next(&mut self) -> Option<Result<Vertex>> {
76        loop {
77            let mut inner = self.inner.lock().await;
78            match inner.state {
79                State::Error => break None,
80                State::Complete if inner.visited.len() <= self.index => break None,
81                State::Complete | State::Incomplete => {
82                    let value = inner.visited.get_index(self.index).cloned();
83                    match value {
84                        Some(value) => {
85                            self.index += 1;
86                            break Some(Ok(value));
87                        }
88                        None => {
89                            // Data not available. Load more.
90                            if let Err(err) = inner.load_more(1, None).await {
91                                return Some(Err(err));
92                            }
93                            continue;
94                        }
95                    }
96                }
97            }
98        }
99    }
100
101    fn into_stream(self) -> BoxVertexStream {
102        Box::pin(futures::stream::unfold(self, |mut state| async move {
103            let result = state.next().await;
104            result.map(|r| (r, state))
105        }))
106    }
107}
108
109impl fmt::Debug for LazySet {
110    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111        f.write_str("<lazy ")?;
112        if let Some(inner) = self.inner.try_lock() {
113            let limit = f.width().unwrap_or(3);
114            f.debug_list()
115                .entries(inner.visited.iter().take(limit))
116                .finish()?;
117            let remaining = inner.visited.len().max(limit) - limit;
118            match (remaining, inner.state) {
119                (0, State::Incomplete) => f.write_str(" + ? more")?,
120                (n, State::Incomplete) => write!(f, "+ {} + ? more", n)?,
121                (0, _) => {}
122                (n, _) => write!(f, " + {} more", n)?,
123            }
124        } else {
125            f.write_str(" ?")?;
126        }
127        f.write_str(">")?;
128        Ok(())
129    }
130}
131
132impl LazySet {
133    pub fn from_iter<I>(names: I, hints: Hints) -> Self
134    where
135        I: IntoIterator<Item = Result<Vertex>> + 'static,
136        <I as IntoIterator>::IntoIter: Send + Sync,
137    {
138        let stream = futures::stream::iter(names);
139        Self::from_stream(Box::pin(stream), hints)
140    }
141
142    pub fn from_stream(names: BoxVertexStream, hints: Hints) -> Self {
143        let inner = Inner {
144            iter: names,
145            visited: IndexSet::new(),
146            state: State::Incomplete,
147        };
148        Self {
149            inner: Arc::new(Mutex::new(inner)),
150            hints,
151        }
152    }
153
154    async fn load_all(&self) -> Result<MutexGuard<'_, Inner>> {
155        let mut inner = self.inner.lock().await;
156        inner.load_more(usize::max_value(), None).await?;
157        Ok(inner)
158    }
159}
160
161#[async_trait::async_trait]
162impl AsyncSetQuery for LazySet {
163    async fn iter(&self) -> Result<BoxVertexStream> {
164        let inner = self.inner.clone();
165        let iter = Iter { inner, index: 0 };
166        Ok(iter.into_stream())
167    }
168
169    async fn iter_rev(&self) -> Result<BoxVertexStream> {
170        let inner = self.load_all().await?;
171        let iter = inner.visited.clone().into_iter().rev().map(Ok);
172        let stream = futures::stream::iter(iter);
173        Ok(Box::pin(stream))
174    }
175
176    async fn count_slow(&self) -> Result<u64> {
177        let inner = self.load_all().await?;
178        Ok(inner.visited.len().try_into()?)
179    }
180
181    async fn size_hint(&self) -> (u64, Option<u64>) {
182        let inner = self.inner.lock().await;
183        let min = inner.visited.len() as u64;
184        let max = match inner.state {
185            State::Incomplete => None,
186            State::Complete => Some(min as u64),
187            State::Error => None,
188        };
189        (min, max)
190    }
191
192    async fn last(&self) -> Result<Option<Vertex>> {
193        let inner = self.load_all().await?;
194        Ok(inner.visited.iter().rev().nth(0).cloned())
195    }
196
197    async fn contains(&self, name: &Vertex) -> Result<bool> {
198        let mut inner = self.inner.lock().await;
199        if inner.visited.contains(name) {
200            return Ok(true);
201        } else {
202            let mut loaded = Vec::new();
203            loop {
204                loaded.clear();
205                inner.load_more(1, Some(&mut loaded)).await?;
206                debug_assert!(loaded.len() <= 1);
207                if loaded.is_empty() {
208                    break;
209                }
210                if loaded.first() == Some(name) {
211                    return Ok(true);
212                }
213            }
214        }
215        Ok(false)
216    }
217
218    async fn contains_fast(&self, name: &Vertex) -> Result<Option<bool>> {
219        let inner = self.inner.lock().await;
220        if inner.visited.contains(name) {
221            return Ok(Some(true));
222        } else if inner.state != State::Incomplete {
223            return Ok(Some(false));
224        }
225        Ok(None)
226    }
227
228    fn as_any(&self) -> &dyn Any {
229        self
230    }
231
232    fn hints(&self) -> &Hints {
233        &self.hints
234    }
235}
236
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::super::tests::*;
    use super::*;

    /// Builds a `LazySet` that yields `to_name(b)` for each byte `b`, in order,
    /// with default hints.
    fn lazy_set(bytes: &[u8]) -> LazySet {
        let names = bytes.to_vec().into_iter().map(|b| Ok(to_name(b)));
        LazySet::from_iter(names, Hints::default())
    }

    #[test]
    fn test_lazy_basic() -> Result<()> {
        // Repeated bytes ("22", "11") are kept once, at their first position.
        let set = lazy_set(b"\x11\x33\x22\x77\x22\x55\x11");
        check_invariants(&set)?;

        let forward = shorten_iter(ni(set.iter()));
        assert_eq!(forward, ["11", "33", "22", "77", "55"]);
        let backward = shorten_iter(ni(set.iter_rev()));
        assert_eq!(backward, ["55", "77", "22", "33", "11"]);

        assert!(!nb(set.is_empty())?);
        assert_eq!(nb(set.count_slow())?, 5);

        let first = nb(set.first())?.unwrap();
        assert_eq!(shorten_name(first), "11");
        let last = nb(set.last())?.unwrap();
        assert_eq!(shorten_name(last), "55");
        Ok(())
    }

    #[test]
    fn test_debug() {
        // An empty stream still shows "? more" until it has been drained.
        let empty = lazy_set(b"");
        assert_eq!(dbg(&empty), "<lazy [] + ? more>");
        nb(empty.count_slow()).unwrap();
        assert_eq!(dbg(&empty), "<lazy []>");

        // Each step of an iterator pulls one more name into the shown cache.
        let set = lazy_set(b"\x11\x33\x22");
        assert_eq!(dbg(&set), "<lazy [] + ? more>");
        let mut names = ni(set.iter()).unwrap();
        names.next();
        assert_eq!(dbg(&set), "<lazy [1111] + ? more>");
        names.next();
        assert_eq!(dbg(&set), "<lazy [1111, 3333] + ? more>");
        names.next();
        // Width caps the listed entries; precision appears to shorten each entry.
        assert_eq!(format!("{:2.2?}", &set), "<lazy [11, 33]+ 1 + ? more>");
        names.next();
        assert_eq!(format!("{:1.3?}", &set), "<lazy [111] + 2 more>");
    }

    #[test]
    fn test_lazy() -> Result<()> {
        let set = lazy_set(b"\x11\x33\x22");
        // Nothing has been read yet.
        assert_eq!(nb(set.size_hint()), (0, None));

        // is_empty() reads one next item.
        assert!(!nb(set.is_empty())?);
        assert_eq!(nb(set.size_hint()), (1, None));

        // count() reads all items, which makes the upper bound known.
        assert_eq!(nb(set.count_slow())?, 3);
        assert_eq!(nb(set.size_hint()), (3, Some(3)));
        Ok(())
    }

    quickcheck::quickcheck! {
        fn test_lazy_quickcheck(a: Vec<u8>) -> bool {
            let set = lazy_set(&a);
            check_invariants(&set).unwrap();

            // The set deduplicates, so its size equals the number of distinct bytes.
            let count = nb(set.count_slow()).unwrap() as usize;
            assert!(count <= a.len());
            let distinct: HashSet<u8> = a.iter().copied().collect();
            assert_eq!(count, distinct.len());

            true
        }
    }
}