dag/nameset/
id_static.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::any::Any;
9use std::fmt;
10use std::sync::Arc;
11
12use nonblocking::non_blocking_result;
13
14use super::hints::Flags;
15use super::AsyncNameSetQuery;
16use super::BoxVertexStream;
17use super::Hints;
18use crate::ops::DagAlgorithm;
19use crate::ops::IdConvert;
20use crate::protocol::disable_remote_protocol;
21use crate::Group;
22use crate::IdSet;
23use crate::IdSetIter;
24use crate::IdSpan;
25use crate::Result;
26use crate::VertexName;
27
/// A set backed by an [`IdSet`] plus an id-to-name map ([`IdConvert`]).
/// Efficient for DAG calculation.
pub struct IdStaticSet {
    /// Ids of the vertexes contained in this set.
    pub(crate) spans: IdSet,
    /// Converts between the ids stored in `spans` and vertex names.
    pub(crate) map: Arc<dyn IdConvert + Send + Sync>,
    /// Graph handle; the constructor in this file also records it in `hints`.
    pub(crate) dag: Arc<dyn DagAlgorithm + Send + Sync>,
    /// Hints describing this set (order flags, emptiness, id bounds).
    hints: Hints,
}
36
/// State of a name stream over the ids of an [`IdSet`].
struct Iter {
    /// Source of ids that still need to be converted to names.
    iter: IdSetIter<IdSet>,
    /// Resolves ids to vertex names.
    map: Arc<dyn IdConvert + Send + Sync>,
    /// If true, ids are taken from the back of `iter` instead of the front.
    reversed: bool,
    /// Names resolved ahead of time by a batch lookup. They are consumed with
    /// `pop()`, so they are stored in reverse of the order they are yielded.
    buf: Vec<Result<VertexName>>,
}
43
44impl Iter {
45    fn into_box_stream(self) -> BoxVertexStream {
46        Box::pin(futures::stream::unfold(self, |this| this.next()))
47    }
48
49    async fn next(mut self) -> Option<(Result<VertexName>, Self)> {
50        if let Some(name) = self.buf.pop() {
51            return Some((name, self));
52        }
53        let map = &self.map;
54        let opt_id = if self.reversed {
55            self.iter.next_back()
56        } else {
57            self.iter.next()
58        };
59        match opt_id {
60            None => None,
61            Some(id) => {
62                let contains = map
63                    .contains_vertex_id_locally(&[id])
64                    .await
65                    .unwrap_or_default();
66                if contains == &[true] {
67                    Some((map.vertex_name(id).await, self))
68                } else {
69                    // On demand prefetch in batch.
70                    let batch_size = 131072;
71                    let mut ids = Vec::with_capacity(batch_size);
72                    ids.push(id);
73                    for _ in ids.len()..batch_size {
74                        if let Some(id) = if self.reversed {
75                            self.iter.next_back()
76                        } else {
77                            self.iter.next()
78                        } {
79                            ids.push(id);
80                        } else {
81                            break;
82                        }
83                    }
84                    ids.reverse();
85                    self.buf = match self.map.vertex_name_batch(&ids).await {
86                        Err(e) => return Some((Err(e), self)),
87                        Ok(names) => names,
88                    };
89                    if self.buf.len() != ids.len() {
90                        let result =
91                            crate::errors::bug("vertex_name_batch does not return enough items");
92                        return Some((result, self));
93                    }
94                    let name = self.buf.pop().expect("buf is not empty");
95                    Some((name, self))
96                }
97            }
98        }
99    }
100}
101
/// Helper used to render one [`IdSpan`] in debug output, optionally annotated
/// with the names of its two ends.
struct DebugSpan {
    /// The id span being rendered.
    span: IdSpan,
    /// Name of `span.low`, if it could be resolved.
    low_name: Option<VertexName>,
    /// Name of `span.high`, if it could be resolved.
    high_name: Option<VertexName>,
}
107
108impl fmt::Debug for DebugSpan {
109    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
110        match (
111            self.span.low == self.span.high,
112            &self.low_name,
113            &self.high_name,
114        ) {
115            (true, Some(name), _) => {
116                fmt::Debug::fmt(&name, f)?;
117                write!(f, "+{:?}", self.span.low)?;
118            }
119            (true, None, _) => {
120                write!(f, "{:?}", self.span.low)?;
121            }
122            (false, Some(low), Some(high)) => {
123                fmt::Debug::fmt(&low, f)?;
124                write!(f, ":")?;
125                fmt::Debug::fmt(&high, f)?;
126                write!(f, "+{:?}:{:?}", self.span.low, self.span.high)?;
127            }
128            (false, _, _) => {
129                write!(f, "{:?}:{:?}", self.span.low, self.span.high)?;
130            }
131        }
132        Ok(())
133    }
134}
135
136impl fmt::Debug for IdStaticSet {
137    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138        write!(f, "<spans ")?;
139        let spans = self.spans.as_spans();
140        let limit = f.width().unwrap_or(3);
141        f.debug_list()
142            .entries(spans.iter().take(limit).map(|span| DebugSpan {
143                span: *span,
144                low_name: disable_remote_protocol(|| {
145                    non_blocking_result(self.map.vertex_name(span.low)).ok()
146                }),
147                high_name: disable_remote_protocol(|| {
148                    non_blocking_result(self.map.vertex_name(span.high)).ok()
149                }),
150            }))
151            .finish()?;
152        match spans.len().max(limit) - limit {
153            0 => {}
154            1 => write!(f, " + 1 span")?,
155            n => write!(f, " + {} spans", n)?,
156        }
157        write!(f, ">")?;
158        Ok(())
159    }
160}
161
162impl IdStaticSet {
163    pub(crate) fn from_spans_idmap_dag(
164        spans: IdSet,
165        map: Arc<dyn IdConvert + Send + Sync>,
166        dag: Arc<dyn DagAlgorithm + Send + Sync>,
167    ) -> Self {
168        let hints = Hints::new_with_idmap_dag(map.clone(), dag.clone());
169        hints.add_flags(Flags::ID_DESC | Flags::TOPO_DESC);
170        if spans.is_empty() {
171            hints.add_flags(Flags::EMPTY);
172        } else {
173            hints.set_min_id(spans.min().unwrap());
174            hints.set_max_id(spans.max().unwrap());
175        }
176        Self {
177            spans,
178            map,
179            hints,
180            dag,
181        }
182    }
183}
184
185#[async_trait::async_trait]
186impl AsyncNameSetQuery for IdStaticSet {
187    async fn iter(&self) -> Result<BoxVertexStream> {
188        let iter = Iter {
189            iter: self.spans.clone().into_iter(),
190            map: self.map.clone(),
191            reversed: false,
192            buf: Default::default(),
193        };
194        Ok(iter.into_box_stream())
195    }
196
197    async fn iter_rev(&self) -> Result<BoxVertexStream> {
198        let iter = Iter {
199            iter: self.spans.clone().into_iter(),
200            map: self.map.clone(),
201            reversed: true,
202            buf: Default::default(),
203        };
204        Ok(iter.into_box_stream())
205    }
206
207    async fn count(&self) -> Result<usize> {
208        Ok(self.spans.count() as usize)
209    }
210
211    async fn first(&self) -> Result<Option<VertexName>> {
212        debug_assert_eq!(self.spans.max(), self.spans.iter_desc().nth(0));
213        match self.spans.max() {
214            Some(id) => {
215                let map = &self.map;
216                let name = map.vertex_name(id).await?;
217                Ok(Some(name))
218            }
219            None => Ok(None),
220        }
221    }
222
223    async fn last(&self) -> Result<Option<VertexName>> {
224        debug_assert_eq!(self.spans.min(), self.spans.iter_desc().rev().nth(0));
225        match self.spans.min() {
226            Some(id) => {
227                let map = &self.map;
228                let name = map.vertex_name(id).await?;
229                Ok(Some(name))
230            }
231            None => Ok(None),
232        }
233    }
234
235    async fn is_empty(&self) -> Result<bool> {
236        Ok(self.spans.is_empty())
237    }
238
239    async fn contains(&self, name: &VertexName) -> Result<bool> {
240        let result = match self
241            .map
242            .vertex_id_with_max_group(name, Group::NON_MASTER)
243            .await?
244        {
245            Some(id) => self.spans.contains(id),
246            None => false,
247        };
248        Ok(result)
249    }
250
251    async fn contains_fast(&self, name: &VertexName) -> Result<Option<bool>> {
252        self.contains(name).await.map(Some)
253    }
254
255    fn as_any(&self) -> &dyn Any {
256        self
257    }
258
259    fn hints(&self) -> &Hints {
260        &self.hints
261    }
262
263    fn id_convert(&self) -> Option<&dyn IdConvert> {
264        Some(self.map.as_ref() as &dyn IdConvert)
265    }
266}
267
#[cfg(test)]
// Tests clone sets freely for readability, even where a move would do.
#[allow(clippy::redundant_clone)]
pub(crate) mod tests {
    use std::ops::Deref;

