dag/nameset/
id_lazy.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::any::Any;
9use std::fmt;
10use std::sync::Arc;
11use std::sync::Mutex;
12use std::sync::MutexGuard;
13
14use indexmap::IndexSet;
15use nonblocking::non_blocking_result;
16
17use super::hints::Flags;
18use super::id_static::IdStaticSet;
19use super::AsyncNameSetQuery;
20use super::BoxVertexStream;
21use super::Hints;
22use crate::ops::DagAlgorithm;
23use crate::ops::IdConvert;
24use crate::protocol::disable_remote_protocol;
25use crate::Group;
26use crate::Id;
27use crate::IdSet;
28use crate::Result;
29use crate::VertexName;
30
/// A set backed by a lazy iterator of Ids.
///
/// Ids are pulled from the iterator on demand and remembered in `inner`, so
/// queries can be answered from what has already been loaded where possible.
pub struct IdLazySet {
    // Mutex: iter() does not take &mut self.
    // Arc: iter() result does not have a lifetime on this struct.
    inner: Arc<Mutex<Inner>>,
    /// Converts between `Id` and `VertexName` for members of this set.
    pub map: Arc<dyn IdConvert + Send + Sync>,
    /// Graph handle; handed to `Hints` and to the `IdStaticSet` built by `to_static`.
    pub(crate) dag: Arc<dyn DagAlgorithm + Send + Sync>,
    /// Hints about this set; `contains` consults `Flags::ID_DESC` / `Flags::ID_ASC`.
    hints: Hints,
}
40
/// Shared, lock-protected state of an `IdLazySet`.
struct Inner {
    /// Source of ids that have not been loaded yet.
    iter: Box<dyn Iterator<Item = Result<Id>> + Send + Sync>,
    /// Ids loaded so far (each id stored once).
    visited: IndexSet<Id>,
    /// Whether `iter` may still yield more ids, is exhausted, or has failed.
    state: State,
}
46
47impl Inner {
48    fn load_more(&mut self, n: usize, mut out: Option<&mut Vec<Id>>) -> Result<()> {
49        if matches!(self.state, State::Complete | State::Error) {
50            return Ok(());
51        }
52        for _ in 0..n {
53            match self.iter.next() {
54                Some(Ok(id)) => {
55                    if let Some(ref mut out) = out {
56                        out.push(id);
57                    }
58                    self.visited.insert(id);
59                }
60                None => {
61                    self.state = State::Complete;
62                    break;
63                }
64                Some(Err(err)) => {
65                    self.state = State::Error;
66                    return Err(err);
67                }
68            }
69        }
70        Ok(())
71    }
72}
73
/// Loading progress of the iterator behind an `IdLazySet`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum State {
    /// The iterator may still yield more ids.
    Incomplete,
    /// The iterator has been exhausted; every id has been loaded.
    Complete,
    /// Loading failed; no further ids will be loaded.
    Error,
}
80
/// Forward cursor over an `IdLazySet`, turned into a stream of vertex names.
pub struct Iter {
    /// State shared with the originating set (and any other cursors).
    inner: Arc<Mutex<Inner>>,
    /// Position in `inner.visited` of the next id to yield.
    index: usize,
    /// Used to translate each id into its `VertexName`.
    map: Arc<dyn IdConvert + Send + Sync>,
}
86
87impl Iter {
88    fn into_box_stream(self) -> BoxVertexStream {
89        Box::pin(futures::stream::unfold(self, |this| this.next()))
90    }
91
92    async fn next(mut self) -> Option<(Result<VertexName>, Self)> {
93        loop {
94            let state = {
95                let inner = self.inner.lock().unwrap();
96                inner.state
97            };
98            match state {
99                State::Error => break None,
100                State::Complete if self.inner.lock().unwrap().visited.len() <= self.index => {
101                    break None;
102                }
103                State::Complete | State::Incomplete => {
104                    let opt_id = {
105                        let inner = self.inner.lock().unwrap();
106                        inner.visited.get_index(self.index).cloned()
107                    };
108                    match opt_id {
109                        Some(id) => {
110                            self.index += 1;
111                            match self.map.vertex_name(id).await {
112                                Err(err) => {
113                                    self.inner.lock().unwrap().state = State::Error;
114                                    return Some((Err(err), self));
115                                }
116                                Ok(vertex) => {
117                                    break Some((Ok(vertex), self));
118                                }
119                            }
120                        }
121                        None => {
122                            // Data not available. Load more.
123                            let more = {
124                                let mut inner = self.inner.lock().unwrap();
125                                inner.load_more(1, None)
126                            };
127                            if let Err(err) = more {
128                                return Some((Err(err), self));
129                            }
130                        }
131                    }
132                }
133            }
134        }
135    }
136}
137
/// Helper for `Debug` output: an id plus, when it could be resolved, its name.
struct DebugId {
    id: Id,
    /// `None` when the name lookup did not succeed.
    name: Option<VertexName>,
}
142
143impl fmt::Debug for DebugId {
144    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145        if let Some(name) = &self.name {
146            fmt::Debug::fmt(&name, f)?;
147            write!(f, "+{:?}", self.id)?;
148        } else {
149            write!(f, "{:?}", self.id)?;
150        }
151        Ok(())
152    }
153}
154
155impl fmt::Debug for IdLazySet {
156    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157        f.write_str("<lazy ")?;
158        let inner = self.inner.lock().unwrap();
159        let limit = f.width().unwrap_or(3);
160        f.debug_list()
161            .entries(inner.visited.iter().take(limit).map(|&id| DebugId {
162                id,
163                name: disable_remote_protocol(|| {
164                    non_blocking_result(self.map.vertex_name(id)).ok()
165                }),
166            }))
167            .finish()?;
168        let remaining = inner.visited.len().max(limit) - limit;
169        match (remaining, inner.state) {
170            (0, State::Incomplete) => f.write_str(" + ? more")?,
171            (n, State::Incomplete) => write!(f, "+ {} + ? more", n)?,
172            (0, _) => {}
173            (n, _) => write!(f, " + {} more", n)?,
174        }
175        f.write_str(">")?;
176        Ok(())
177    }
178}
179
180impl IdLazySet {
181    pub fn from_iter_idmap_dag<I>(
182        names: I,
183        map: Arc<dyn IdConvert + Send + Sync>,
184        dag: Arc<dyn DagAlgorithm + Send + Sync>,
185    ) -> Self
186    where
187        I: IntoIterator<Item = Result<Id>> + 'static,
188        <I as IntoIterator>::IntoIter: Send + Sync,
189    {
190        let iter = names.into_iter();
191        let inner = Inner {
192            iter: Box::new(iter),
193            visited: IndexSet::new(),
194            state: State::Incomplete,
195        };
196        let hints = Hints::new_with_idmap_dag(map.clone(), dag.clone());
197        Self {
198            inner: Arc::new(Mutex::new(inner)),
199            map,
200            dag,
201            hints,
202        }
203    }
204
205    /// Convert to an IdStaticSet.
206    pub fn to_static(&self) -> Result<IdStaticSet> {
207        let inner = self.load_all()?;
208        let mut spans = IdSet::empty();
209        for &id in inner.visited.iter() {
210            spans.push(id);
211        }
212        Ok(IdStaticSet::from_spans_idmap_dag(
213            spans,
214            self.map.clone(),
215            self.dag.clone(),
216        ))
217    }
218
219    fn load_all(&self) -> Result<MutexGuard<Inner>> {
220        let mut inner = self.inner.lock().unwrap();
221        inner.load_more(usize::max_value(), None)?;
222        Ok(inner)
223    }
224}
225
226#[async_trait::async_trait]
227impl AsyncNameSetQuery for IdLazySet {
228    async fn iter(&self) -> Result<BoxVertexStream> {
229        let inner = self.inner.clone();
230        let map = self.map.clone();
231        let iter = Iter {
232            inner,
233            index: 0,
234            map,
235        };
236        Ok(iter.into_box_stream())
237    }
238
239    async fn iter_rev(&self) -> Result<BoxVertexStream> {
240        let inner = self.load_all()?;
241        struct State {
242            map: Arc<dyn IdConvert + Send + Sync>,
243            iter: Box<dyn Iterator<Item = Id> + Send>,
244        }
245        let state = State {
246            map: self.map.clone(),
247            iter: Box::new(inner.visited.clone().into_iter().rev()),
248        };
249        async fn next(mut state: State) -> Option<(Result<VertexName>, State)> {
250            match state.iter.next() {
251                None => None,
252                Some(id) => {
253                    let result = state.map.vertex_name(id).await;
254                    Some((result, state))
255                }
256            }
257        }
258
259        let stream = futures::stream::unfold(state, next);
260        Ok(Box::pin(stream))
261    }
262
263    async fn count(&self) -> Result<usize> {
264        let inner = self.load_all()?;
265        Ok(inner.visited.len())
266    }
267
268    async fn last(&self) -> Result<Option<VertexName>> {
269        let opt_id = {
270            let inner = self.load_all()?;
271            inner.visited.iter().rev().nth(0).cloned()
272        };
273        match opt_id {
274            Some(id) => Ok(Some(self.map.vertex_name(id).await?)),
275            None => Ok(None),
276        }
277    }
278
279    async fn contains(&self, name: &VertexName) -> Result<bool> {
280        let id = match self
281            .map
282            .vertex_id_with_max_group(name, Group::NON_MASTER)
283            .await?
284        {
285            None => {
286                return Ok(false);
287            }
288            Some(id) => id,
289        };
290        let mut inner = self.inner.lock().unwrap();
291        if inner.visited.contains(&id) {
292            return Ok(true);
293        } else {
294            let mut loaded = Vec::new();
295            loop {
296                // Fast paths.
297                if let Some(&last_id) = inner.visited.iter().rev().next() {
298                    let hints = self.hints();
299                    if hints.contains(Flags::ID_DESC) {
300                        if last_id < id {
301                            return Ok(false);
302                        }
303                    } else if hints.contains(Flags::ID_ASC) {
304                        if last_id > id {
305                            return Ok(false);
306                        }
307                    }
308                }
309                loaded.clear();
310                inner.load_more(1, Some(&mut loaded))?;
311                debug_assert!(loaded.len() <= 1);
312                if loaded.is_empty() {
313                    break;
314                }
315                if loaded.first() == Some(&id) {
316                    return Ok(true);
317                }
318            }
319        }
320        Ok(false)
321    }
322
323    async fn contains_fast(&self, name: &VertexName) -> Result<Option<bool>> {
324        let id = match self
325            .map
326            .vertex_id_with_max_group(name, Group::NON_MASTER)
327            .await?
328        {
329            None => {
330                return Ok(Some(false));
331            }
332            Some(id) => id,
333        };
334        let inner = self.inner.lock().unwrap();
335        if inner.visited.contains(&id) {
336            return Ok(Some(true));
337        } else if inner.state != State::Incomplete {
338            return Ok(Some(false));
339        }
340        Ok(None)
341    }
342
343    fn as_any(&self) -> &dyn Any {
344        self
345    }
346
347    fn hints(&self) -> &Hints {
348        &self.hints
349    }
350
351    fn id_convert(&self) -> Option<&dyn IdConvert> {
352        Some(self.map.as_ref() as &dyn IdConvert)
353    }
354}
355
#[cfg(test)]
pub(crate) mod test_utils {
    use std::sync::atomic::AtomicU64;
    use std::sync::atomic::Ordering::AcqRel;

