dag/
integrity.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! Integrity checks.
9
10use std::collections::BTreeSet;
11
12use futures::StreamExt;
13use futures::TryStreamExt;
14
15use crate::iddag::IdDag;
16use crate::iddagstore::IdDagStore;
17use crate::idmap::IdMapAssignHead;
18use crate::namedag::AbstractNameDag;
19use crate::nameset::NameSet;
20use crate::ops::CheckIntegrity;
21use crate::ops::DagAlgorithm;
22use crate::ops::IdConvert;
23use crate::ops::Persist;
24use crate::ops::TryClone;
25use crate::segment::SegmentFlags;
26use crate::Group;
27use crate::Id;
28use crate::Result;
29use crate::VertexName;
30
31#[async_trait::async_trait]
32impl<IS, M, P, S> CheckIntegrity for AbstractNameDag<IdDag<IS>, M, P, S>
33where
34    IS: IdDagStore + Persist + 'static,
35    IdDag<IS>: TryClone,
36    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
37    P: TryClone + Send + Sync + 'static,
38    S: TryClone + Persist + Send + Sync + 'static,
39{
40    async fn check_universal_ids(&self) -> Result<Vec<Id>> {
41        let universal_ids: Vec<Id> = self.dag.universal_ids()?.into_iter().collect();
42        tracing::debug!("{} universally known vertexes", universal_ids.len());
43        let exists = self.map.contains_vertex_id_locally(&universal_ids).await?;
44        let missing_ids = universal_ids
45            .into_iter()
46            .zip(exists)
47            .filter_map(|(id, b)| if b { None } else { Some(id) })
48            .collect();
49        Ok(missing_ids)
50    }
51
52    async fn check_segments(&self) -> Result<Vec<String>> {
53        // Detected problems.
54        let mut problems = Vec::new();
55
56        // Track heads and roots in the graph.
57        let mut heads: BTreeSet<Id> = Default::default();
58        let mut roots: BTreeSet<Id> = Default::default();
59
60        for level in 0..=self.dag.max_level()? {
61            let mut expected_low = Id::MIN;
62
63            // Check all levels.
64            for seg in self.dag.iter_segments_ascending(Id::MIN, level)? {
65                let seg = seg?;
66                let span = seg.span()?;
67                let mut add_problem =
68                    |msg| problems.push(format!("Level {} segment {:?} {}", level, &seg, msg));
69
70                // Spans need to be sorted and non-overlapping within a group.
71                if span.low.group() > expected_low.group() {
72                    expected_low = span.low.group().min_id();
73                }
74                if span.low > span.high || span.low.group() != span.high.group() {
75                    add_problem(format!("has invalid span {:?}", span));
76                }
77                if span.low < expected_low {
78                    add_problem(format!(
79                        "has unexpected span ({:?}), expected low ({:?})",
80                        span, expected_low
81                    ));
82                }
83                expected_low = span.high + 1;
84
85                // Parents should be < low to avoid cycles, and should not have duplicates.
86                let mut parents = seg.parents()?;
87                let orig_parents_len = parents.len();
88                parents.sort_unstable();
89                parents.dedup();
90                if parents.len() < orig_parents_len {
91                    add_problem("has duplicated parents".to_string());
92                }
93                if parents.iter().any(|&p| p >= span.low) {
94                    add_problem("has parents that might cause cycles".to_string());
95                }
96
97                // Maintain heads and roots. This can only be calculated from flat segments.
98                if level == 0 {
99                    let previous_head = heads.iter().rev().next().cloned();
100                    if let Some(head) = previous_head {
101                        if span.low <= head {
102                            add_problem(format!(
103                                "overlapped segments: {:?} with previous head {:?}",
104                                span, head
105                            ));
106                        }
107                    }
108                    for p in &parents {
109                        heads.remove(p);
110                    }
111                    heads.insert(span.high);
112                    if parents.is_empty() {
113                        roots.insert(span.low);
114                    }
115                }
116
117                // Check flags. min: must have flags, max: all possible flags.
118                let mut expected_flags_max = SegmentFlags::empty();
119                let mut expected_flags_min = SegmentFlags::empty();
120                if roots.range(span.low..=span.high).take(1).count() > 0 {
121                    expected_flags_min |= SegmentFlags::HAS_ROOT;
122                    expected_flags_max |= SegmentFlags::HAS_ROOT;
123                }
124                if level == 0 && heads.len() == 1 && span.high.group() == Group::MASTER {
125                    // ONLY_HEAD is optional.
126                    expected_flags_max |= SegmentFlags::ONLY_HEAD;
127                }
128                let flags = seg.flags()?;
129                if !flags.contains(expected_flags_min) || !expected_flags_max.contains(flags) {
130                    add_problem(format!(
131                        "has unexpected flags: {:?} (expected: min: {:?}, max: {:?})",
132                        flags, expected_flags_min, expected_flags_max
133                    ));
134                }
135
136                // Check parents for high-level segments.
137                if level > 0 {
138                    let mut expected_parents = Vec::with_capacity(parents.len());
139                    for seg in self.dag.iter_segments_ascending(span.low, level - 1)? {
140                        let seg = seg?;
141                        let subspan = seg.span()?;
142                        if subspan.high > span.high && subspan.low <= span.high {
143                            add_problem(format!(
144                                "does not align with low-level segment {:?}",
145                                &seg
146                            ));
147                        }
148                        if subspan.low > span.high {
149                            break;
150                        }
151                        for p in seg.parents()? {
152                            if p < span.low {
153                                expected_parents.push(p);
154                            }
155                        }
156                    }
157                    expected_parents.sort_unstable();
158                    expected_parents.dedup();
159                    if parents != expected_parents {
160                        add_problem(format!(
161                            "has unexpected parents (expected: {:?})",
162                            expected_parents
163                        ));
164                    }
165                }
166            }
167        }
168        Ok(problems)
169    }
170
171    async fn check_isomorphic_graph(
172        &self,
173        other: &dyn DagAlgorithm,
174        heads: NameSet,
175    ) -> Result<Vec<String>> {
176        let mut problems = Vec::new();
177
178        // Prefetch merges and their parents in both graphs' master group.
179        // This reduces round-trips.
180        tracing::debug!("prefetching merges and parents");
181        for graph in [self, other] as [&dyn DagAlgorithm; 2] {
182            if !graph.is_vertex_lazy() {
183                continue;
184            }
185            let related = graph.master_group().await? & graph.ancestors(heads.clone()).await?;
186            let merges = graph.merges(related).await?;
187            let parents = graph.parents(merges.clone()).await?;
188            let prefetch = merges | parents;
189            let mut iter = prefetch.iter().await?;
190            while let Some(_) = iter.next().await {}
191        }
192        tracing::trace!("prefetched merges and parents");
193
194        // To verify two graphs are isomorphic. Start from some heads,
195        // check and compare their linear ancestors recursively.
196        //
197        // For example, starting from "to_check" having just "E":
198        //
199        //      A--C--D--E      B--C--D--E
200        //        /               /
201        //       B               A
202        //
203        // 1. Figure out the linear portion (C--D--E).
204        // 2. Check the linear portion is the same in both graphs.
205        //    (only its root C can be a merge and C has the same parents)
206        // 3. Remove the head (E) from "to_check", insert root (C)'s parents
207        //    to "to_check".
208        // 4. Repeat from 1 until "to_check" is empty.
209        let mut to_check: Vec<VertexName> = heads.iter().await?.try_collect().await?;
210        let mut visited: BTreeSet<VertexName> = Default::default();
211        while let Some(head) = to_check.pop() {
212            if !visited.insert(head.clone()) {
213                continue;
214            }
215
216            // Use flat segment to figure out the linear portion.
217            let head_id = self.vertex_id(head.clone()).await?;
218            let seg = match self.dag.find_flat_segment_including_id(head_id)? {
219                Some(seg) => seg,
220                None => {
221                    problems.push(format!(
222                        "head_id {:?} should be covered by a flat segment",
223                        head_id
224                    ));
225                    continue;
226                }
227            };
228            let span = seg.span()?;
229            let root_id = span.low;
230            let root = self.vertex_name(root_id).await?;
231            let parents = self.parent_names(root.clone()).await?;
232            let mut add_problem = |msg| {
233                problems.push(format!(
234                    "range {:?}::{:?} with parents {:?}: {}",
235                    &root, &head, &parents, msg
236                ));
237            };
238            tracing::trace!("checking range {:?}::{:?}", &root, &head);
239
240            // Check against the other graph for various properties.
241            // Check vertex count in the range.
242            let this_count = head_id.0 - root_id.0 + 1;
243            let set = match other.range(root.clone().into(), head.clone().into()).await {
244                Ok(set) => set,
245                Err(e) => {
246                    add_problem(format!("cannot resolve range on the other graph: {:?}", e));
247                    continue;
248                }
249            };
250            let other_count = set.count().await? as u64;
251            if other_count != this_count {
252                add_problem(format!(
253                    "length mismatch: {} != {}",
254                    this_count, other_count
255                ));
256            }
257
258            // Check that merge can only be at most 1 (`root`).
259            let other_merges = other.merges(set).await?.count().await?;
260            let this_merges = if parents.len() > 1 { 1 } else { 0 };
261            if other_merges != this_merges {
262                add_problem(format!(
263                    "merge mismatch: {} != {}",
264                    this_merges, other_merges
265                ));
266            }
267
268            // Check parents of root.
269            let other_parents = match other.parent_names(root.clone()).await {
270                Ok(ps) => ps,
271                Err(e) => {
272                    add_problem(format!(
273                        "cannot get parents of {:?} on the other graph: {:?}",
274                        &root, e
275                    ));
276                    continue;
277                }
278            };
279            if other_parents != parents {
280                add_problem(format!(
281                    "parents mismatch: {:?} != {:?}",
282                    &parents, other_parents
283                ));
284            }
285
286            // Check parents recursively.
287            to_check.extend(parents);
288        }
289
290        Ok(problems)
291    }
292}