    use nonblocking::non_blocking_result as r;

    use super::super::tests::*;
    use super::super::NameSet;
    use super::*;
    use crate::tests::build_segments;
    use crate::DagAlgorithm;
    use crate::NameDag;

    /// Test with a predefined DAG.
    ///
    /// Every call builds a fresh `NameDag`, so nested calls yield two
    /// separate dag instances with the same content.
    pub(crate) fn with_dag<R, F: Fn(&NameDag) -> R>(func: F) -> R {
        let built = build_segments(
            r#"
            A--B--C--D
                \--E--F--G"#,
            "D G",
            2,
        );
        //  0--1--2--3
        //      \--4--5--6
        func(&built.name_dag)
    }

    /// A set produced by a dag query satisfies the generic set invariants.
    #[test]
    fn test_dag_invariants() -> Result<()> {
        with_dag(|dag| {
            let bef = r(dag.range("B".into(), "F".into()))?;
            check_invariants(bef.deref())?;

            Ok(())
        })
    }

    /// Set operations on sets from the same dag collapse into plain spans.
    #[test]
    fn test_dag_fast_paths() -> Result<()> {
        with_dag(|dag| {
            let abcd = r(dag.ancestors("D".into()))?;
            let abefg = r(dag.ancestors("G".into()))?;

            let ab = abcd.intersection(&abefg);
            check_invariants(ab.deref())?;

            assert!(nb(abcd.contains(&vec![b'A'].into()))?);
            assert!(!nb(abcd.contains(&vec![b'E'].into()))?);

            // should not be "<and <...> <...>>"
            assert_eq!(format!("{:?}", &ab), "<spans [A:B+0:1]>");

            let abcdefg = abcd.union(&abefg);
            // Check the union itself, not one of its inputs.
            check_invariants(abcdefg.deref())?;
            // should not be "<or <...> <...>>"
            assert_eq!(format!("{:?}", &abcdefg), "<spans [A:G+0:6]>");

            let cd = abcd.difference(&abefg);
            check_invariants(cd.deref())?;
            // should not be "<difference <...> <...>>"
            assert_eq!(format!("{:?}", &cd), "<spans [C:D+2:3]>");

            Ok(())
        })
    }

