1use std::collections::BTreeSet;
11
12use futures::StreamExt;
13use futures::TryStreamExt;
14
15use crate::iddag::IdDag;
16use crate::iddagstore::IdDagStore;
17use crate::idmap::IdMapAssignHead;
18use crate::namedag::AbstractNameDag;
19use crate::nameset::NameSet;
20use crate::ops::CheckIntegrity;
21use crate::ops::DagAlgorithm;
22use crate::ops::IdConvert;
23use crate::ops::Persist;
24use crate::ops::TryClone;
25use crate::segment::SegmentFlags;
26use crate::Group;
27use crate::Id;
28use crate::Result;
29use crate::VertexName;
30
31#[async_trait::async_trait]
32impl<IS, M, P, S> CheckIntegrity for AbstractNameDag<IdDag<IS>, M, P, S>
33where
34 IS: IdDagStore + Persist + 'static,
35 IdDag<IS>: TryClone,
36 M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
37 P: TryClone + Send + Sync + 'static,
38 S: TryClone + Persist + Send + Sync + 'static,
39{
40 async fn check_universal_ids(&self) -> Result<Vec<Id>> {
41 let universal_ids: Vec<Id> = self.dag.universal_ids()?.into_iter().collect();
42 tracing::debug!("{} universally known vertexes", universal_ids.len());
43 let exists = self.map.contains_vertex_id_locally(&universal_ids).await?;
44 let missing_ids = universal_ids
45 .into_iter()
46 .zip(exists)
47 .filter_map(|(id, b)| if b { None } else { Some(id) })
48 .collect();
49 Ok(missing_ids)
50 }
51
52 async fn check_segments(&self) -> Result<Vec<String>> {
53 let mut problems = Vec::new();
55
56 let mut heads: BTreeSet<Id> = Default::default();
58 let mut roots: BTreeSet<Id> = Default::default();
59
60 for level in 0..=self.dag.max_level()? {
61 let mut expected_low = Id::MIN;
62
63 for seg in self.dag.iter_segments_ascending(Id::MIN, level)? {
65 let seg = seg?;
66 let span = seg.span()?;
67 let mut add_problem =
68 |msg| problems.push(format!("Level {} segment {:?} {}", level, &seg, msg));
69
70 if span.low.group() > expected_low.group() {
72 expected_low = span.low.group().min_id();
73 }
74 if span.low > span.high || span.low.group() != span.high.group() {
75 add_problem(format!("has invalid span {:?}", span));
76 }
77 if span.low < expected_low {
78 add_problem(format!(
79 "has unexpected span ({:?}), expected low ({:?})",
80 span, expected_low
81 ));
82 }
83 expected_low = span.high + 1;
84
85 let mut parents = seg.parents()?;
87 let orig_parents_len = parents.len();
88 parents.sort_unstable();
89 parents.dedup();
90 if parents.len() < orig_parents_len {
91 add_problem("has duplicated parents".to_string());
92 }
93 if parents.iter().any(|&p| p >= span.low) {
94 add_problem("has parents that might cause cycles".to_string());
95 }
96
97 if level == 0 {
99 let previous_head = heads.iter().rev().next().cloned();
100 if let Some(head) = previous_head {
101 if span.low <= head {
102 add_problem(format!(
103 "overlapped segments: {:?} with previous head {:?}",
104 span, head
105 ));
106 }
107 }
108 for p in &parents {
109 heads.remove(p);
110 }
111 heads.insert(span.high);
112 if parents.is_empty() {
113 roots.insert(span.low);
114 }
115 }
116
117 let mut expected_flags_max = SegmentFlags::empty();
119 let mut expected_flags_min = SegmentFlags::empty();
120 if roots.range(span.low..=span.high).take(1).count() > 0 {
121 expected_flags_min |= SegmentFlags::HAS_ROOT;
122 expected_flags_max |= SegmentFlags::HAS_ROOT;
123 }
124 if level == 0 && heads.len() == 1 && span.high.group() == Group::MASTER {
125 expected_flags_max |= SegmentFlags::ONLY_HEAD;
127 }
128 let flags = seg.flags()?;
129 if !flags.contains(expected_flags_min) || !expected_flags_max.contains(flags) {
130 add_problem(format!(
131 "has unexpected flags: {:?} (expected: min: {:?}, max: {:?})",
132 flags, expected_flags_min, expected_flags_max
133 ));
134 }
135
136 if level > 0 {
138 let mut expected_parents = Vec::with_capacity(parents.len());
139 for seg in self.dag.iter_segments_ascending(span.low, level - 1)? {
140 let seg = seg?;
141 let subspan = seg.span()?;
142 if subspan.high > span.high && subspan.low <= span.high {
143 add_problem(format!(
144 "does not align with low-level segment {:?}",
145 &seg
146 ));
147 }
148 if subspan.low > span.high {
149 break;
150 }
151 for p in seg.parents()? {
152 if p < span.low {
153 expected_parents.push(p);
154 }
155 }
156 }
157 expected_parents.sort_unstable();
158 expected_parents.dedup();
159 if parents != expected_parents {
160 add_problem(format!(
161 "has unexpected parents (expected: {:?})",
162 expected_parents
163 ));
164 }
165 }
166 }
167 }
168 Ok(problems)
169 }
170
171 async fn check_isomorphic_graph(
172 &self,
173 other: &dyn DagAlgorithm,
174 heads: NameSet,
175 ) -> Result<Vec<String>> {
176 let mut problems = Vec::new();
177
178 tracing::debug!("prefetching merges and parents");
181 for graph in [self, other] as [&dyn DagAlgorithm; 2] {
182 if !graph.is_vertex_lazy() {
183 continue;
184 }
185 let related = graph.master_group().await? & graph.ancestors(heads.clone()).await?;
186 let merges = graph.merges(related).await?;
187 let parents = graph.parents(merges.clone()).await?;
188 let prefetch = merges | parents;
189 let mut iter = prefetch.iter().await?;
190 while let Some(_) = iter.next().await {}
191 }
192 tracing::trace!("prefetched merges and parents");
193
194 let mut to_check: Vec<VertexName> = heads.iter().await?.try_collect().await?;
210 let mut visited: BTreeSet<VertexName> = Default::default();
211 while let Some(head) = to_check.pop() {
212 if !visited.insert(head.clone()) {
213 continue;
214 }
215
216 let head_id = self.vertex_id(head.clone()).await?;
218 let seg = match self.dag.find_flat_segment_including_id(head_id)? {
219 Some(seg) => seg,
220 None => {
221 problems.push(format!(
222 "head_id {:?} should be covered by a flat segment",
223 head_id
224 ));
225 continue;
226 }
227 };
228 let span = seg.span()?;
229 let root_id = span.low;
230 let root = self.vertex_name(root_id).await?;
231 let parents = self.parent_names(root.clone()).await?;
232 let mut add_problem = |msg| {
233 problems.push(format!(
234 "range {:?}::{:?} with parents {:?}: {}",
235 &root, &head, &parents, msg
236 ));
237 };
238 tracing::trace!("checking range {:?}::{:?}", &root, &head);
239
240 let this_count = head_id.0 - root_id.0 + 1;
243 let set = match other.range(root.clone().into(), head.clone().into()).await {
244 Ok(set) => set,
245 Err(e) => {
246 add_problem(format!("cannot resolve range on the other graph: {:?}", e));
247 continue;
248 }
249 };
250 let other_count = set.count().await? as u64;
251 if other_count != this_count {
252 add_problem(format!(
253 "length mismatch: {} != {}",
254 this_count, other_count
255 ));
256 }
257
258 let other_merges = other.merges(set).await?.count().await?;
260 let this_merges = if parents.len() > 1 { 1 } else { 0 };
261 if other_merges != this_merges {
262 add_problem(format!(
263 "merge mismatch: {} != {}",
264 this_merges, other_merges
265 ));
266 }
267
268 let other_parents = match other.parent_names(root.clone()).await {
270 Ok(ps) => ps,
271 Err(e) => {
272 add_problem(format!(
273 "cannot get parents of {:?} on the other graph: {:?}",
274 &root, e
275 ));
276 continue;
277 }
278 };
279 if other_parents != parents {
280 add_problem(format!(
281 "parents mismatch: {:?} != {:?}",
282 &parents, other_parents
283 ));
284 }
285
286 to_check.extend(parents);
288 }
289
290 Ok(problems)
291 }
292}