    use super::*;
    use crate::ops::PrefixLookup;
    use crate::tests::dummy_dag::DummyDag;
    use crate::VerLink;

    /// Counter used to give every `StrIdMap` a distinct `map_id`.
    static STR_ID_MAP_ID: AtomicU64 = AtomicU64::new(0);

    /// Test-only id map: a vertex name is the 8 little-endian bytes of its id.
    pub(crate) struct StrIdMap {
        id: String,
        version: VerLink,
    }

    impl StrIdMap {
        pub(crate) fn new() -> Self {
            Self {
                id: format!("str:{}", STR_ID_MAP_ID.fetch_add(1, AcqRel)),
                version: VerLink::new(),
            }
        }
    }

    #[async_trait::async_trait]
    impl PrefixLookup for StrIdMap {
        async fn vertexes_by_hex_prefix(&self, _: &[u8], _: usize) -> Result<Vec<VertexName>> {
            // Dummy implementation.
            Ok(Vec::new())
        }
    }
    #[async_trait::async_trait]
    impl IdConvert for StrIdMap {
        /// Decode an 8-byte name into its id. Panics if the name is not 8 bytes.
        async fn vertex_id(&self, name: VertexName) -> Result<Id> {
            let slice: [u8; 8] = name.as_ref().try_into().unwrap();
            Ok(Id(u64::from_le_bytes(slice)))
        }
        async fn vertex_id_with_max_group(
            &self,
            name: &VertexName,
            _max_group: Group,
        ) -> Result<Option<Id>> {
            if name.as_ref().len() == 8 {
                let id = self.vertex_id(name.clone()).await?;
                Ok(Some(id))
            } else {
                Ok(None)
            }
        }
        /// Encode an id as its 8 little-endian bytes.
        async fn vertex_name(&self, id: Id) -> Result<VertexName> {
            let buf: [u8; 8] = id.0.to_le_bytes();
            Ok(VertexName::copy_from(&buf))
        }
        async fn contains_vertex_name(&self, name: &VertexName) -> Result<bool> {
            Ok(name.as_ref().len() == 8)
        }
        fn map_id(&self) -> &str {
            &self.id
        }
        fn map_version(&self) -> &VerLink {
            &self.version
        }
        async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
            Ok(ids.iter().map(|_| true).collect())
        }
        async fn contains_vertex_name_locally(&self, names: &[VertexName]) -> Result<Vec<bool>> {
            Ok(names.iter().map(|name| name.as_ref().len() == 8).collect())
        }
    }

    /// Copy the ids into an owned `Vec`; `from_iter_idmap_dag` needs a
    /// `'static` iterator, so it cannot borrow from the slice.
    fn owned_ids(a: &[u64]) -> Vec<Id> {
        a.iter().map(|i| Id(*i as _)).collect()
    }

    /// Build a lazy set over `a` with a fresh `StrIdMap` and `DummyDag`.
    pub fn lazy_set(a: &[u64]) -> IdLazySet {
        IdLazySet::from_iter_idmap_dag(
            owned_ids(a).into_iter().map(Ok),
            Arc::new(StrIdMap::new()),
            Arc::new(DummyDag::new()),
        )
    }

    /// Build a lazy set over `a` that shares `set`'s map and dag.
    pub fn lazy_set_inherit(a: &[u64], set: &IdLazySet) -> IdLazySet {
        IdLazySet::from_iter_idmap_dag(
            owned_ids(a).into_iter().map(Ok),
            set.map.clone(),
            set.dag.clone(),
        )
    }
}
443
// Tests for `IdLazySet`, built on the helpers in `test_utils`.
#[cfg(all(test, feature = "indexedlog-backend"))]
#[allow(clippy::redundant_clone)]
pub(crate) mod tests {
    use std::collections::HashSet;