    /// Set operations across two separate dags must not use the fast paths.
    #[test]
    fn test_dag_no_fast_paths() -> Result<()> {
        let f = |s: NameSet| -> String { format!("{:?}", s) };
        with_dag(|dag1| -> Result<()> {
            with_dag(|dag2| -> Result<()> {
                let abcd = r(dag1.ancestors("D".into()))?;
                let abefg = r(dag2.ancestors("G".into()))?;

                // Since abcd and abefg are from 2 "separate" Dags, fast paths should not
                // be used for intersection, union, and difference.

                let ab = abcd.intersection(&abefg);
                check_invariants(ab.deref())?;
                // should not be "<spans ...>"
                assert_eq!(
                    format!("{:?}", &ab),
                    "<and <spans [A:D+0:3]> <spans [E:G+4:6, A:B+0:1]>>"
                );

                let abcdefg = abcd.union(&abefg);
                // Check the union itself, not one of its inputs.
                check_invariants(abcdefg.deref())?;
                // should not be "<spans ...>"
                assert_eq!(
                    format!("{:?}", &abcdefg),
                    "<or <spans [A:D+0:3]> <spans [E:G+4:6, A:B+0:1]>>"
                );

                let cd = abcd.difference(&abefg);
                check_invariants(cd.deref())?;
                // should not be "<spans ...>"
                assert_eq!(
                    format!("{:?}", &cd),
                    "<diff <spans [A:D+0:3]> <spans [E:G+4:6, A:B+0:1]>>"
                );

                // Should not use FULL hint fast paths for "&, |, -" operations, because
                // dag1 and dag2 are not considered compatible.
                let a1 = || r(dag1.all()).unwrap();
                let a2 = || r(dag2.all()).unwrap();
                assert_eq!(f(a1() & a2()), "<and <spans [A:G+0:6]> <spans [A:G+0:6]>>");
                assert_eq!(f(a1() | a2()), "<or <spans [A:G+0:6]> <spans [A:G+0:6]>>");
                assert_eq!(f(a1() - a2()), "<diff <spans [A:G+0:6]> <spans [A:G+0:6]>>");

                // No fast path for manually constructed StaticSet either, because
                // the StaticSets do not have DAG associated to test compatibility.
                // However, "all & z" is changed to "z & all" for performance.
                let z = || NameSet::from("Z");
                assert_eq!(f(z() & a2()), "<and <static [Z]> <spans [A:G+0:6]>>");
                assert_eq!(f(z() | a2()), "<or <static [Z]> <spans [A:G+0:6]>>");
                assert_eq!(f(z() - a2()), "<diff <static [Z]> <spans [A:G+0:6]>>");
                assert_eq!(f(a1() & z()), "<and <static [Z]> <spans [A:G+0:6]>>");
                assert_eq!(f(a1() | z()), "<or <spans [A:G+0:6]> <static [Z]>>");
                assert_eq!(f(a1() - z()), "<diff <spans [A:G+0:6]> <static [Z]>>");

                // EMPTY fast paths can still be used.
                let e = || NameSet::empty();
                assert_eq!(f(e() & a1()), "<empty>");
                assert_eq!(f(e() | a1()), "<spans [A:G+0:6]>");
                assert_eq!(f(e() - a1()), "<empty>");
                assert_eq!(f(a1() & e()), "<empty>");
                assert_eq!(f(a1() | e()), "<spans [A:G+0:6]>");
                assert_eq!(f(a1() - e()), "<spans [A:G+0:6]>");

                Ok(())
            })
        })
    }

