1use std::any::Any;
9use std::fmt;
10use std::sync::Arc;
11
12use nonblocking::non_blocking_result;
13
14use super::hints::Flags;
15use super::AsyncNameSetQuery;
16use super::BoxVertexStream;
17use super::Hints;
18use crate::ops::DagAlgorithm;
19use crate::ops::IdConvert;
20use crate::protocol::disable_remote_protocol;
21use crate::Group;
22use crate::IdSet;
23use crate::IdSetIter;
24use crate::IdSpan;
25use crate::Result;
26use crate::VertexName;
27
28pub struct IdStaticSet {
31 pub(crate) spans: IdSet,
32 pub(crate) map: Arc<dyn IdConvert + Send + Sync>,
33 pub(crate) dag: Arc<dyn DagAlgorithm + Send + Sync>,
34 hints: Hints,
35}
36
37struct Iter {
38 iter: IdSetIter<IdSet>,
39 map: Arc<dyn IdConvert + Send + Sync>,
40 reversed: bool,
41 buf: Vec<Result<VertexName>>,
42}
43
44impl Iter {
45 fn into_box_stream(self) -> BoxVertexStream {
46 Box::pin(futures::stream::unfold(self, |this| this.next()))
47 }
48
49 async fn next(mut self) -> Option<(Result<VertexName>, Self)> {
50 if let Some(name) = self.buf.pop() {
51 return Some((name, self));
52 }
53 let map = &self.map;
54 let opt_id = if self.reversed {
55 self.iter.next_back()
56 } else {
57 self.iter.next()
58 };
59 match opt_id {
60 None => None,
61 Some(id) => {
62 let contains = map
63 .contains_vertex_id_locally(&[id])
64 .await
65 .unwrap_or_default();
66 if contains == &[true] {
67 Some((map.vertex_name(id).await, self))
68 } else {
69 let batch_size = 131072;
71 let mut ids = Vec::with_capacity(batch_size);
72 ids.push(id);
73 for _ in ids.len()..batch_size {
74 if let Some(id) = if self.reversed {
75 self.iter.next_back()
76 } else {
77 self.iter.next()
78 } {
79 ids.push(id);
80 } else {
81 break;
82 }
83 }
84 ids.reverse();
85 self.buf = match self.map.vertex_name_batch(&ids).await {
86 Err(e) => return Some((Err(e), self)),
87 Ok(names) => names,
88 };
89 if self.buf.len() != ids.len() {
90 let result =
91 crate::errors::bug("vertex_name_batch does not return enough items");
92 return Some((result, self));
93 }
94 let name = self.buf.pop().expect("buf is not empty");
95 Some((name, self))
96 }
97 }
98 }
99 }
100}
101
102struct DebugSpan {
103 span: IdSpan,
104 low_name: Option<VertexName>,
105 high_name: Option<VertexName>,
106}
107
108impl fmt::Debug for DebugSpan {
109 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
110 match (
111 self.span.low == self.span.high,
112 &self.low_name,
113 &self.high_name,
114 ) {
115 (true, Some(name), _) => {
116 fmt::Debug::fmt(&name, f)?;
117 write!(f, "+{:?}", self.span.low)?;
118 }
119 (true, None, _) => {
120 write!(f, "{:?}", self.span.low)?;
121 }
122 (false, Some(low), Some(high)) => {
123 fmt::Debug::fmt(&low, f)?;
124 write!(f, ":")?;
125 fmt::Debug::fmt(&high, f)?;
126 write!(f, "+{:?}:{:?}", self.span.low, self.span.high)?;
127 }
128 (false, _, _) => {
129 write!(f, "{:?}:{:?}", self.span.low, self.span.high)?;
130 }
131 }
132 Ok(())
133 }
134}
135
136impl fmt::Debug for IdStaticSet {
137 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138 write!(f, "<spans ")?;
139 let spans = self.spans.as_spans();
140 let limit = f.width().unwrap_or(3);
141 f.debug_list()
142 .entries(spans.iter().take(limit).map(|span| DebugSpan {
143 span: *span,
144 low_name: disable_remote_protocol(|| {
145 non_blocking_result(self.map.vertex_name(span.low)).ok()
146 }),
147 high_name: disable_remote_protocol(|| {
148 non_blocking_result(self.map.vertex_name(span.high)).ok()
149 }),
150 }))
151 .finish()?;
152 match spans.len().max(limit) - limit {
153 0 => {}
154 1 => write!(f, " + 1 span")?,
155 n => write!(f, " + {} spans", n)?,
156 }
157 write!(f, ">")?;
158 Ok(())
159 }
160}
161
162impl IdStaticSet {
163 pub(crate) fn from_spans_idmap_dag(
164 spans: IdSet,
165 map: Arc<dyn IdConvert + Send + Sync>,
166 dag: Arc<dyn DagAlgorithm + Send + Sync>,
167 ) -> Self {
168 let hints = Hints::new_with_idmap_dag(map.clone(), dag.clone());
169 hints.add_flags(Flags::ID_DESC | Flags::TOPO_DESC);
170 if spans.is_empty() {
171 hints.add_flags(Flags::EMPTY);
172 } else {
173 hints.set_min_id(spans.min().unwrap());
174 hints.set_max_id(spans.max().unwrap());
175 }
176 Self {
177 spans,
178 map,
179 hints,
180 dag,
181 }
182 }
183}
184
185#[async_trait::async_trait]
186impl AsyncNameSetQuery for IdStaticSet {
187 async fn iter(&self) -> Result<BoxVertexStream> {
188 let iter = Iter {
189 iter: self.spans.clone().into_iter(),
190 map: self.map.clone(),
191 reversed: false,
192 buf: Default::default(),
193 };
194 Ok(iter.into_box_stream())
195 }
196
197 async fn iter_rev(&self) -> Result<BoxVertexStream> {
198 let iter = Iter {
199 iter: self.spans.clone().into_iter(),
200 map: self.map.clone(),
201 reversed: true,
202 buf: Default::default(),
203 };
204 Ok(iter.into_box_stream())
205 }
206
207 async fn count(&self) -> Result<usize> {
208 Ok(self.spans.count() as usize)
209 }
210
211 async fn first(&self) -> Result<Option<VertexName>> {
212 debug_assert_eq!(self.spans.max(), self.spans.iter_desc().nth(0));
213 match self.spans.max() {
214 Some(id) => {
215 let map = &self.map;
216 let name = map.vertex_name(id).await?;
217 Ok(Some(name))
218 }
219 None => Ok(None),
220 }
221 }
222
223 async fn last(&self) -> Result<Option<VertexName>> {
224 debug_assert_eq!(self.spans.min(), self.spans.iter_desc().rev().nth(0));
225 match self.spans.min() {
226 Some(id) => {
227 let map = &self.map;
228 let name = map.vertex_name(id).await?;
229 Ok(Some(name))
230 }
231 None => Ok(None),
232 }
233 }
234
235 async fn is_empty(&self) -> Result<bool> {
236 Ok(self.spans.is_empty())
237 }
238
239 async fn contains(&self, name: &VertexName) -> Result<bool> {
240 let result = match self
241 .map
242 .vertex_id_with_max_group(name, Group::NON_MASTER)
243 .await?
244 {
245 Some(id) => self.spans.contains(id),
246 None => false,
247 };
248 Ok(result)
249 }
250
251 async fn contains_fast(&self, name: &VertexName) -> Result<Option<bool>> {
252 self.contains(name).await.map(Some)
253 }
254
255 fn as_any(&self) -> &dyn Any {
256 self
257 }
258
259 fn hints(&self) -> &Hints {
260 &self.hints
261 }
262
263 fn id_convert(&self) -> Option<&dyn IdConvert> {
264 Some(self.map.as_ref() as &dyn IdConvert)
265 }
266}
267
268#[cfg(test)]
269#[allow(clippy::redundant_clone)]
270pub(crate) mod tests {
271 use std::ops::Deref;
272
273 use nonblocking::non_blocking_result as r;
274
275 use super::super::tests::*;
276 use super::super::NameSet;
277 use super::*;
278 use crate::tests::build_segments;
279 use crate::DagAlgorithm;
280 use crate::NameDag;
281
282 pub(crate) fn with_dag<R, F: Fn(&NameDag) -> R>(func: F) -> R {
284 let built = build_segments(
285 r#"
286 A--B--C--D
287 \--E--F--G"#,
288 "D G",
289 2,
290 );
291 func(&built.name_dag)
294 }
295
296 #[test]
297 fn test_dag_invariants() -> Result<()> {
298 with_dag(|dag| {
299 let bef = r(dag.range("B".into(), "F".into()))?;
300 check_invariants(bef.deref())?;
301
302 Ok(())
303 })
304 }
305
306 #[test]
307 fn test_dag_fast_paths() -> Result<()> {
308 with_dag(|dag| {
309 let abcd = r(dag.ancestors("D".into()))?;
310 let abefg = r(dag.ancestors("G".into()))?;
311
312 let ab = abcd.intersection(&abefg);
313 check_invariants(ab.deref())?;
314
315 assert!(nb(abcd.contains(&vec![b'A'].into()))?);
316 assert!(!nb(abcd.contains(&vec![b'E'].into()))?);
317
318 assert_eq!(format!("{:?}", &ab), "<spans [A:B+0:1]>");
320
321 let abcdefg = abcd.union(&abefg);
322 check_invariants(abcd.deref())?;
323 assert_eq!(format!("{:?}", &abcdefg), "<spans [A:G+0:6]>");
325
326 let cd = abcd.difference(&abefg);
327 check_invariants(cd.deref())?;
328 assert_eq!(format!("{:?}", &cd), "<spans [C:D+2:3]>");
330
331 Ok(())
332 })
333 }
334
335 #[test]
336 fn test_dag_no_fast_paths() -> Result<()> {
337 let f = |s: NameSet| -> String { format!("{:?}", s) };
338 with_dag(|dag1| -> Result<()> {
339 with_dag(|dag2| -> Result<()> {
340 let abcd = r(dag1.ancestors("D".into()))?;
341 let abefg = r(dag2.ancestors("G".into()))?;
342
343 let ab = abcd.intersection(&abefg);
347 check_invariants(ab.deref())?;
348 assert_eq!(
350 format!("{:?}", &ab),
351 "<and <spans [A:D+0:3]> <spans [E:G+4:6, A:B+0:1]>>"
352 );
353
354 let abcdefg = abcd.union(&abefg);
355 check_invariants(abcd.deref())?;
356 assert_eq!(
358 format!("{:?}", &abcdefg),
359 "<or <spans [A:D+0:3]> <spans [E:G+4:6, A:B+0:1]>>"
360 );
361
362 let cd = abcd.difference(&abefg);
363 check_invariants(cd.deref())?;
364 assert_eq!(
366 format!("{:?}", &cd),
367 "<diff <spans [A:D+0:3]> <spans [E:G+4:6, A:B+0:1]>>"
368 );
369
370 let a1 = || r(dag1.all()).unwrap();
373 let a2 = || r(dag2.all()).unwrap();
374 assert_eq!(f(a1() & a2()), "<and <spans [A:G+0:6]> <spans [A:G+0:6]>>");
375 assert_eq!(f(a1() | a2()), "<or <spans [A:G+0:6]> <spans [A:G+0:6]>>");
376 assert_eq!(f(a1() - a2()), "<diff <spans [A:G+0:6]> <spans [A:G+0:6]>>");
377
378 let z = || NameSet::from("Z");
382 assert_eq!(f(z() & a2()), "<and <static [Z]> <spans [A:G+0:6]>>");
383 assert_eq!(f(z() | a2()), "<or <static [Z]> <spans [A:G+0:6]>>");
384 assert_eq!(f(z() - a2()), "<diff <static [Z]> <spans [A:G+0:6]>>");
385 assert_eq!(f(a1() & z()), "<and <static [Z]> <spans [A:G+0:6]>>");
386 assert_eq!(f(a1() | z()), "<or <spans [A:G+0:6]> <static [Z]>>");
387 assert_eq!(f(a1() - z()), "<diff <spans [A:G+0:6]> <static [Z]>>");
388
389 let e = || NameSet::empty();
391 assert_eq!(f(e() & a1()), "<empty>");
392 assert_eq!(f(e() | a1()), "<spans [A:G+0:6]>");
393 assert_eq!(f(e() - a1()), "<empty>");
394 assert_eq!(f(a1() & e()), "<empty>");
395 assert_eq!(f(a1() | e()), "<spans [A:G+0:6]>");
396 assert_eq!(f(a1() - e()), "<spans [A:G+0:6]>");
397
398 Ok(())
399 })
400 })
401 }
402
403 #[test]
404 fn test_dag_all() -> Result<()> {
405 with_dag(|dag| {
406 let all = r(dag.all())?;
407 assert_eq!(format!("{:?}", &all), "<spans [A:G+0:6]>");
408
409 let ac: NameSet = "A C".into();
410 let ac = r(dag.sort(&ac))?;
411
412 let intersection = all.intersection(&ac);
413 assert_eq!(format!("{:?}", &intersection), "<spans [C+2, A+0]>");
415 Ok(())
416 })
417 }
418
419 #[test]
420 fn test_sort() -> Result<()> {
421 with_dag(|dag| -> Result<()> {
422 let set = "G C A E".into();
423 let sorted = r(dag.sort(&set))?;
424 assert_eq!(format!("{:?}", &sorted), "<spans [G+6, E+4, C+2] + 1 span>");
425 Ok(())
426 })
427 }
428
429 #[test]
430 fn test_dag_hints_ancestors() -> Result<()> {
431 with_dag(|dag| -> Result<()> {
432 let abc = r(dag.ancestors("B C".into()))?;
433 let abe = r(dag.common_ancestors("E".into()))?;
434 let f: NameSet = "F".into();
435 let all = r(dag.all())?;
436
437 assert!(has_ancestors_flag(abc.clone()));
438 assert!(has_ancestors_flag(abe.clone()));
439 assert!(has_ancestors_flag(all.clone()));
440 assert!(has_ancestors_flag(r(dag.roots(abc.clone()))?));
441 assert!(has_ancestors_flag(r(dag.parents(all.clone()))?));
442
443 assert!(!has_ancestors_flag(f.clone()));
444 assert!(!has_ancestors_flag(r(dag.roots(f.clone()))?));
445 assert!(!has_ancestors_flag(r(dag.parents(f.clone()))?));
446
447 Ok(())
448 })
449 }
450
451 #[test]
452 fn test_dag_hints_ancestors_inheritance() -> Result<()> {
453 with_dag(|dag1| -> Result<()> {
454 with_dag(|dag2| -> Result<()> {
455 let abc = r(dag1.ancestors("B C".into()))?;
456
457 assert!(has_ancestors_flag(r(dag1.sort(&abc))?));
460 assert!(has_ancestors_flag(r(dag1.parents(abc.clone()))?));
461 assert!(has_ancestors_flag(r(dag1.roots(abc.clone()))?));
462
463 assert!(!has_ancestors_flag(r(dag2.sort(&abc))?));
467 assert!(!has_ancestors_flag(r(dag2.parents(abc.clone()))?));
468 assert!(!has_ancestors_flag(r(dag2.roots(abc.clone()))?));
469
470 Ok(())
471 })
472 })
473 }
474
475 #[test]
476 fn test_dag_hints_ancestors_fast_paths() -> Result<()> {
477 with_dag(|dag| -> Result<()> {
478 let bfg: NameSet = "B F G".into();
479
480 bfg.hints().add_flags(Flags::ANCESTORS);
482
483 assert_eq!(
485 format!("{:?}", r(dag.ancestors(bfg.clone()))?),
486 "<static [B, F, G]>"
487 );
488 assert_eq!(format!("{:?}", r(dag.heads(bfg.clone()))?), "<spans [G+6]>");
489
490 let bfg = r(dag.sort(&bfg))?;
492 bfg.hints().add_flags(Flags::ANCESTORS);
493 assert_eq!(
494 format!("{:?}", r(dag.ancestors(bfg.clone()))?),
495 "<spans [F:G+5:6, B+1]>"
496 );
497
498 assert_eq!(format!("{:?}", r(dag.heads(bfg.clone()))?), "<spans [G+6]>");
501
502 assert_eq!(
505 format!("{:?}", r(dag.ancestors(bfg.clone()))?),
506 "<spans [F:G+5:6, B+1]>"
507 );
508
509 Ok(())
510 })
511 }
512
513 fn has_ancestors_flag(set: NameSet) -> bool {
514 set.hints().contains(Flags::ANCESTORS)
515 }
516}