    use nonblocking::non_blocking_result as r;

    use super::super::tests::*;
    use super::super::NameSet;
    use super::test_utils::*;
    use super::*;

    // Iteration keeps the order of the input ids; reverse iteration flips it.
    #[test]
    fn test_id_lazy_basic() -> Result<()> {
        let set = lazy_set(&[0x11, 0x33, 0x22, 0x77, 0x55]);
        check_invariants(&set)?;
        assert_eq!(shorten_iter(ni(set.iter())), ["11", "33", "22", "77", "55"]);
        assert_eq!(
            shorten_iter(ni(set.iter_rev())),
            ["55", "77", "22", "33", "11"]
        );
        assert!(!nb(set.is_empty())?);
        assert_eq!(nb(set.count())?, 5);
        assert_eq!(shorten_name(nb(set.first())?.unwrap()), "11");
        assert_eq!(shorten_name(nb(set.last())?.unwrap()), "55");
        Ok(())
    }

    // `contains` trusts the ordering hints and stops early, so deliberately
    // wrong hints produce observable false negatives.
    #[test]
    fn test_hints_fast_paths() -> Result<()> {
        let set = lazy_set(&[0x20, 0x50, 0x30, 0x70]);

        // Incorrect hints, but useful for testing.
        set.hints().add_flags(Flags::ID_ASC);

        let v = |i: u64| -> VertexName { r(StrIdMap::new().vertex_name(Id(i))).unwrap() };
        assert!(nb(set.contains(&v(0x20)))?);
        assert!(nb(set.contains(&v(0x50)))?);
        // 0x50 is already loaded and is greater than 0x30, so ID_ASC says "absent".
        assert!(!nb(set.contains(&v(0x30)))?);

        // Both flags are now set; `contains` checks ID_DESC first.
        set.hints().add_flags(Flags::ID_DESC);
        assert!(nb(set.contains(&v(0x30)))?);
        // 0x30 is now the last loaded id and is less than 0x70, so ID_DESC says "absent".
        assert!(!nb(set.contains(&v(0x70)))?);

        Ok(())
    }