    /// Intersecting with the full set keeps the result as plain spans.
    #[test]
    fn test_dag_all() -> Result<()> {
        with_dag(|dag| {
            let all = r(dag.all())?;
            assert_eq!(format!("{:?}", &all), "<spans [A:G+0:6]>");

            let ac: NameSet = "A C".into();
            let ac = r(dag.sort(&ac))?;

            let intersection = all.intersection(&ac);
            // should not be "<and ...>"
            assert_eq!(format!("{:?}", &intersection), "<spans [C+2, A+0]>");
            Ok(())
        })
    }

    /// Sorting a manually built set yields spans; debug output lists the
    /// first 3 spans and summarizes the rest.
    #[test]
    fn test_sort() -> Result<()> {
        with_dag(|dag| -> Result<()> {
            let set = "G C A E".into();
            let sorted = r(dag.sort(&set))?;
            assert_eq!(format!("{:?}", &sorted), "<spans [G+6, E+4, C+2] + 1 span>");
            Ok(())
        })
    }

    /// Which dag queries produce sets carrying the ANCESTORS flag.
    #[test]
    fn test_dag_hints_ancestors() -> Result<()> {
        with_dag(|dag| -> Result<()> {
            let abc = r(dag.ancestors("B C".into()))?;
            let abe = r(dag.common_ancestors("E".into()))?;
            let f: NameSet = "F".into();
            let all = r(dag.all())?;

            assert!(has_ancestors_flag(abc.clone()));
            assert!(has_ancestors_flag(abe.clone()));
            assert!(has_ancestors_flag(all.clone()));
            assert!(has_ancestors_flag(r(dag.roots(abc.clone()))?));
            assert!(has_ancestors_flag(r(dag.parents(all.clone()))?));

            assert!(!has_ancestors_flag(f.clone()));
            assert!(!has_ancestors_flag(r(dag.roots(f.clone()))?));
            assert!(!has_ancestors_flag(r(dag.parents(f.clone()))?));

            Ok(())
        })
    }