    // Debug output lists only ids loaded so far; the order of the steps
    // below matters because each `iter.next()` may load one more id.
    #[test]
    fn test_debug() {
        let set = lazy_set(&[0]);
        assert_eq!(format!("{:?}", set), "<lazy [] + ? more>");
        nb(set.count()).unwrap();
        assert_eq!(format!("{:?}", set), "<lazy [0000000000000000+0]>");

        let set = lazy_set(&[1, 3, 2]);
        assert_eq!(format!("{:?}", &set), "<lazy [] + ? more>");
        let mut iter = ni(set.iter()).unwrap();
        iter.next();
        assert_eq!(
            format!("{:?}", &set),
            "<lazy [0100000000000000+1] + ? more>"
        );
        iter.next();
        assert_eq!(
            format!("{:?}", &set),
            "<lazy [0100000000000000+1, 0300000000000000+3] + ? more>"
        );
        iter.next();
        // The format width caps the number of listed entries.
        assert_eq!(format!("{:2.2?}", &set), "<lazy [01+1, 03+3]+ 1 + ? more>");
        // This extra `next` is what lets the iterator report exhaustion.
        iter.next();
        assert_eq!(format!("{:1.3?}", &set), "<lazy [010+1] + 2 more>");
    }

    // Set operations on lazy sets, flattened by names and by ids.
    #[test]
    fn test_flatten() {
        let set1 = lazy_set(&[3, 2, 4]);
        let set2 = lazy_set_inherit(&[3, 7, 6], &set1);
        let set1 = NameSet::from_query(set1);
        let set2 = NameSet::from_query(set2);

        // Show flatten by names, and flatten by ids.
        // The first should be <static ...>, the second should be <spans ...>.
        let show = |set: NameSet| {
            [
                format!("{:5.2?}", r(set.flatten_names()).unwrap()),
                format!("{:5.2?}", r(set.flatten()).unwrap()),
            ]
        };

        assert_eq!(
            show(set1.clone() | set2.clone()),
            [
                "<static [03, 02, 04, 07, 06]>",
                "<spans [06:07+6:7, 02:04+2:4]>"
            ]
        );
        assert_eq!(
            show(set1.clone() & set2.clone()),
            ["<static [03]>", "<spans [03+3]>"]
        );
        assert_eq!(
            show(set1.clone() - set2.clone()),
            ["<static [02, 04]>", "<spans [04+4, 02+2]>"]
        );
    }

    quickcheck::quickcheck! {
        // `count` must equal the number of distinct input ids.
        fn test_id_lazy_quickcheck(a: Vec<u64>) -> bool {
            let set = lazy_set(&a);
            check_invariants(&set).unwrap();

            let count = nb(set.count()).unwrap();
            assert!(count <= a.len());

            let set2: HashSet<_> = a.iter().cloned().collect();
            assert_eq!(count, set2.len());

            true
        }
    }
}