    /// The ANCESTORS flag survives queries on the same dag only.
    #[test]
    fn test_dag_hints_ancestors_inheritance() -> Result<()> {
        with_dag(|dag1| -> Result<()> {
            with_dag(|dag2| -> Result<()> {
                let abc = r(dag1.ancestors("B C".into()))?;

                // The ANCESTORS flag is kept by 'sort', 'parents', 'roots' on
                // the same dag.
                assert!(has_ancestors_flag(r(dag1.sort(&abc))?));
                assert!(has_ancestors_flag(r(dag1.parents(abc.clone()))?));
                assert!(has_ancestors_flag(r(dag1.roots(abc.clone()))?));

                // The ANCESTORS flag is removed on a different dag, since the
                // different dag does not assume same graph / ancestry
                // relationship.
                assert!(!has_ancestors_flag(r(dag2.sort(&abc))?));
                assert!(!has_ancestors_flag(r(dag2.parents(abc.clone()))?));
                assert!(!has_ancestors_flag(r(dag2.roots(abc.clone()))?));

                Ok(())
            })
        })
    }

    /// Fast paths keyed on the ANCESTORS flag apply only to dag-bound sets.
    #[test]
    fn test_dag_hints_ancestors_fast_paths() -> Result<()> {
        with_dag(|dag| -> Result<()> {
            let bfg: NameSet = "B F G".into();

            // Set the ANCESTORS flag. It's incorrect but makes it easier to test fast paths.
            bfg.hints().add_flags(Flags::ANCESTORS);

            // Fast paths are not used if the set is not "bound" to the dag.
            assert_eq!(
                format!("{:?}", r(dag.ancestors(bfg.clone()))?),
                "<static [B, F, G]>"
            );
            assert_eq!(format!("{:?}", r(dag.heads(bfg.clone()))?), "<spans [G+6]>");

            // Binding to the Dag enables fast paths.
            let bfg = r(dag.sort(&bfg))?;
            bfg.hints().add_flags(Flags::ANCESTORS);
            assert_eq!(
                format!("{:?}", r(dag.ancestors(bfg.clone()))?),
                "<spans [F:G+5:6, B+1]>"
            );

            // 'heads' has a fast path that uses 'heads_ancestors' to do the calculation.
            // (in this case the result is incorrect because the hints are wrong).
            assert_eq!(format!("{:?}", r(dag.heads(bfg.clone()))?), "<spans [G+6]>");

            // 'ancestors' has a fast path that returns set as-is.
            // (in this case the result is incorrect because the hints are wrong).
            assert_eq!(
                format!("{:?}", r(dag.ancestors(bfg.clone()))?),
                "<spans [F:G+5:6, B+1]>"
            );

            Ok(())
        })
    }

    /// Whether the hints of `set` carry the ANCESTORS flag.
    fn has_ancestors_flag(set: NameSet) -> bool {
        set.hints().contains(Flags::ANCESTORS)
    }
}