dag/
namedag.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! # namedag
9//!
10//! Combination of IdMap and IdDag.
11
12use std::collections::BTreeMap;
13use std::collections::HashSet;
14use std::env::var;
15use std::fmt;
16use std::io;
17use std::ops::Deref;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::RwLock;
21
22use dag_types::FlatSegment;
23use futures::future::BoxFuture;
24use futures::FutureExt;
25use futures::StreamExt;
26use futures::TryStreamExt;
27use nonblocking::non_blocking_result;
28
29use crate::clone::CloneData;
30use crate::errors::bug;
31use crate::errors::programming;
32use crate::errors::DagError;
33use crate::errors::NotFoundError;
34use crate::id::Group;
35use crate::id::Id;
36use crate::id::VertexName;
37use crate::iddag::IdDag;
38use crate::iddag::IdDagAlgorithm;
39use crate::iddagstore::IdDagStore;
40use crate::idmap::CoreMemIdMap;
41use crate::idmap::IdMapAssignHead;
42use crate::idmap::IdMapWrite;
43use crate::nameset::hints::Flags;
44use crate::nameset::hints::Hints;
45use crate::nameset::NameSet;
46use crate::ops::CheckIntegrity;
47use crate::ops::DagAddHeads;
48use crate::ops::DagAlgorithm;
49use crate::ops::DagExportCloneData;
50use crate::ops::DagExportPullData;
51use crate::ops::DagImportCloneData;
52use crate::ops::DagImportPullData;
53use crate::ops::DagPersistent;
54use crate::ops::DagStrip;
55use crate::ops::IdConvert;
56use crate::ops::IdMapSnapshot;
57use crate::ops::IntVersion;
58use crate::ops::Open;
59use crate::ops::Parents;
60use crate::ops::Persist;
61use crate::ops::PrefixLookup;
62use crate::ops::ToIdSet;
63use crate::ops::TryClone;
64use crate::protocol;
65use crate::protocol::is_remote_protocol_disabled;
66use crate::protocol::AncestorPath;
67use crate::protocol::Process;
68use crate::protocol::RemoteIdConvertProtocol;
69use crate::segment::PreparedFlatSegments;
70use crate::segment::SegmentFlags;
71use crate::types_ext::PreparedFlatSegmentsExt;
72use crate::utils;
73use crate::Error::NeedSlowPath;
74use crate::IdSet;
75use crate::IdSpan;
76use crate::Level;
77use crate::Result;
78use crate::VerLink;
79use crate::VertexListWithOptions;
80
81mod builder;
82#[cfg(any(test, feature = "indexedlog-backend"))]
83mod indexedlog_namedag;
84mod mem_namedag;
85
86pub use builder::NameDagBuilder;
87#[cfg(any(test, feature = "indexedlog-backend"))]
88pub use indexedlog_namedag::IndexedLogNameDagPath;
89#[cfg(any(test, feature = "indexedlog-backend"))]
90pub use indexedlog_namedag::NameDag;
91pub use mem_namedag::MemNameDag;
92pub use mem_namedag::MemNameDagPath;
93
94pub struct AbstractNameDag<I, M, P, S>
95where
96    I: Send + Sync,
97    M: Send + Sync,
98    P: Send + Sync,
99    S: Send + Sync,
100{
101    pub(crate) dag: I,
102    pub(crate) map: M,
103
104    /// A read-only snapshot of the `NameDag`.
105    /// Lazily calculated.
106    snapshot: RwLock<Option<Arc<Self>>>,
107
108    /// Heads added via `add_heads` that are not flushed yet.
109    pending_heads: VertexListWithOptions,
110
111    /// Path used to open this `NameDag`.
112    path: P,
113
114    /// Extra state of the `NameDag`.
115    state: S,
116
117    /// Identity of the dag. Derived from `path`.
118    id: String,
119
120    /// `Id`s that are persisted on disk. Used to answer `dirty()`.
121    persisted_id_set: IdSet,
122
123    /// Overlay IdMap. Used to store IdMap results resolved using remote
124    /// protocols.
125    overlay_map: Arc<RwLock<CoreMemIdMap>>,
126
127    /// `Id`s that are allowed in the `overlay_map`. A protection.
128    /// The `overlay_map` is shared (Arc) and its ID should not exceed the
129    /// existing maximum ID at `map` open time. The IDs from
130    /// 0..overlay_map_next_id are considered immutable, but lazy.
131    overlay_map_id_set: IdSet,
132
133    /// The source of `overlay_map`s. This avoids absolute Ids, and is
134    /// used to flush overlay_map content shall the IdMap change on
135    /// disk.
136    overlay_map_paths: Arc<Mutex<Vec<(AncestorPath, Vec<VertexName>)>>>,
137
138    /// Defines how to communicate with a remote service.
139    /// The actual logic probably involves networking like HTTP etc
140    /// and is intended to be implemented outside the `dag` crate.
141    remote_protocol: Arc<dyn RemoteIdConvertProtocol>,
142
143    /// A negative cache. Vertexes that are looked up remotely, and the remote
144    /// confirmed the vertexes are outside the master group.
145    missing_vertexes_confirmed_by_remote: Arc<RwLock<HashSet<VertexName>>>,
146}
147
148impl<D, M, P, S> AbstractNameDag<D, M, P, S>
149where
150    D: Send + Sync,
151    M: Send + Sync,
152    P: Send + Sync,
153    S: Send + Sync,
154{
155    /// Extract inner states. Useful for advanced use-cases.
156    pub fn into_idmap_dag(self) -> (M, D) {
157        (self.map, self.dag)
158    }
159
160    /// Extract inner states. Useful for advanced use-cases.
161    pub fn into_idmap_dag_path_state(self) -> (M, D, P, S) {
162        (self.map, self.dag, self.path, self.state)
163    }
164}
165
166#[async_trait::async_trait]
167impl<IS, M, P, S> DagPersistent for AbstractNameDag<IdDag<IS>, M, P, S>
168where
169    IS: IdDagStore + Persist,
170    IdDag<IS>: TryClone + 'static,
171    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
172    P: Open<OpenTarget = Self> + Send + Sync + 'static,
173    S: TryClone + IntVersion + Persist + Send + Sync + 'static,
174{
175    /// Add vertexes and their ancestors to the on-disk DAG.
176    ///
177    /// This is similar to calling `add_heads` followed by `flush`.
178    /// But is faster.
179    async fn add_heads_and_flush(
180        &mut self,
181        parents: &dyn Parents,
182        heads: &VertexListWithOptions,
183    ) -> Result<()> {
184        if !self.pending_heads.is_empty() {
185            return programming(format!(
186                "ProgrammingError: add_heads_and_flush called with pending heads ({:?})",
187                &self.pending_heads.vertexes(),
188            ));
189        }
190
191        // Take lock.
192        //
193        // Reload meta and logs. This drops in-memory changes, which is fine because we have
194        // checked there are no in-memory changes at the beginning.
195        //
196        // Also see comments in `NameDagState::lock()`.
197        let old_version = self.state.int_version();
198        let lock = self.state.lock()?;
199        let map_lock = self.map.lock()?;
200        let dag_lock = self.dag.lock()?;
201        self.state.reload(&lock)?;
202        let new_version = self.state.int_version();
203        if old_version != new_version {
204            self.invalidate_snapshot();
205            self.invalidate_missing_vertex_cache();
206            self.invalidate_overlay_map()?;
207        }
208
209        self.map.reload(&map_lock)?;
210        self.dag.reload(&dag_lock)?;
211
212        // Build.
213        self.build_with_lock(parents, heads, &map_lock).await?;
214
215        // Write to disk.
216        self.map.persist(&map_lock)?;
217        self.dag.persist(&dag_lock)?;
218        self.state.persist(&lock)?;
219        drop(dag_lock);
220        drop(map_lock);
221        drop(lock);
222
223        self.persisted_id_set = self.dag.all_ids_in_groups(&Group::ALL)?;
224        debug_assert_eq!(self.dirty().await?.count().await?, 0);
225        Ok(())
226    }
227
228    /// Write in-memory DAG to disk. This will also pick up changes to
229    /// the DAG by other processes.
230    ///
231    /// This function re-assigns ids for vertexes. That requires the
232    /// pending ids and vertexes to be non-lazy. If you're changing
233    /// internal structures (ex. dag and map) directly, or introducing
234    /// lazy vertexes, then avoid this function. Instead, lock and
235    /// flush directly (see `add_heads_and_flush`, `import_clone_data`).
236    ///
237    /// `heads` specify additional options for special vertexes. This
238    /// overrides the `VertexOptions` provided to `add_head`. If `heads`
239    /// is empty, then `VertexOptions` provided to `add_head` will be
240    /// used.
241    async fn flush(&mut self, heads: &VertexListWithOptions) -> Result<()> {
242        // Sanity check.
243        for result in self.vertex_id_batch(&heads.vertexes()).await? {
244            result?;
245        }
246        // Previous version of the API requires `master_heads: &[Vertex]`.
247        // Warn about possible misuses.
248        if !heads.vertexes_by_group(Group::NON_MASTER).is_empty() {
249            return programming(format!(
250                "NameDag::flush({:?}) is probably misused (group is not master)",
251                heads
252            ));
253        }
254
255        // Write cached IdMap to disk.
256        self.flush_cached_idmap().await?;
257
258        // Constructs a new graph so we can copy pending data from the existing graph.
259        let mut new_name_dag: Self = self.path.open()?;
260
261        let parents: &(dyn DagAlgorithm + Send + Sync) = self;
262        let non_master_heads: VertexListWithOptions = self.pending_heads.clone();
263        let seg_size = self.dag.get_new_segment_size();
264        new_name_dag.dag.set_new_segment_size(seg_size);
265        new_name_dag.set_remote_protocol(self.remote_protocol.clone());
266        new_name_dag.maybe_reuse_caches_from(self);
267        let heads = heads.clone().chain(non_master_heads);
268        new_name_dag.add_heads_and_flush(&parents, &heads).await?;
269        *self = new_name_dag;
270        Ok(())
271    }
272
273    /// Write in-memory IdMap paths to disk so the next time we don't need to
274    /// ask remote service for IdMap translation.
275    #[tracing::instrument(skip(self))]
276    async fn flush_cached_idmap(&self) -> Result<()> {
277        // The map might have changed on disk. We cannot use the ids in overlay_map
278        // directly. Instead, re-translate the paths.
279
280        // Prepare data to insert. Do not hold Mutex across async yield points.
281        let mut to_insert: Vec<(AncestorPath, Vec<VertexName>)> = Vec::new();
282        std::mem::swap(&mut to_insert, &mut *self.overlay_map_paths.lock().unwrap());
283        if to_insert.is_empty() {
284            return Ok(());
285        }
286
287        // Lock, reload from disk. Use a new state so the existing dag is not affected.
288        tracing::debug!(target: "dag::cache", "flushing cached idmap ({} items)", to_insert.len());
289        let mut new: Self = self.path.open()?;
290        let lock = new.state.lock()?;
291        let map_lock = new.map.lock()?;
292        let dag_lock = new.dag.lock()?;
293        new.state.reload(&lock)?;
294        new.map.reload(&map_lock)?;
295        new.dag.reload(&dag_lock)?;
296        new.maybe_reuse_caches_from(self);
297        std::mem::swap(&mut to_insert, &mut *new.overlay_map_paths.lock().unwrap());
298        new.flush_cached_idmap_with_lock(&map_lock).await?;
299
300        new.state.persist(&lock)?;
301
302        Ok(())
303    }
304}
305
306impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
307where
308    IS: IdDagStore,
309    IdDag<IS>: TryClone + 'static,
310    M: TryClone + IdConvert + IdMapWrite + Persist + Send + Sync + 'static,
311    P: Send + Sync + 'static,
312    S: TryClone + Send + Sync + 'static,
313{
314    /// Implementation detail. Must be protected by a lock.
315    async fn flush_cached_idmap_with_lock(&mut self, map_lock: &M::Lock) -> Result<()> {
316        let mut to_insert: Vec<(AncestorPath, Vec<VertexName>)> = Vec::new();
317        std::mem::swap(&mut to_insert, &mut *self.overlay_map_paths.lock().unwrap());
318        if to_insert.is_empty() {
319            return Ok(());
320        }
321
322        let id_names = calculate_id_name_from_paths(
323            &self.map,
324            &*self.dag,
325            &self.overlay_map_id_set,
326            &to_insert,
327        )
328        .await?;
329
330        // For testing purpose, skip inserting certain vertexes.
331        let mut skip_vertexes: Option<HashSet<VertexName>> = None;
332        if crate::is_testing() {
333            if let Ok(s) = var("DAG_SKIP_FLUSH_VERTEXES") {
334                skip_vertexes = Some(
335                    s.split(",")
336                        .filter_map(|s| VertexName::from_hex(s.as_bytes()).ok())
337                        .collect(),
338                )
339            }
340        }
341
342        for (id, name) in id_names {
343            if let Some(skip) = &skip_vertexes {
344                if skip.contains(&name) {
345                    tracing::info!(
346                        target: "dag::cache",
347                        "skip flushing {:?}-{} to IdMap set by DAG_SKIP_FLUSH_VERTEXES",
348                        &name,
349                        id
350                    );
351                    continue;
352                }
353            }
354            tracing::debug!(target: "dag::cache", "insert {:?}-{} to IdMap", &name, id);
355            self.map.insert(id, name.as_ref()).await?;
356        }
357
358        self.map.persist(map_lock)?;
359        Ok(())
360    }
361}
362
363impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
364where
365    IS: Send + Sync + 'static,
366    M: Send + Sync + 'static,
367    P: Send + Sync + 'static,
368    S: IntVersion + Send + Sync + 'static,
369{
370    /// Attempt to reuse caches from `other` if two `NameDag`s are compatible.
371    /// Usually called when `self` is newly created.
372    fn maybe_reuse_caches_from(&mut self, other: &Self) {
373        if self.state.int_version() != other.state.int_version()
374            || self.persisted_id_set.as_spans() != other.persisted_id_set.as_spans()
375        {
376            tracing::debug!(target: "dag::cache", "cannot reuse cache");
377            return;
378        }
379        tracing::debug!(
380            target: "dag::cache", "reusing cache ({} missing)",
381            other.missing_vertexes_confirmed_by_remote.read().unwrap().len(),
382        );
383        self.missing_vertexes_confirmed_by_remote =
384            other.missing_vertexes_confirmed_by_remote.clone();
385        self.overlay_map = other.overlay_map.clone();
386        self.overlay_map_paths = other.overlay_map_paths.clone();
387    }
388}
389
390#[async_trait::async_trait]
391impl<IS, M, P, S> DagAddHeads for AbstractNameDag<IdDag<IS>, M, P, S>
392where
393    IS: IdDagStore,
394    IdDag<IS>: TryClone,
395    M: TryClone + IdMapAssignHead + Send + Sync + 'static,
396    P: TryClone + Send + Sync + 'static,
397    S: TryClone + Send + Sync + 'static,
398{
399    /// Add vertexes and their ancestors to the in-memory DAG.
400    ///
401    /// This does not write to disk. Use `add_heads_and_flush` to add heads
402    /// and write to disk more efficiently.
403    ///
404    /// The added vertexes are immediately query-able.
405    ///
406    /// Note: heads with `reserve_size > 0` must be passed in even if they
407    /// already exist and are not being added to the graph for the id
408    /// reservation to work correctly.
409    async fn add_heads(
410        &mut self,
411        parents: &dyn Parents,
412        heads: &VertexListWithOptions,
413    ) -> Result<bool> {
414        self.invalidate_snapshot();
415
416        // Populate vertex negative cache to reduce round-trips doing remote lookups.
417        self.populate_missing_vertexes_for_add_heads(parents, &heads.vertexes())
418            .await?;
419
420        // heads might require highest_group = MASTER. That might trigger
421        // id re-assigning if the NON_MASTER group is not empty. For simplicity,
422        // we don't want to deal with id reassignment here.
423        //
424        // Practically, there are 2 use-cases:
425        // - Server wants highest_group = MASTER and it does not use NON_MASTER.
426        // - Client only needs highest_group = NON_MASTER (default) here.
427        //
428        // Support both cases. That is:
429        // - If highest_group = MASTER is specified, then NON_MASTER group
430        //   must be empty to ensure no id reassignment (checked below).
431        // - If highest_group = MASTER is not used, then it's okay whatever.
432        let master_heads = heads.vertexes_by_group(Group::MASTER);
433        if !master_heads.is_empty() {
434            let all = self.dag.all()?;
435            let has_non_master = match all.max() {
436                Some(id) => id.group() == Group::NON_MASTER,
437                None => false,
438            };
439            if has_non_master {
440                return programming(concat!(
441                    "add_heads() called with highest_group = MASTER but NON_MASTER group is not empty. ",
442                    "To avoid id reassignment this is not supported. ",
443                    "Pass highest_group = NON_MASTER, and call flush() (common on client use-case), ",
444                    "or avoid inserting to NON_MASTER group (common on server use-case).",
445                ));
446            }
447        }
448
449        // Performance-wise, add_heads + flush is slower than
450        // add_heads_and_flush.
451        //
452        // Practically, the callsite might want to use add_heads + flush
453        // instead of add_heads_and_flush, if:
454        // - The callsites cannot figure out "master_heads" at the same time
455        //   it does the graph change. For example, hg might know commits
456        //   before bookmark movements.
457        // - The callsite is trying some temporary graph changes, and does
458        //   not want to pollute the on-disk DAG. For example, calculating
459        //   a preview of a rebase.
460        // Update IdMap. Keep track of what heads are added.
461        let mut outcome = PreparedFlatSegments::default();
462        let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
463        let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
464        for (head, opts) in heads.vertex_options() {
465            let need_assigning = match self
466                .vertex_id_with_max_group(&head, opts.highest_group)
467                .await?
468            {
469                Some(id) => !self.dag.contains_id(id)?,
470                None => true,
471            };
472            if need_assigning {
473                let group = opts.highest_group;
474                let prepared_segments = self
475                    .assign_head(head.clone(), parents, group, &mut covered, &reserved)
476                    .await?;
477                outcome.merge(prepared_segments);
478                if opts.reserve_size > 0 {
479                    let low = self.map.vertex_id(head.clone()).await? + 1;
480                    update_reserved(&mut reserved, &covered, low, opts.reserve_size);
481                }
482                self.pending_heads.push((head, opts));
483            }
484        }
485
486        // Update segments in the NON_MASTER group.
487        self.dag
488            .build_segments_from_prepared_flat_segments(&outcome)?;
489
490        Ok(outcome.segment_count() > 0)
491    }
492}
493
494#[async_trait::async_trait]
495impl<IS, M, P, S> DagStrip for AbstractNameDag<IdDag<IS>, M, P, S>
496where
497    IS: IdDagStore + Persist,
498    IdDag<IS>: TryClone,
499    M: TryClone + Persist + IdMapWrite + IdConvert + Send + Sync + 'static,
500    P: TryClone + Open<OpenTarget = Self> + Send + Sync + 'static,
501    S: TryClone + IntVersion + Persist + Send + Sync + 'static,
502{
503    async fn strip(&mut self, set: &NameSet) -> Result<()> {
504        if !self.pending_heads.is_empty() {
505            return programming(format!(
506                "strip does not support pending heads ({:?})",
507                &self.pending_heads.vertexes(),
508            ));
509        }
510
511        // Do strip with a lock to avoid cases where descendants are added to
512        // the stripped segments.
513        let mut new: Self = self.path.open()?;
514        let (lock, map_lock, dag_lock) = new.reload()?;
515        new.set_remote_protocol(self.remote_protocol.clone());
516        new.maybe_reuse_caches_from(self);
517
518        new.strip_with_lock(set, &map_lock).await?;
519        new.persist(lock, map_lock, dag_lock)?;
520
521        *self = new;
522        Ok(())
523    }
524}
525
526impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
527where
528    IS: IdDagStore,
529    IdDag<IS>: TryClone,
530    M: TryClone + Persist + IdMapWrite + IdConvert + Send + Sync + 'static,
531    P: TryClone + Send + Sync + 'static,
532    S: TryClone + Send + Sync + 'static,
533{
534    /// Internal impelementation of "strip".
535    async fn strip_with_lock(&mut self, set: &NameSet, map_lock: &M::Lock) -> Result<()> {
536        if !self.pending_heads.is_empty() {
537            return programming(format!(
538                "strip does not support pending heads ({:?})",
539                &self.pending_heads.vertexes(),
540            ));
541        }
542
543        let id_set = self.to_id_set(set).await?;
544
545        // Heads in the master group must be known. Strip might "create" heads that are not
546        // currently known. Resolve them to ensure graph integrity.
547        let head_ids: Vec<Id> = {
548            // strip will include descendants.
549            let to_strip = self.dag.descendants(id_set.clone())?;
550            // only vertexes in the master group can be lazy.
551            let master_group = self.dag.master_group()?;
552            let master_group_after_strip = master_group.difference(&to_strip);
553            let heads_before_strip = self.dag.heads_ancestors(master_group)?;
554            let heads_after_strip = self.dag.heads_ancestors(master_group_after_strip)?;
555            let new_heads = heads_after_strip.difference(&heads_before_strip);
556            new_heads.iter_desc().collect()
557        };
558        let heads_after_strip = self.vertex_name_batch(&head_ids).await?;
559        tracing::debug!(target: "dag::strip", "heads after strip: {:?}", &heads_after_strip);
560        // Write IdMap cache first, they will become problematic to write
561        // after "remove" because the `VerLink`s might become incompatible.
562        self.flush_cached_idmap_with_lock(map_lock).await?;
563
564        let removed_id_set = self.dag.strip(id_set)?;
565        tracing::debug!(target: "dag::strip", "removed id set: {:?}", &removed_id_set);
566
567        let mut removed_vertexes = Vec::new();
568        for span in removed_id_set.iter_span_desc() {
569            let vertexes = self.map.remove_range(span.low, span.high).await?;
570            removed_vertexes.extend(vertexes);
571        }
572        tracing::debug!(target: "dag::strip", "removed vertexes: {:?}", &removed_vertexes);
573
574        // Add removed names to missing cache.
575        self.missing_vertexes_confirmed_by_remote
576            .write()
577            .unwrap()
578            .extend(removed_vertexes);
579
580        // Snapshot cannot be reused.
581        self.invalidate_snapshot();
582
583        Ok(())
584    }
585}
586
587#[async_trait::async_trait]
588impl<IS, M, P, S> IdMapWrite for AbstractNameDag<IdDag<IS>, M, P, S>
589where
590    IS: IdDagStore,
591    IdDag<IS>: TryClone,
592    M: TryClone + IdMapAssignHead + Send + Sync,
593    P: TryClone + Send + Sync,
594    S: TryClone + Send + Sync,
595{
596    async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
597        self.map.insert(id, name).await
598    }
599
600    async fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<VertexName>> {
601        self.map.remove_range(low, high).await
602    }
603}
604
605#[async_trait::async_trait]
606impl<IS, M, P, S> DagImportCloneData for AbstractNameDag<IdDag<IS>, M, P, S>
607where
608    IS: IdDagStore + Persist + 'static,
609    IdDag<IS>: TryClone,
610    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
611    P: TryClone + Send + Sync + 'static,
612    S: TryClone + Persist + Send + Sync + 'static,
613{
614    async fn import_clone_data(&mut self, clone_data: CloneData<VertexName>) -> Result<()> {
615        // Write directly to disk. Bypassing "flush()" that re-assigns Ids
616        // using parent functions.
617        let (lock, map_lock, dag_lock) = self.reload()?;
618
619        if !self.dag.all()?.is_empty() {
620            return programming("Cannot import clone data for non-empty graph");
621        }
622        for (id, name) in clone_data.idmap {
623            tracing::debug!(target: "dag::clone", "insert IdMap: {:?}-{:?}", &name, id);
624            self.map.insert(id, name.as_ref()).await?;
625        }
626        self.dag
627            .build_segments_from_prepared_flat_segments(&clone_data.flat_segments)?;
628
629        self.verify_missing().await?;
630
631        self.persist(lock, map_lock, dag_lock)
632    }
633}
634
635impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
636where
637    IS: IdDagStore + Persist + 'static,
638    IdDag<IS>: TryClone,
639    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
640    P: TryClone + Send + Sync + 'static,
641    S: TryClone + Persist + Send + Sync + 'static,
642{
643    /// Verify that universally known vertexes and heads are present in IdMap.
644    async fn verify_missing(&self) -> Result<()> {
645        let missing: Vec<Id> = self.check_universal_ids().await?;
646        if !missing.is_empty() {
647            let msg = format!(
648                concat!(
649                    "Clone data does not contain vertex for {:?}. ",
650                    "This is most likely a server-side bug."
651                ),
652                missing,
653            );
654            return programming(msg);
655        }
656
657        Ok(())
658    }
659
660    fn reload(&mut self) -> Result<(S::Lock, M::Lock, IS::Lock)> {
661        let lock = self.state.lock()?;
662        let map_lock = self.map.lock()?;
663        let dag_lock = self.dag.lock()?;
664        self.state.reload(&lock)?;
665        self.map.reload(&map_lock)?;
666        self.dag.reload(&dag_lock)?;
667
668        Ok((lock, map_lock, dag_lock))
669    }
670
671    fn persist(&mut self, lock: S::Lock, map_lock: M::Lock, dag_lock: IS::Lock) -> Result<()> {
672        self.map.persist(&map_lock)?;
673        self.dag.persist(&dag_lock)?;
674        self.state.persist(&lock)?;
675
676        self.invalidate_overlay_map()?;
677        self.persisted_id_set = self.dag.all_ids_in_groups(&Group::ALL)?;
678
679        Ok(())
680    }
681}
682
683#[async_trait::async_trait]
684impl<IS, M, P, S> DagImportPullData for AbstractNameDag<IdDag<IS>, M, P, S>
685where
686    IS: IdDagStore + Persist,
687    IdDag<IS>: TryClone,
688    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
689    P: Open<OpenTarget = Self> + TryClone + Send + Sync + 'static,
690    S: IntVersion + TryClone + Persist + Send + Sync + 'static,
691{
692    async fn import_pull_data(
693        &mut self,
694        clone_data: CloneData<VertexName>,
695        heads: &VertexListWithOptions,
696    ) -> Result<()> {
697        if !self.pending_heads.is_empty() {
698            return programming(format!(
699                "import_pull_data called with pending heads ({:?})",
700                &self.pending_heads.vertexes(),
701            ));
702        }
703        let non_master_heads = heads.vertexes_by_group(Group::NON_MASTER);
704        if !non_master_heads.is_empty() {
705            return programming(format!(
706                concat!(
707                    "import_pull_data called with non-master heads ({:?}). ",
708                    "This is unsupported because the pull data is lazy and can only be inserted to the master group.",
709                ),
710                non_master_heads
711            ));
712        }
713
714        for id in clone_data.flat_segments.parents_head_and_roots() {
715            if !clone_data.idmap.contains_key(&id) {
716                return programming(format!(
717                    "server does not provide name for id {:?} in pull data",
718                    id
719                ));
720            }
721        }
722
723        // Constructs a new graph so we don't expose a broken `self` state on error.
724        let mut new: Self = self.path.open()?;
725        let (lock, map_lock, dag_lock) = new.reload()?;
726        new.set_remote_protocol(self.remote_protocol.clone());
727        new.maybe_reuse_caches_from(self);
728
729        // Parents that should exist in the local graph. Look them up in 1 round-trip
730        // and insert to the local graph.
731        // Also check that roots of the new segments do not overlap with the local graph.
732        // For example,
733        //
734        //      D          When the client has B (and A, C), and is pulling D,
735        //     /|\         the server provides D, E, F, with parents B and C,
736        //    F B E        and roots F and E.
737        //      |\|        The client must have B and C, and must not have F
738        //      A C        or E.
739        {
740            let mut root_ids: Vec<Id> = Vec::new();
741            let mut parent_ids: Vec<Id> = Vec::new();
742            let segments = &clone_data.flat_segments.segments;
743            let id_set = IdSet::from_spans(segments.iter().map(|s| s.low..=s.high));
744            for seg in segments {
745                let pids: Vec<Id> = seg.parents.iter().copied().collect();
746                // Parents that are not part of the pull vertexes should exist
747                // in the local graph.
748                let connected_pids: Vec<Id> = pids
749                    .iter()
750                    .copied()
751                    .filter(|&p| !id_set.contains(p))
752                    .collect();
753                if connected_pids.len() == pids.len() {
754                    // The "low" of the segment is a root (of vertexes to insert).
755                    // It needs an overlap check.
756                    root_ids.push(seg.low);
757                }
758                parent_ids.extend(connected_pids);
759            }
760
761            let to_names = |ids: &[Id], hint: &str| -> Result<Vec<VertexName>> {
762                let names = ids.iter().map(|i| match clone_data.idmap.get(&i) {
763                    Some(v) => Ok(v.clone()),
764                    None => {
765                        programming(format!("server does not provide name for {} {:?}", hint, i))
766                    }
767                });
768                names.collect()
769            };
770
771            let parent_names = to_names(&parent_ids, "parent")?;
772            let root_names = to_names(&root_ids, "root")?;
773            tracing::trace!(
774                "pull: connected parents: {:?}, roots: {:?}",
775                &parent_names,
776                &root_names
777            );
778
779            // Pre-lookup in one round-trip.
780            let mut names = parent_names
781                .iter()
782                .chain(root_names.iter())
783                .cloned()
784                .collect::<Vec<_>>();
785            names.sort_unstable();
786            names.dedup();
787            let resolved = new.vertex_id_batch(&names).await?;
788            assert_eq!(resolved.len(), names.len());
789            for (id, name) in resolved.into_iter().zip(names) {
790                if let Ok(id) = id {
791                    if !new.map.contains_vertex_name(&name).await? {
792                        tracing::debug!(target: "dag::pull", "insert IdMap: {:?}-{:?}", &name, id);
793                        new.map.insert(id, name.as_ref()).await?;
794                    }
795                }
796            }
797
798            for name in root_names {
799                if new.contains_vertex_name(&name).await? {
800                    let e = NeedSlowPath(format!("{:?} exists in local graph", name));
801                    return Err(e);
802                }
803            }
804
805            let client_parents = new.vertex_id_batch(&parent_names).await?;
806            client_parents.into_iter().collect::<Result<Vec<Id>>>()?;
807        }
808
809        // Prepare states used below.
810        let mut prepared_client_segments = PreparedFlatSegments::default();
811        let server_idmap = &clone_data.idmap;
812        let server_idmap_by_name: BTreeMap<&VertexName, Id> =
813            server_idmap.iter().map(|(&id, name)| (name, id)).collect();
814        // `taken` is the union of `covered` and `reserved`, mainly used by `find_free_span`.
815        let mut taken = {
816            let covered = new.dag().all_ids_in_groups(&[Group::MASTER])?;
817            // Normally we would want `calculate_initial_reserved` here. But we calculate head
818            // reservation for all `heads` in order, instead of just considering heads in the
819            // `clone_data`. So we're fine without the "initial reserved". In other words, the
820            // `calculate_initial_reserved` logic is "inlined" into the `for ... in heads`
821            // loop below.
822            covered
823        };
824
825        // Index used by lookups.
826        let server_seg_by_high: BTreeMap<Id, &FlatSegment> = clone_data
827            .flat_segments
828            .segments
829            .iter()
830            .map(|s| (s.high, s))
831            .collect();
832        let find_server_seg_contains_server_id = |server_id: Id| -> Result<&FlatSegment> {
833            let seg = match server_seg_by_high.range(server_id..).next() {
834                Some((_high, &seg)) => {
835                    if seg.low <= server_id && seg.high >= server_id {
836                        Some(seg)
837                    } else {
838                        None
839                    }
840                }
841                None => None,
842            };
843            seg.ok_or_else(|| {
844                DagError::Programming(format!(
845                    "server does not provide segment covering id {}",
846                    server_id
847                ))
848            })
849        };
850
851        // Insert segments by visiting the heads.
852        //
853        // Similar to `IdMap::assign_head`, but insert a segment at a time,
854        // not a vertex at a time.
855        //
856        // Only the MASTER group supports laziness. So we only care about it.
857        for (head, opts) in heads.vertex_options() {
858            let mut stack: Vec<&FlatSegment> = vec![];
859            if let Some(&head_server_id) = server_idmap_by_name.get(&head) {
860                let head_server_seg = find_server_seg_contains_server_id(head_server_id)?;
861                stack.push(head_server_seg);
862            }
863
864            while let Some(server_seg) = stack.pop() {
865                let high_vertex = server_idmap[&server_seg.high].clone();
866                let client_high_id = new
867                    .map
868                    .vertex_id_with_max_group(&high_vertex, Group::NON_MASTER)
869                    .await?;
870                match client_high_id {
871                    Some(id) if id.group() == Group::MASTER => {
872                        // `server_seg` is present in MASTER group (previously inserted
873                        // by this loop). No need to insert or visit parents.
874                        continue;
875                    }
876                    Some(id) => {
877                        // `id` in NON_MASTER group. This should not really happen because we have
878                        // checked all "roots" are missing in the local graph. See `NeedSlowPath`
879                        // above.
880                        let e = NeedSlowPath(format!(
881                            "{:?} exists in local graph as {:?} - fast path requires MASTER group",
882                            &high_vertex, id
883                        ));
884                        return Err(e);
885                    }
886                    None => {}
887                }
888
889                let parent_server_ids = &server_seg.parents;
890                let parent_names: Vec<VertexName> = {
891                    let iter = parent_server_ids.iter().map(|id| server_idmap[id].clone());
892                    iter.collect()
893                };
894
895                // The client parent ids in the MASTER group.
896                let mut parent_client_ids = Vec::new();
897                let mut missng_parent_server_ids = Vec::new();
898
899                // Calculate `parent_client_ids`, and `missng_parent_server_ids`.
900                // Intentiaonlly using `new.map` not `new` to bypass remote lookups.
901                {
902                    let client_id_res = new.map.vertex_id_batch(&parent_names).await?;
903                    assert_eq!(client_id_res.len(), parent_server_ids.len());
904                    for (res, &server_id) in client_id_res.into_iter().zip(parent_server_ids) {
905                        match res {
906                            Ok(id) if id.group() != Group::MASTER => {
907                                return Err(NeedSlowPath(format!(
908                                    "{:?} exists id in local graph as {:?} - fast path requires MASTER group",
909                                    &parent_names, id
910                                )));
911                            }
912                            Ok(id) => {
913                                parent_client_ids.push(id);
914                            }
915                            Err(crate::Error::VertexNotFound(_)) => {
916                                missng_parent_server_ids.push(server_id);
917                            }
918                            Err(e) => return Err(e),
919                        }
920                    }
921                }
922
923                if !missng_parent_server_ids.is_empty() {
924                    // Parents are not ready. Needs revisit this segment after inserting parents.
925                    stack.push(server_seg);
926                    // Insert missing parents.
927                    // First parent, first insertion.
928                    for &server_id in missng_parent_server_ids.iter().rev() {
929                        let parent_server_seg = find_server_seg_contains_server_id(server_id)?;
930                        stack.push(parent_server_seg);
931                    }
932                    continue;
933                }
934
935                // All parents are present. Time to insert this segment.
936                // Find a suitable low..=high range.
937                let candidate_id = parent_client_ids
938                    .iter()
939                    .max()
940                    .copied()
941                    .unwrap_or(Group::MASTER.min_id())
942                    + 1;
943                let size = server_seg.high.0 - server_seg.low.0 + 1;
944                let span = find_free_span(&taken, candidate_id, size, false);
945
946                // Map the server_seg.low..=server_seg.high to client span.low..=span.high.
947                // Insert to IdMap.
948                for (&server_id, name) in server_idmap.range(server_seg.low..=server_seg.high) {
949                    let client_id = server_id + span.low.0 - server_seg.low.0;
950                    if client_id.group() != Group::MASTER {
951                        return Err(crate::Error::IdOverflow(Group::MASTER));
952                    }
953                    new.map.insert(client_id, name.as_ref()).await?;
954                }
955
956                // Prepare insertion to IdDag.
957                prepared_client_segments.push_segment(span.low, span.high, &parent_client_ids);
958
959                // Mark the range as taken.
960                taken.push(span);
961            }
962
963            // Consider reservation for `head` by updating `taken`.
964            if opts.reserve_size > 0 {
965                let head_client_id = new.map.vertex_id(head).await?;
966                let span = find_free_span(&taken, head_client_id + 1, opts.reserve_size as _, true);
967                taken.push(span);
968            }
969        }
970
971        new.dag
972            .build_segments_from_prepared_flat_segments(&prepared_client_segments)?;
973
974        if cfg!(debug_assertions) {
975            new.verify_missing().await?;
976        }
977
978        new.persist(lock, map_lock, dag_lock)?;
979        *self = new;
980        Ok(())
981    }
982}
983
984#[async_trait::async_trait]
985impl<IS, M, P, S> DagExportCloneData for AbstractNameDag<IdDag<IS>, M, P, S>
986where
987    IS: IdDagStore,
988    IdDag<IS>: TryClone,
989    M: IdConvert + TryClone + Send + Sync + 'static,
990    P: TryClone + Send + Sync + 'static,
991    S: TryClone + Send + Sync + 'static,
992{
993    async fn export_clone_data(&self) -> Result<CloneData<VertexName>> {
994        let idmap: BTreeMap<Id, VertexName> = {
995            let ids: Vec<Id> = self.dag.universal_ids()?.into_iter().collect();
996            tracing::debug!("export: {} universally known vertexes", ids.len());
997            let names = {
998                let fallible_names = self.vertex_name_batch(&ids).await?;
999                let mut names = Vec::with_capacity(fallible_names.len());
1000                for name in fallible_names {
1001                    names.push(name?);
1002                }
1003                names
1004            };
1005            ids.into_iter().zip(names).collect()
1006        };
1007
1008        let flat_segments: PreparedFlatSegments = {
1009            let segments = self.dag.next_segments(Id::MIN, 0)?;
1010            let mut prepared = Vec::with_capacity(segments.len());
1011            for segment in segments {
1012                let span = segment.span()?;
1013                let parents = segment.parents()?;
1014                prepared.push(FlatSegment {
1015                    low: span.low,
1016                    high: span.high,
1017                    parents,
1018                });
1019            }
1020            PreparedFlatSegments {
1021                segments: prepared.into_iter().collect(),
1022            }
1023        };
1024
1025        let data = CloneData {
1026            flat_segments,
1027            idmap,
1028        };
1029        Ok(data)
1030    }
1031}
1032
1033#[async_trait::async_trait]
1034impl<IS, M, P, S> DagExportPullData for AbstractNameDag<IdDag<IS>, M, P, S>
1035where
1036    IS: IdDagStore,
1037    IdDag<IS>: TryClone,
1038    M: IdConvert + TryClone + Send + Sync + 'static,
1039    P: TryClone + Send + Sync + 'static,
1040    S: TryClone + Send + Sync + 'static,
1041{
1042    async fn export_pull_data(&self, set: &NameSet) -> Result<CloneData<VertexName>> {
1043        let id_set = self.to_id_set(&set).await?;
1044
1045        let flat_segments = self.dag.idset_to_flat_segments(id_set)?;
1046        let ids: Vec<_> = flat_segments.parents_head_and_roots().into_iter().collect();
1047
1048        let idmap: BTreeMap<Id, VertexName> = {
1049            tracing::debug!("pull: {} vertexes in idmap", ids.len());
1050            let names = {
1051                let fallible_names = self.vertex_name_batch(&ids).await?;
1052                let mut names = Vec::with_capacity(fallible_names.len());
1053                for name in fallible_names {
1054                    names.push(name?);
1055                }
1056                names
1057            };
1058            assert_eq!(ids.len(), names.len());
1059            ids.into_iter().zip(names).collect()
1060        };
1061
1062        let data = CloneData {
1063            flat_segments,
1064            idmap,
1065        };
1066        Ok(data)
1067    }
1068}
1069
1070impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
1071where
1072    IS: IdDagStore,
1073    IdDag<IS>: TryClone,
1074    M: TryClone + Send + Sync,
1075    P: TryClone + Send + Sync,
1076    S: TryClone + Send + Sync,
1077{
1078    /// Invalidate cached content. Call this before changing the graph
1079    /// so `version` in `snapshot` is dropped, and `version.bump()` might
1080    /// have a faster path.
1081    ///
1082    /// Forgetting to call this function might hurt performance a bit, but does
1083    /// not affect correctness.
1084    fn invalidate_snapshot(&mut self) {
1085        *self.snapshot.write().unwrap() = None;
1086    }
1087
1088    fn invalidate_missing_vertex_cache(&mut self) {
1089        tracing::debug!(target: "dag::cache", "cleared missing cache");
1090        *self.missing_vertexes_confirmed_by_remote.write().unwrap() = Default::default();
1091    }
1092
1093    fn invalidate_overlay_map(&mut self) -> Result<()> {
1094        self.overlay_map = Default::default();
1095        self.update_overlay_map_id_set()?;
1096        tracing::debug!(target: "dag::cache", "cleared overlay map cache");
1097        Ok(())
1098    }
1099
1100    fn update_overlay_map_id_set(&mut self) -> Result<()> {
1101        self.overlay_map_id_set = self.dag.master_group()?;
1102        Ok(())
1103    }
1104
1105    /// Attempt to get a snapshot of this graph.
1106    pub(crate) fn try_snapshot(&self) -> Result<Arc<Self>> {
1107        if let Some(s) = self.snapshot.read().unwrap().deref() {
1108            if s.dag.version() == self.dag.version() {
1109                return Ok(Arc::clone(s));
1110            }
1111        }
1112
1113        let mut snapshot = self.snapshot.write().unwrap();
1114        match snapshot.deref() {
1115            Some(s) if s.dag.version() == self.dag.version() => Ok(s.clone()),
1116            _ => {
1117                let cloned = Self {
1118                    dag: self.dag.try_clone()?,
1119                    map: self.map.try_clone()?,
1120                    snapshot: Default::default(),
1121                    pending_heads: self.pending_heads.clone(),
1122                    persisted_id_set: self.persisted_id_set.clone(),
1123                    path: self.path.try_clone()?,
1124                    state: self.state.try_clone()?,
1125                    id: self.id.clone(),
1126                    // If we do deep clone here we can remove `overlay_map_next_id`
1127                    // protection. However that could be too expensive.
1128                    overlay_map: Arc::clone(&self.overlay_map),
1129                    overlay_map_id_set: self.overlay_map_id_set.clone(),
1130                    overlay_map_paths: Arc::clone(&self.overlay_map_paths),
1131                    remote_protocol: self.remote_protocol.clone(),
1132                    missing_vertexes_confirmed_by_remote: Arc::clone(
1133                        &self.missing_vertexes_confirmed_by_remote,
1134                    ),
1135                };
1136                let result = Arc::new(cloned);
1137                *snapshot = Some(Arc::clone(&result));
1138                Ok(result)
1139            }
1140        }
1141    }
1142
1143    pub fn dag(&self) -> &IdDag<IS> {
1144        &self.dag
1145    }
1146
1147    pub fn map(&self) -> &M {
1148        &self.map
1149    }
1150
1151    /// Set the remote protocol for converting between Id and Vertex remotely.
1152    ///
1153    /// This is usually used on "sparse" ("lazy") Dag where the IdMap is incomplete
1154    /// for vertexes in the master groups.
1155    pub fn set_remote_protocol(&mut self, protocol: Arc<dyn RemoteIdConvertProtocol>) {
1156        self.remote_protocol = protocol;
1157    }
1158
1159    pub(crate) fn get_remote_protocol(&self) -> Arc<dyn RemoteIdConvertProtocol> {
1160        self.remote_protocol.clone()
1161    }
1162}
1163
1164impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
1165where
1166    IS: IdDagStore,
1167    IdDag<IS>: TryClone,
1168    M: TryClone + IdMapAssignHead + Send + Sync + 'static,
1169    P: TryClone + Send + Sync + 'static,
1170    S: TryClone + Send + Sync + 'static,
1171{
1172    async fn populate_missing_vertexes_for_add_heads(
1173        &mut self,
1174        parents: &dyn Parents,
1175        heads: &[VertexName],
1176    ) -> Result<()> {
1177        if self.is_vertex_lazy() {
1178            let unassigned = calculate_definitely_unassigned_vertexes(self, parents, heads).await?;
1179            let mut missing = self.missing_vertexes_confirmed_by_remote.write().unwrap();
1180            for v in unassigned {
1181                if missing.insert(v.clone()) {
1182                    tracing::trace!(target: "dag::cache", "cached missing {:?} (definitely missing)", &v);
1183                }
1184            }
1185        }
1186        Ok(())
1187    }
1188}
1189
/// Calculate vertexes that are definitely not assigned (not in the IdMap,
/// and not in the lazy part of the IdMap) according to the subdag returned
/// by `parents.hint_subdag_for_insertion(heads)`. This does not report all
/// unassigned vertexes. But the reported vertexes are guaranteed not assigned.
///
/// If X is assigned, then X's parents must have been assigned.
/// If X is not assigned, then all X's descendants are not assigned.
///
/// This function visits the "roots" of "parents", and if they are not assigned,
/// then add their descendants to the "unassigned" result set.
///
/// The work is done in two steps:
/// 1. When `this.is_vertex_lazy()` is true, each root of the subdag is
///    classified using local checks; descendants (within the subdag) of roots
///    found to be unassigned are moved from `remaining` to `unassigned`.
/// 2. What is left in `remaining` is split by `utils::filter_known`, using
///    `this.vertex_id_batch` to decide which vertexes are known; the unknown
///    ones are added to `unassigned`.
///
/// Arguments:
/// - `this`: the graph to check against.
/// - `parents`: provides the subdag hint and the parent names of vertexes.
/// - `heads`: the heads about to be inserted.
///
/// Returns the unassigned vertex names, or the first error encountered.
async fn calculate_definitely_unassigned_vertexes<IS, M, P, S>(
    this: &AbstractNameDag<IdDag<IS>, M, P, S>,
    parents: &dyn Parents,
    heads: &[VertexName],
) -> Result<Vec<VertexName>>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: TryClone + IdMapAssignHead + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Send + Sync + 'static,
{
    // subdag: vertexes to insert
    //
    // For example, when adding C---D to the graph A---B:
    //
    //      A---B
    //           \
    //            C---D
    //
    // The subdag is C---D (C does not have parent).
    //
    // Extra checks are needed because upon reload, the main graph
    // A---B might already contain part of the subdag to be added.
    let subdag = parents.hint_subdag_for_insertion(heads).await?;

    // `remaining`: vertexes of the subdag not yet classified.
    // `unassigned`: vertexes already proven to be unassigned.
    let mut remaining = subdag.all().await?;
    let mut unassigned = NameSet::empty();

    // For lazy graph, avoid some remote lookups by figuring out
    // some definitely unassigned (missing) vertexes. For example,
    //
    //      A---B---C
    //           \
    //            D---E
    //
    // When adding D---E (subdag, new vertex that might trigger remote
    // lookup) with parent B to the main graph (A--B--C),
    // 1. If B exists, and is not in the master group, then B and its
    //    descendants cannot be lazy, and there is no need to lookup
    //    D remotely.
    // 2. If B exists, and is in the master group, and all its children
    //    except D (i.e. C) are known locally, and the vertex name of D
    //    does not match other children (C), we know that D cannot be
    //    in the lazy part of the main graph, and can skip the remote
    //    lookup.
    let mut unassigned_roots = Vec::new();
    if this.is_vertex_lazy() {
        let roots = subdag.roots(remaining.clone()).await?;
        let mut roots_iter = roots.iter().await?;
        while let Some(root) = roots_iter.next().await {
            let root = root?;

            // Do a local "contains" check.
            // A root that is already known locally is not unassigned; skip it.
            if matches!(
                &this.contains_vertex_name_locally(&[root.clone()]).await?[..],
                [true]
            ) {
                tracing::debug!(target: "dag::definitelymissing", "root {:?} is already known", &root);
                continue;
            }

            // Resolve the parents of `root` (as given by `parents`) to ids in
            // `this`. If `this.sort` fails on them, the root is left
            // unclassified rather than treated as an error.
            let root_parents_id_set = {
                let root_parents = parents.parent_names(root.clone()).await?;
                let root_parents_set = match this
                    .sort(&NameSet::from_static_names(root_parents))
                    .await
                {
                    Ok(set) => set,
                    Err(_) => {
                        tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear (parents cannot be resolved)", &root);
                        continue;
                    }
                };
                this.to_id_set(&root_parents_set).await?
            };

            // If there are no parents of `root`, we cannot confidently test
            // whether `root` is missing or not.
            if root_parents_id_set.is_empty() {
                tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear (no parents)", &root);
                continue;
            }

            // All parents of `root` are non-lazy.
            // So `root` is non-lazy and the local "contains" check is the same
            // as a remote "contains" check.
            if root_parents_id_set
                .iter_desc()
                .all(|i| i.group() == Group::NON_MASTER)
            {
                tracing::debug!(target: "dag::definitelymissing", "root {:?} is not assigned (non-lazy parent)", &root);
                unassigned_roots.push(root);
                continue;
            }

            // All children of lazy parents of `root` are known locally.
            // So `root` cannot match an existing vertex in the lazy graph.
            let children_ids: Vec<Id> = this
                .dag
                .children(root_parents_id_set)?
                .iter_desc()
                .collect();
            if this
                .map
                .contains_vertex_id_locally(&children_ids)
                .await?
                .iter()
                .all(|b| *b)
            {
                tracing::debug!(target: "dag::definitelymissing", "root {:?} is not assigned (children of parents are known)", &root);
                unassigned_roots.push(root);
                continue;
            }

            // None of the checks above was conclusive; leave `root` in
            // `remaining` so it is handled by `filter_known` below.
            tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear", &root);
        }

        // Descendants (within the subdag) of unassigned roots are unassigned
        // too, so they no longer need to be checked.
        if !unassigned_roots.is_empty() {
            unassigned = subdag
                .descendants(NameSet::from_static_names(unassigned_roots))
                .await?;
            remaining = remaining.difference(&unassigned);
        }
    }

    // Figure out unassigned (missing) vertexes that do need to be inserted.
    // This is done via utils::filter_known.
    //
    // The closure returns the subset of `sample` for which
    // `this.vertex_id_batch` produced an `Ok` id.
    // NOTE(review): `vertex_id_batch` on `this` presumably may trigger remote
    // lookups for a lazy graph - confirm.
    let filter_known = |sample: &[VertexName]| -> BoxFuture<Result<Vec<VertexName>>> {
        let sample = sample.to_vec();
        async {
            let known_bools: Vec<bool> = {
                let ids = this.vertex_id_batch(&sample).await?;
                ids.into_iter().map(|i| i.is_ok()).collect()
            };
            debug_assert_eq!(sample.len(), known_bools.len());
            let known = sample
                .into_iter()
                .zip(known_bools)
                .filter_map(|(v, b)| if b { Some(v) } else { None })
                .collect();
            Ok(known)
        }
        .boxed()
    };
    let assigned = utils::filter_known(remaining.clone(), &filter_known).await?;
    // Everything in `remaining` that is not assigned is unassigned.
    unassigned = unassigned.union(&remaining.difference(&assigned));
    tracing::debug!(target: "dag::definitelymissing", "unassigned (missing): {:?}", &unassigned);

    let unassigned = unassigned.iter().await?.try_collect().await?;
    Ok(unassigned)
}
1352
1353// The "client" Dag. Using a remote protocol to fill lazy part of the vertexes.
1354impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
1355where
1356    IS: IdDagStore,
1357    IdDag<IS>: TryClone,
1358    M: IdConvert + TryClone + Send + Sync,
1359    P: TryClone + Send + Sync,
1360    S: TryClone + Send + Sync,
1361{
1362    /// Resolve vertexes remotely and cache the result in the overlay map.
1363    /// Return the resolved ids in the given order. Not all names are resolved.
1364    async fn resolve_vertexes_remotely(&self, names: &[VertexName]) -> Result<Vec<Option<Id>>> {
1365        if names.is_empty() {
1366            return Ok(Vec::new());
1367        }
1368        if is_remote_protocol_disabled() {
1369            return Err(io::Error::new(
1370                io::ErrorKind::WouldBlock,
1371                "resolving vertexes remotely disabled",
1372            )
1373            .into());
1374        }
1375        if names.len() < 30 {
1376            tracing::debug!(target: "dag::protocol", "resolve names {:?} remotely", &names);
1377        } else {
1378            tracing::debug!(target: "dag::protocol", "resolve names ({}) remotely", names.len());
1379        }
1380        crate::failpoint!("dag-resolve-vertexes-remotely");
1381        let request: protocol::RequestNameToLocation =
1382            (self.map(), self.dag()).process(names.to_vec()).await?;
1383        let path_names = self
1384            .remote_protocol
1385            .resolve_names_to_relative_paths(request.heads, request.names)
1386            .await?;
1387        self.insert_relative_paths(path_names).await?;
1388        let overlay = self.overlay_map.read().unwrap();
1389        let mut ids = Vec::with_capacity(names.len());
1390        let mut missing = self.missing_vertexes_confirmed_by_remote.write().unwrap();
1391        for name in names {
1392            if let Some(id) = overlay.lookup_vertex_id(name) {
1393                ids.push(Some(id));
1394            } else {
1395                tracing::trace!(target: "dag::cache", "cached missing {:?} (server confirmed)", &name);
1396                missing.insert(name.clone());
1397                ids.push(None);
1398            }
1399        }
1400        Ok(ids)
1401    }
1402
1403    /// Resolve ids remotely and cache the result in the overlay map.
1404    /// Return the resolved ids in the given order. All ids must be resolved.
1405    async fn resolve_ids_remotely(&self, ids: &[Id]) -> Result<Vec<VertexName>> {
1406        if ids.is_empty() {
1407            return Ok(Vec::new());
1408        }
1409        if is_remote_protocol_disabled() {
1410            return Err(io::Error::new(
1411                io::ErrorKind::WouldBlock,
1412                "resolving ids remotely disabled",
1413            )
1414            .into());
1415        }
1416        if ids.len() < 30 {
1417            tracing::debug!(target: "dag::protocol", "resolve ids {:?} remotely", &ids);
1418        } else {
1419            tracing::debug!(target: "dag::protocol", "resolve ids ({}) remotely", ids.len());
1420        }
1421        crate::failpoint!("dag-resolve-ids-remotely");
1422        let request: protocol::RequestLocationToName = (self.map(), self.dag())
1423            .process(IdSet::from_spans(ids.iter().copied()))
1424            .await?;
1425        let path_names = self
1426            .remote_protocol
1427            .resolve_relative_paths_to_names(request.paths)
1428            .await?;
1429        self.insert_relative_paths(path_names).await?;
1430        let overlay = self.overlay_map.read().unwrap();
1431        let mut names = Vec::with_capacity(ids.len());
1432        for &id in ids {
1433            if let Some(name) = overlay.lookup_vertex_name(id) {
1434                names.push(name);
1435            } else {
1436                return id.not_found();
1437            }
1438        }
1439        Ok(names)
1440    }
1441
1442    /// Insert `x~n` relative paths to the overlay IdMap.
1443    async fn insert_relative_paths(
1444        &self,
1445        path_names: Vec<(AncestorPath, Vec<VertexName>)>,
1446    ) -> Result<()> {
1447        if path_names.is_empty() {
1448            return Ok(());
1449        }
1450        let to_insert: Vec<(Id, VertexName)> = calculate_id_name_from_paths(
1451            self.map(),
1452            self.dag().deref(),
1453            &self.overlay_map_id_set,
1454            &path_names,
1455        )
1456        .await?;
1457
1458        let mut paths = self.overlay_map_paths.lock().unwrap();
1459        paths.extend(path_names);
1460        drop(paths);
1461
1462        let mut overlay = self.overlay_map.write().unwrap();
1463        for (id, name) in to_insert {
1464            tracing::trace!(target: "dag::cache", "cached mapping {:?} <=> {:?}", id, &name);
1465            overlay.insert_vertex_id_name(id, name);
1466        }
1467
1468        Ok(())
1469    }
1470}
1471
1472/// Calculate (id, name) pairs to insert from (path, [name]) pairs.
1473async fn calculate_id_name_from_paths(
1474    map: &dyn IdConvert,
1475    dag: &dyn IdDagAlgorithm,
1476    overlay_map_id_set: &IdSet,
1477    path_names: &[(AncestorPath, Vec<VertexName>)],
1478) -> Result<Vec<(Id, VertexName)>> {
1479    if path_names.is_empty() {
1480        return Ok(Vec::new());
1481    }
1482    let mut to_insert: Vec<(Id, VertexName)> =
1483        Vec::with_capacity(path_names.iter().map(|(_, ns)| ns.len()).sum());
1484    for (path, names) in path_names {
1485        if names.is_empty() {
1486            continue;
1487        }
1488        // Resolve x~n to id. x is "universally known" so it should exist locally.
1489        let x_id = map.vertex_id(path.x.clone()).await.map_err(|e| {
1490            let msg = format!(
1491                concat!(
1492                    "Cannot resolve x ({:?}) in x~n locally. The x is expected to be known ",
1493                    "locally and is populated at clone time. This x~n is used to convert ",
1494                    "{:?} to a location in the graph. (Check initial clone logic) ",
1495                    "(Error: {})",
1496                ),
1497                &path.x, &names[0], e
1498            );
1499            crate::Error::Programming(msg)
1500        })?;
1501        tracing::trace!(
1502            "resolve path {:?} names {:?} (x = {}) to overlay",
1503            &path,
1504            &names,
1505            x_id
1506        );
1507        if !overlay_map_id_set.contains(x_id) {
1508            crate::failpoint!("dag-error-x-n-overflow");
1509            let msg = format!(
1510                concat!(
1511                    "Server returned x~n (x = {:?} {}, n = {}). But x is out of range ",
1512                    "({:?}). This is not expected and indicates some ",
1513                    "logic error on the server side."
1514                ),
1515                &path.x, x_id, path.n, overlay_map_id_set
1516            );
1517            return programming(msg);
1518        }
1519        let mut id = match dag.first_ancestor_nth(x_id, path.n).map_err(|e| {
1520            let msg = format!(
1521                concat!(
1522                    "Cannot resolve x~n (x = {:?} {}, n = {}): {}. ",
1523                    "This indicates the client-side graph is somewhat incompatible from the ",
1524                    "server-side graph. Something (server-side or client-side) was probably ",
1525                    "seriously wrong before this error."
1526                ),
1527                &path.x, x_id, path.n, e
1528            );
1529            crate::Error::Programming(msg)
1530        }) {
1531            Err(e) => {
1532                crate::failpoint!("dag-error-x-n-unresolvable");
1533                return Err(e);
1534            }
1535            Ok(id) => id,
1536        };
1537        if names.len() < 30 {
1538            tracing::debug!("resolved {:?} => {} {:?}", &path, id, &names);
1539        } else {
1540            tracing::debug!("resolved {:?} => {} {:?} ...", &path, id, &names[0]);
1541        }
1542        for (i, name) in names.into_iter().enumerate() {
1543            if i > 0 {
1544                // Follow id's first parent.
1545                id = match dag.parent_ids(id)?.first().cloned() {
1546                    Some(id) => id,
1547                    None => {
1548                        let msg = format!(
1549                            concat!(
1550                                "Cannot resolve x~(n+i) (x = {:?} {}, n = {}, i = {}) locally. ",
1551                                "This indicates the client-side graph is somewhat incompatible ",
1552                                "from the server-side graph. Something (server-side or ",
1553                                "client-side) was probably seriously wrong before this error."
1554                            ),
1555                            &path.x, x_id, path.n, i
1556                        );
1557                        return programming(msg);
1558                    }
1559                }
1560            }
1561
1562            tracing::trace!(" resolved {:?} = {:?}", id, &name,);
1563            to_insert.push((id, name.clone()));
1564        }
1565    }
1566    Ok(to_insert)
1567}
1568
1569// The server Dag. IdMap is complete. Provide APIs for client Dag to resolve vertexes.
1570// Currently mainly used for testing purpose.
1571#[async_trait::async_trait]
1572impl<IS, M, P, S> RemoteIdConvertProtocol for AbstractNameDag<IdDag<IS>, M, P, S>
1573where
1574    IS: IdDagStore,
1575    IdDag<IS>: TryClone,
1576    M: IdConvert + TryClone + Send + Sync + 'static,
1577    P: TryClone + Send + Sync + 'static,
1578    S: TryClone + Send + Sync + 'static,
1579{
1580    async fn resolve_names_to_relative_paths(
1581        &self,
1582        heads: Vec<VertexName>,
1583        names: Vec<VertexName>,
1584    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
1585        let request = protocol::RequestNameToLocation { names, heads };
1586        let response: protocol::ResponseIdNamePair =
1587            (self.map(), self.dag()).process(request).await?;
1588        Ok(response.path_names)
1589    }
1590
1591    async fn resolve_relative_paths_to_names(
1592        &self,
1593        paths: Vec<AncestorPath>,
1594    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
1595        let request = protocol::RequestLocationToName { paths };
1596        let response: protocol::ResponseIdNamePair =
1597            (self.map(), self.dag()).process(request).await?;
1598        Ok(response.path_names)
1599    }
1600}
1601
1602// On "snapshot".
1603#[async_trait::async_trait]
1604impl<IS, M, P, S> RemoteIdConvertProtocol for Arc<AbstractNameDag<IdDag<IS>, M, P, S>>
1605where
1606    IS: IdDagStore,
1607    IdDag<IS>: TryClone,
1608    M: IdConvert + TryClone + Send + Sync + 'static,
1609    P: TryClone + Send + Sync + 'static,
1610    S: TryClone + Send + Sync + 'static,
1611{
1612    async fn resolve_names_to_relative_paths(
1613        &self,
1614        heads: Vec<VertexName>,
1615        names: Vec<VertexName>,
1616    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
1617        self.deref()
1618            .resolve_names_to_relative_paths(heads, names)
1619            .await
1620    }
1621
1622    async fn resolve_relative_paths_to_names(
1623        &self,
1624        paths: Vec<AncestorPath>,
1625    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
1626        self.deref().resolve_relative_paths_to_names(paths).await
1627    }
1628}
1629
1630// Dag operations. Those are just simple wrappers around [`IdDag`].
1631// See [`IdDag`] for the actual implementations of these algorithms.
1632
1633/// DAG related read-only algorithms.
1634#[async_trait::async_trait]
1635impl<IS, M, P, S> DagAlgorithm for AbstractNameDag<IdDag<IS>, M, P, S>
1636where
1637    IS: IdDagStore,
1638    IdDag<IS>: TryClone + 'static,
1639    M: TryClone + IdConvert + Sync + Send + 'static,
1640    P: TryClone + Sync + Send + 'static,
1641    S: TryClone + Sync + Send + 'static,
1642{
1643    /// Sort a `NameSet` topologically.
1644    async fn sort(&self, set: &NameSet) -> Result<NameSet> {
1645        let hints = set.hints();
1646        if hints.contains(Flags::TOPO_DESC)
1647            && matches!(hints.dag_version(), Some(v) if v <= self.dag_version())
1648            && matches!(hints.id_map_version(), Some(v) if v <= self.map_version())
1649        {
1650            Ok(set.clone())
1651        } else {
1652            let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
1653            let mut spans = IdSet::empty();
1654            let mut iter = set.iter().await?.chunks(1 << 17);
1655            while let Some(names) = iter.next().await {
1656                let names = names.into_iter().collect::<Result<Vec<_>>>()?;
1657                let ids = self.vertex_id_batch(&names).await?;
1658                for id in ids {
1659                    spans.push(id?);
1660                }
1661            }
1662            let result = NameSet::from_spans_dag(spans, self)?;
1663            result.hints().add_flags(flags);
1664            Ok(result)
1665        }
1666    }
1667
1668    /// Get ordered parent vertexes.
1669    async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
1670        let id = self.vertex_id(name).await?;
1671        let parent_ids = self.dag().parent_ids(id)?;
1672        let mut result = Vec::with_capacity(parent_ids.len());
1673        for id in parent_ids {
1674            result.push(self.vertex_name(id).await?);
1675        }
1676        Ok(result)
1677    }
1678
1679    /// Returns a set that covers all vertexes tracked by this DAG.
1680    async fn all(&self) -> Result<NameSet> {
1681        let spans = self.dag().all()?;
1682        let result = NameSet::from_spans_dag(spans, self)?;
1683        result.hints().add_flags(Flags::FULL);
1684        Ok(result)
1685    }
1686
1687    /// Returns a set that covers all vertexes in the master group.
1688    async fn master_group(&self) -> Result<NameSet> {
1689        let spans = self.dag().master_group()?;
1690        let result = NameSet::from_spans_dag(spans, self)?;
1691        result.hints().add_flags(Flags::ANCESTORS);
1692        Ok(result)
1693    }
1694
1695    /// Calculates all ancestors reachable from any name from the given set.
1696    async fn ancestors(&self, set: NameSet) -> Result<NameSet> {
1697        if set.hints().contains(Flags::ANCESTORS)
1698            && set.hints().dag_version() <= Some(self.dag_version())
1699        {
1700            return Ok(set);
1701        }
1702        let spans = self.to_id_set(&set).await?;
1703        let spans = self.dag().ancestors(spans)?;
1704        let result = NameSet::from_spans_dag(spans, self)?;
1705        result.hints().add_flags(Flags::ANCESTORS);
1706        Ok(result)
1707    }
1708
1709    /// Like `ancestors` but follows only the first parents.
1710    async fn first_ancestors(&self, set: NameSet) -> Result<NameSet> {
1711        // If set == ancestors(set), then first_ancestors(set) == set.
1712        if set.hints().contains(Flags::ANCESTORS)
1713            && set.hints().dag_version() <= Some(self.dag_version())
1714        {
1715            return Ok(set);
1716        }
1717        let spans = self.to_id_set(&set).await?;
1718        let spans = self.dag().first_ancestors(spans)?;
1719        let result = NameSet::from_spans_dag(spans, self)?;
1720        #[cfg(test)]
1721        {
1722            result.assert_eq(crate::default_impl::first_ancestors(self, set).await?);
1723        }
1724        Ok(result)
1725    }
1726
1727    /// Calculate merges within the given set.
1728    async fn merges(&self, set: NameSet) -> Result<NameSet> {
1729        let spans = self.to_id_set(&set).await?;
1730        let spans = self.dag().merges(spans)?;
1731        let result = NameSet::from_spans_dag(spans, self)?;
1732        #[cfg(test)]
1733        {
1734            result.assert_eq(crate::default_impl::merges(self, set).await?);
1735        }
1736        Ok(result)
1737    }
1738
1739    /// Calculates parents of the given set.
1740    ///
1741    /// Note: Parent order is not preserved. Use [`NameDag::parent_names`]
1742    /// to preserve order.
1743    async fn parents(&self, set: NameSet) -> Result<NameSet> {
1744        // Preserve ANCESTORS flag. If ancestors(x) == x, then ancestors(parents(x)) == parents(x).
1745        let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
1746        let spans = self.dag().parents(self.to_id_set(&set).await?)?;
1747        let result = NameSet::from_spans_dag(spans, self)?;
1748        result.hints().add_flags(flags);
1749        #[cfg(test)]
1750        {
1751            result.assert_eq(crate::default_impl::parents(self, set).await?);
1752        }
1753        Ok(result)
1754    }
1755
1756    /// Calculates the n-th first ancestor.
1757    async fn first_ancestor_nth(&self, name: VertexName, n: u64) -> Result<Option<VertexName>> {
1758        #[cfg(test)]
1759        let name2 = name.clone();
1760        let id = self.vertex_id(name).await?;
1761        let id = self.dag().try_first_ancestor_nth(id, n)?;
1762        let result = match id {
1763            None => None,
1764            Some(id) => Some(self.vertex_name(id).await?),
1765        };
1766        #[cfg(test)]
1767        {
1768            let result2 = crate::default_impl::first_ancestor_nth(self, name2, n).await?;
1769            assert_eq!(result, result2);
1770        }
1771        Ok(result)
1772    }
1773
1774    /// Calculates heads of the given set.
1775    async fn heads(&self, set: NameSet) -> Result<NameSet> {
1776        if set.hints().contains(Flags::ANCESTORS)
1777            && set.hints().dag_version() <= Some(self.dag_version())
1778        {
1779            // heads_ancestors is faster.
1780            return self.heads_ancestors(set).await;
1781        }
1782        let spans = self.dag().heads(self.to_id_set(&set).await?)?;
1783        let result = NameSet::from_spans_dag(spans, self)?;
1784        #[cfg(test)]
1785        {
1786            result.assert_eq(crate::default_impl::heads(self, set).await?);
1787        }
1788        Ok(result)
1789    }
1790
1791    /// Calculates children of the given set.
1792    async fn children(&self, set: NameSet) -> Result<NameSet> {
1793        let spans = self.dag().children(self.to_id_set(&set).await?)?;
1794        let result = NameSet::from_spans_dag(spans, self)?;
1795        Ok(result)
1796    }
1797
1798    /// Calculates roots of the given set.
1799    async fn roots(&self, set: NameSet) -> Result<NameSet> {
1800        let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
1801        let spans = self.dag().roots(self.to_id_set(&set).await?)?;
1802        let result = NameSet::from_spans_dag(spans, self)?;
1803        result.hints().add_flags(flags);
1804        #[cfg(test)]
1805        {
1806            result.assert_eq(crate::default_impl::roots(self, set).await?);
1807        }
1808        Ok(result)
1809    }
1810
1811    /// Calculates one "greatest common ancestor" of the given set.
1812    ///
1813    /// If there are no common ancestors, return None.
1814    /// If there are multiple greatest common ancestors, pick one arbitrarily.
1815    /// Use `gca_all` to get all of them.
1816    async fn gca_one(&self, set: NameSet) -> Result<Option<VertexName>> {
1817        let result: Option<VertexName> = match self.dag().gca_one(self.to_id_set(&set).await?)? {
1818            None => None,
1819            Some(id) => Some(self.vertex_name(id).await?),
1820        };
1821        #[cfg(test)]
1822        {
1823            assert_eq!(&result, &crate::default_impl::gca_one(self, set).await?);
1824        }
1825        Ok(result)
1826    }
1827
1828    /// Calculates all "greatest common ancestor"s of the given set.
1829    /// `gca_one` is faster if an arbitrary answer is ok.
1830    async fn gca_all(&self, set: NameSet) -> Result<NameSet> {
1831        let spans = self.dag().gca_all(self.to_id_set(&set).await?)?;
1832        let result = NameSet::from_spans_dag(spans, self)?;
1833        #[cfg(test)]
1834        {
1835            result.assert_eq(crate::default_impl::gca_all(self, set).await?);
1836        }
1837        Ok(result)
1838    }
1839
1840    /// Calculates all common ancestors of the given set.
1841    async fn common_ancestors(&self, set: NameSet) -> Result<NameSet> {
1842        let spans = self.dag().common_ancestors(self.to_id_set(&set).await?)?;
1843        let result = NameSet::from_spans_dag(spans, self)?;
1844        result.hints().add_flags(Flags::ANCESTORS);
1845        #[cfg(test)]
1846        {
1847            result.assert_eq(crate::default_impl::common_ancestors(self, set).await?);
1848        }
1849        Ok(result)
1850    }
1851
1852    /// Tests if `ancestor` is an ancestor of `descendant`.
1853    async fn is_ancestor(&self, ancestor: VertexName, descendant: VertexName) -> Result<bool> {
1854        #[cfg(test)]
1855        let result2 =
1856            crate::default_impl::is_ancestor(self, ancestor.clone(), descendant.clone()).await?;
1857        let ancestor_id = self.vertex_id(ancestor).await?;
1858        let descendant_id = self.vertex_id(descendant).await?;
1859        let result = self.dag().is_ancestor(ancestor_id, descendant_id)?;
1860        #[cfg(test)]
1861        {
1862            assert_eq!(&result, &result2);
1863        }
1864        Ok(result)
1865    }
1866
1867    /// Calculates "heads" of the ancestors of the given set. That is,
1868    /// Find Y, which is the smallest subset of set X, where `ancestors(Y)` is
1869    /// `ancestors(X)`.
1870    ///
1871    /// This is faster than calculating `heads(ancestors(set))`.
1872    ///
1873    /// This is different from `heads`. In case set contains X and Y, and Y is
1874    /// an ancestor of X, but not the immediate ancestor, `heads` will include
1875    /// Y while this function won't.
1876    async fn heads_ancestors(&self, set: NameSet) -> Result<NameSet> {
1877        let spans = self.dag().heads_ancestors(self.to_id_set(&set).await?)?;
1878        let result = NameSet::from_spans_dag(spans, self)?;
1879        #[cfg(test)]
1880        {
1881            // default_impl::heads_ancestors calls `heads` if `Flags::ANCESTORS`
1882            // is set. Prevent infinite loop.
1883            if !set.hints().contains(Flags::ANCESTORS) {
1884                result.assert_eq(crate::default_impl::heads_ancestors(self, set).await?);
1885            }
1886        }
1887        Ok(result)
1888    }
1889
1890    /// Calculates the "dag range" - vertexes reachable from both sides.
1891    async fn range(&self, roots: NameSet, heads: NameSet) -> Result<NameSet> {
1892        let roots = self.to_id_set(&roots).await?;
1893        let heads = self.to_id_set(&heads).await?;
1894        let spans = self.dag().range(roots, heads)?;
1895        let result = NameSet::from_spans_dag(spans, self)?;
1896        Ok(result)
1897    }
1898
1899    /// Calculates the descendants of the given set.
1900    async fn descendants(&self, set: NameSet) -> Result<NameSet> {
1901        let spans = self.dag().descendants(self.to_id_set(&set).await?)?;
1902        let result = NameSet::from_spans_dag(spans, self)?;
1903        Ok(result)
1904    }
1905
1906    /// Vertexes buffered in memory, not yet written to disk.
1907    async fn dirty(&self) -> Result<NameSet> {
1908        let all = self.dag().all()?;
1909        let spans = all.difference(&self.persisted_id_set);
1910        let set = NameSet::from_spans_dag(spans, self)?;
1911        Ok(set)
1912    }
1913
1914    fn is_vertex_lazy(&self) -> bool {
1915        !self.remote_protocol.is_local()
1916    }
1917
1918    /// Get a snapshot of the current graph.
1919    fn dag_snapshot(&self) -> Result<Arc<dyn DagAlgorithm + Send + Sync>> {
1920        Ok(self.try_snapshot()? as Arc<dyn DagAlgorithm + Send + Sync>)
1921    }
1922
1923    fn id_dag_snapshot(&self) -> Result<Arc<dyn IdDagAlgorithm + Send + Sync>> {
1924        let store = self.dag.try_clone()?.store;
1925        Ok(Arc::new(store))
1926    }
1927
1928    fn dag_id(&self) -> &str {
1929        &self.id
1930    }
1931
1932    fn dag_version(&self) -> &VerLink {
1933        &self.dag.version()
1934    }
1935}
1936
1937/// Extract the ANCESTORS flag if the set with the `hints` is bound to a
1938/// compatible DAG.
1939fn extract_ancestor_flag_if_compatible(hints: &Hints, dag_version: &VerLink) -> Flags {
1940    if hints.dag_version() <= Some(dag_version) {
1941        hints.flags() & Flags::ANCESTORS
1942    } else {
1943        Flags::empty()
1944    }
1945}
1946
1947#[async_trait::async_trait]
1948impl<I, M, P, S> PrefixLookup for AbstractNameDag<I, M, P, S>
1949where
1950    I: Send + Sync,
1951    M: PrefixLookup + Send + Sync,
1952    P: Send + Sync,
1953    S: Send + Sync,
1954{
1955    async fn vertexes_by_hex_prefix(
1956        &self,
1957        hex_prefix: &[u8],
1958        limit: usize,
1959    ) -> Result<Vec<VertexName>> {
1960        let mut list = self.map.vertexes_by_hex_prefix(hex_prefix, limit).await?;
1961        let overlay_list = self
1962            .overlay_map
1963            .read()
1964            .unwrap()
1965            .lookup_vertexes_by_hex_prefix(hex_prefix, limit)?;
1966        list.extend(overlay_list);
1967        list.sort_unstable();
1968        list.dedup();
1969        list.truncate(limit);
1970        Ok(list)
1971    }
1972}
1973
1974#[async_trait::async_trait]
1975impl<IS, M, P, S> IdConvert for AbstractNameDag<IdDag<IS>, M, P, S>
1976where
1977    IS: IdDagStore,
1978    IdDag<IS>: TryClone,
1979    M: IdConvert + TryClone + Send + Sync + 'static,
1980    P: TryClone + Send + Sync + 'static,
1981    S: TryClone + Send + Sync + 'static,
1982{
1983    async fn vertex_id(&self, name: VertexName) -> Result<Id> {
1984        match self.map.vertex_id(name.clone()).await {
1985            Ok(id) => Ok(id),
1986            Err(crate::Error::VertexNotFound(_)) if self.is_vertex_lazy() => {
1987                if let Some(id) = self.overlay_map.read().unwrap().lookup_vertex_id(&name) {
1988                    return Ok(id);
1989                }
1990                if self
1991                    .missing_vertexes_confirmed_by_remote
1992                    .read()
1993                    .unwrap()
1994                    .contains(&name)
1995                {
1996                    return name.not_found();
1997                }
1998                let ids = self.resolve_vertexes_remotely(&[name.clone()]).await?;
1999                if let Some(Some(id)) = ids.first() {
2000                    Ok(*id)
2001                } else {
2002                    // ids is empty.
2003                    name.not_found()
2004                }
2005            }
2006            Err(e) => Err(e),
2007        }
2008    }
2009
2010    async fn vertex_id_with_max_group(
2011        &self,
2012        name: &VertexName,
2013        max_group: Group,
2014    ) -> Result<Option<Id>> {
2015        match self.map.vertex_id_with_max_group(name, max_group).await {
2016            Ok(Some(id)) => Ok(Some(id)),
2017            Err(err) => Err(err),
2018            Ok(None) if self.is_vertex_lazy() => {
2019                if let Some(id) = self.overlay_map.read().unwrap().lookup_vertex_id(&name) {
2020                    return Ok(Some(id));
2021                }
2022                if self
2023                    .missing_vertexes_confirmed_by_remote
2024                    .read()
2025                    .unwrap()
2026                    .contains(&name)
2027                {
2028                    return Ok(None);
2029                }
2030                if max_group == Group::MASTER
2031                    && self
2032                        .map
2033                        .vertex_id_with_max_group(name, Group::NON_MASTER)
2034                        .await?
2035                        .is_some()
2036                {
2037                    // If the vertex exists in the non-master group. Then it must be missing in the
2038                    // master group.
2039                    return Ok(None);
2040                }
2041                match self.resolve_vertexes_remotely(&[name.clone()]).await {
2042                    Ok(ids) => match ids.first() {
2043                        Some(Some(id)) => Ok(Some(*id)),
2044                        Some(None) | None => Ok(None),
2045                    },
2046                    Err(e) => Err(e),
2047                }
2048            }
2049            Ok(None) => Ok(None),
2050        }
2051    }
2052
2053    async fn vertex_name(&self, id: Id) -> Result<VertexName> {
2054        match self.map.vertex_name(id).await {
2055            Ok(name) => Ok(name),
2056            Err(crate::Error::IdNotFound(_)) if self.is_vertex_lazy() => {
2057                if let Some(name) = self.overlay_map.read().unwrap().lookup_vertex_name(id) {
2058                    return Ok(name);
2059                }
2060                // Only ids <= max(MASTER group) can be lazy.
2061                let max_master_id = self.dag.master_group()?.max();
2062                if Some(id) > max_master_id {
2063                    return id.not_found();
2064                }
2065                let names = self.resolve_ids_remotely(&[id]).await?;
2066                if let Some(name) = names.into_iter().next() {
2067                    Ok(name)
2068                } else {
2069                    id.not_found()
2070                }
2071            }
2072            Err(e) => Err(e),
2073        }
2074    }
2075
2076    async fn contains_vertex_name(&self, name: &VertexName) -> Result<bool> {
2077        match self.map.contains_vertex_name(name).await {
2078            Ok(true) => Ok(true),
2079            Ok(false) if self.is_vertex_lazy() => {
2080                if self
2081                    .overlay_map
2082                    .read()
2083                    .unwrap()
2084                    .lookup_vertex_id(name)
2085                    .is_some()
2086                {
2087                    return Ok(true);
2088                }
2089                if self
2090                    .missing_vertexes_confirmed_by_remote
2091                    .read()
2092                    .unwrap()
2093                    .contains(&name)
2094                {
2095                    return Ok(false);
2096                }
2097                match self.resolve_vertexes_remotely(&[name.clone()]).await {
2098                    Ok(ids) => match ids.first() {
2099                        Some(Some(_)) => Ok(true),
2100                        Some(None) | None => Ok(false),
2101                    },
2102                    Err(e) => Err(e),
2103                }
2104            }
2105            Ok(false) => Ok(false),
2106            Err(e) => Err(e),
2107        }
2108    }
2109
2110    async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
2111        let mut list = self.map.contains_vertex_id_locally(ids).await?;
2112        let map = self.overlay_map.read().unwrap();
2113        for (b, id) in list.iter_mut().zip(ids.iter().copied()) {
2114            if !*b {
2115                *b = *b || map.has_vertex_id(id);
2116            }
2117        }
2118        Ok(list)
2119    }
2120
2121    async fn contains_vertex_name_locally(&self, names: &[VertexName]) -> Result<Vec<bool>> {
2122        tracing::trace!("contains_vertex_name_locally names: {:?}", &names);
2123        let mut list = self.map.contains_vertex_name_locally(names).await?;
2124        tracing::trace!("contains_vertex_name_locally list (local): {:?}", &list);
2125        assert_eq!(list.len(), names.len());
2126        let map = self.overlay_map.read().unwrap();
2127        for (b, name) in list.iter_mut().zip(names.iter()) {
2128            if !*b && map.has_vertex_name(name) {
2129                tracing::trace!("contains_vertex_name_locally overlay has {:?}", &name);
2130                *b = true;
2131            }
2132        }
2133        Ok(list)
2134    }
2135
2136    async fn vertex_name_batch(&self, ids: &[Id]) -> Result<Vec<Result<VertexName>>> {
2137        let mut list = self.map.vertex_name_batch(ids).await?;
2138        if self.is_vertex_lazy() {
2139            // Read from overlay map cache.
2140            {
2141                let map = self.overlay_map.read().unwrap();
2142                for (r, id) in list.iter_mut().zip(ids) {
2143                    if let Some(name) = map.lookup_vertex_name(*id) {
2144                        *r = Ok(name);
2145                    }
2146                }
2147            }
2148            // Read from missing_vertexes_confirmed_by_remote cache.
2149            let missing_indexes: Vec<usize> = {
2150                let max_master_id = self.dag.master_group()?.max();
2151                list.iter()
2152                    .enumerate()
2153                    .filter_map(|(i, r)| match r {
2154                        // Only resolve ids that are <= max(master) remotely.
2155                        Err(_) if Some(ids[i]) <= max_master_id => Some(i),
2156                        Err(_) | Ok(_) => None,
2157                    })
2158                    .collect()
2159            };
2160            let missing_ids: Vec<Id> = missing_indexes.iter().map(|i| ids[*i]).collect();
2161            let resolved = self.resolve_ids_remotely(&missing_ids).await?;
2162            for (i, name) in missing_indexes.into_iter().zip(resolved.into_iter()) {
2163                list[i] = Ok(name);
2164            }
2165        }
2166        Ok(list)
2167    }
2168
2169    async fn vertex_id_batch(&self, names: &[VertexName]) -> Result<Vec<Result<Id>>> {
2170        let mut list = self.map.vertex_id_batch(names).await?;
2171        if self.is_vertex_lazy() {
2172            // Read from overlay map cache.
2173            {
2174                let map = self.overlay_map.read().unwrap();
2175                for (r, name) in list.iter_mut().zip(names) {
2176                    if let Some(id) = map.lookup_vertex_id(name) {
2177                        *r = Ok(id);
2178                    }
2179                }
2180            }
2181            // Read from missing_vertexes_confirmed_by_remote cache.
2182            let missing_indexes: Vec<usize> = {
2183                let known_missing = self.missing_vertexes_confirmed_by_remote.read().unwrap();
2184                list.iter()
2185                    .enumerate()
2186                    .filter_map(|(i, r)| {
2187                        if r.is_err() && !known_missing.contains(&names[i]) {
2188                            Some(i)
2189                        } else {
2190                            None
2191                        }
2192                    })
2193                    .collect()
2194            };
2195            if !missing_indexes.is_empty() {
2196                let missing_names: Vec<VertexName> =
2197                    missing_indexes.iter().map(|i| names[*i].clone()).collect();
2198                let resolved = self.resolve_vertexes_remotely(&missing_names).await?;
2199                for (i, id) in missing_indexes.into_iter().zip(resolved.into_iter()) {
2200                    if let Some(id) = id {
2201                        list[i] = Ok(id);
2202                    }
2203                }
2204            }
2205        }
2206        Ok(list)
2207    }
2208
2209    fn map_id(&self) -> &str {
2210        self.map.map_id()
2211    }
2212
2213    fn map_version(&self) -> &VerLink {
2214        self.map.map_version()
2215    }
2216}
2217
2218impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
2219where
2220    IS: IdDagStore,
2221    IdDag<IS>: TryClone + 'static,
2222    M: TryClone + Persist + IdMapWrite + IdConvert + Sync + Send + 'static,
2223    P: TryClone + Sync + Send + 'static,
2224    S: TryClone + Sync + Send + 'static,
2225{
    /// Build IdMap and Segments for the given heads.
    /// Update IdMap and IdDag to include the given heads and their ancestors.
    ///
    /// Handle "reassign" cases. For example, when adding P to the master group
    /// and one of its parent N2 is in the non-master group:
    ///
    /// ```plain,ignore
    ///     1--2--3             3---P
    ///         \                  /
    ///          N1-N2-N3        N2
    /// ```
    ///
    /// To maintain topological order, N2 need to be re-assigned to the master
    /// group. This is done by temporarily removing N1-N2-N3, re-insert N1-N2
    /// as 4-5 to be able to insert P, then re-insert N3 in the non-master
    /// group:
    ///
    /// ```plain,ignore
    ///     1--2--3 --6 (P)
    ///         \    /
    ///          4--5 --N1
    /// ```
    ///
    /// Arguments:
    /// - `parents`: parent lookup used when assigning ids to the heads and
    ///   when searching for vertexes to reassign.
    /// - `heads`: heads to insert, each with its options (`highest_group`,
    ///   `reserve_size`).
    /// - `map_lock`: forwarded to `strip_with_lock` when vertexes have to be
    ///   removed for reassignment.
    ///
    /// Returns a `bug` error if the internal loop would run more than twice.
    async fn build_with_lock(
        &mut self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
        map_lock: &M::Lock,
    ) -> Result<()> {
        // `std::borrow::Cow` without `Clone` constraint.
        enum Input<'a> {
            Borrowed(&'a dyn Parents, &'a VertexListWithOptions),
            Owned(Box<dyn Parents>, VertexListWithOptions),
        }

        // Manual recursion. async fn does not support recursion.
        // The first entry is the caller's input; a second (owned) entry is
        // pushed below when vertexes need to be re-inserted.
        let mut stack = vec![Input::Borrowed(parents, heads)];

        // Avoid infinite loop (buggy logic).
        let mut loop_count = 0;

        while let Some(input) = stack.pop() {
            loop_count += 1;
            if loop_count > 2 {
                return bug("should not loop > 2 times (1st insertion+strip, 2nd reinsert)");
            }

            // Shadow the function arguments with the input of this iteration.
            let (parents, heads) = match &input {
                Input::Borrowed(p, h) => (*p, *h),
                Input::Owned(p, h) => (p.as_ref(), h),
            };

            // Populate vertex negative cache to reduce round-trips doing remote lookups.
            if self.is_vertex_lazy() {
                let heads: Vec<VertexName> = heads.vertexes();
                self.populate_missing_vertexes_for_add_heads(parents, &heads)
                    .await?;
            }

            // Backup, then remove vertexes that need to be reassigned. Actual reassignment
            // happens in the next loop iteration.
            let to_reassign: NameSet = self.find_vertexes_to_reassign(parents, heads).await?;
            if !to_reassign.is_empty().await? {
                // Heads to re-insert: heads(descendants(to_reassign) - to_reassign).
                let reinsert_heads: VertexListWithOptions = {
                    let heads = self
                        .heads(
                            self.descendants(to_reassign.clone())
                                .await?
                                .difference(&to_reassign),
                        )
                        .await?;
                    tracing::debug!(target: "dag::reassign", "need to rebuild heads: {:?}", &heads);
                    let heads: Vec<VertexName> = heads.iter().await?.try_collect().await?;
                    VertexListWithOptions::from(heads)
                };
                // The snapshot is taken before stripping and serves as the
                // `Parents` source for the re-insertion (presumably so that
                // parents of the stripped vertexes remain available).
                let reinsert_parents: Box<dyn Parents> = Box::new(self.dag_snapshot()?);
                self.strip_with_lock(&to_reassign, map_lock).await?;

                // Rebuild non-master ids and segments on the next iteration.
                stack.push(Input::Owned(reinsert_parents, reinsert_heads));
            };

            // Update IdMap.
            let mut outcome = PreparedFlatSegments::default();
            let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
            let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
            // Heads targeting the MASTER group are assigned before heads
            // targeting the NON_MASTER group.
            for group in [Group::MASTER, Group::NON_MASTER] {
                for (vertex, opts) in heads.vertex_options() {
                    if opts.highest_group != group {
                        continue;
                    }
                    // Important: do not call self.map.assign_head. It does not trigger
                    // remote protocol properly. Call self.assign_head instead.
                    let prepared_segments = self
                        .assign_head(vertex.clone(), parents, group, &mut covered, &reserved)
                        .await?;
                    outcome.merge(prepared_segments);
                    // Update reserved.
                    if opts.reserve_size > 0 {
                        // The reserved range starts right after the id just
                        // assigned to this head.
                        let low = self.map.vertex_id(vertex).await? + 1;
                        update_reserved(&mut reserved, &covered, low, opts.reserve_size);
                    }
                }
            }

            // Update segments.
            self.dag
                .build_segments_from_prepared_flat_segments(&outcome)?;

            // The master group might have new vertexes inserted, which will
            // affect the `overlay_map_id_set`.
            self.update_overlay_map_id_set()?;
        }

        Ok(())
    }
2341
2342    /// Find vertexes that need to be reassigned from the non-master group
2343    /// to the master group. That is,
2344    /// `ancestors(master_heads_to_insert) & existing_non_master_group`
2345    ///
2346    /// Assume pre-fetching (populate_missing_vertexes_for_add_heads)
2347    /// was done, so this function can just use naive DFS without worrying
2348    /// about excessive remote lookups.
2349    async fn find_vertexes_to_reassign(
2350        &self,
2351        parents: &dyn Parents,
2352        heads: &VertexListWithOptions,
2353    ) -> Result<NameSet> {
2354        // Heads that need to be inserted to the master group.
2355        let master_heads = heads.vertexes_by_group(Group::MASTER);
2356
2357        // Visit vertexes recursively.
2358        let mut id_set = IdSet::empty();
2359        let mut to_visit: Vec<VertexName> = master_heads;
2360        let mut visited = HashSet::new();
2361        while let Some(vertex) = to_visit.pop() {
2362            if !visited.insert(vertex.clone()) {
2363                continue;
2364            }
2365            let id = self.vertex_id_optional(&vertex).await?;
2366            // None: The vertex/id is not yet inserted to IdMap.
2367            if let Some(id) = id {
2368                if id.group() == Group::MASTER {
2369                    // Already exist in the master group. Stop visiting.
2370                    continue;
2371                } else {
2372                    // Need reassign. Need to continue visiting.
2373                    id_set.push(id);
2374                }
2375            }
2376            let parents = parents.parent_names(vertex).await?;
2377            to_visit.extend(parents);
2378        }
2379
2380        let set = NameSet::from_spans_dag(id_set, self)?;
2381        tracing::debug!(target: "dag::reassign", "need to reassign: {:?}", &set);
2382        Ok(set)
2383    }
2384}
2385
2386/// Calculate the initial "reserved" set used before inserting new vertexes.
2387/// Only heads that have non-zero reserve_size and are presnet in the graph
2388/// take effect. In other words, heads that are known to be not present in
2389/// the local graph (ex. being added), or have zero reserve_size can be
2390/// skipped as an optimization.
2391async fn calculate_initial_reserved(
2392    map: &dyn IdConvert,
2393    covered: &IdSet,
2394    heads: &VertexListWithOptions,
2395) -> Result<IdSet> {
2396    let mut reserved = IdSet::empty();
2397    for (vertex, opts) in heads.vertex_options() {
2398        if opts.reserve_size == 0 {
2399            // Avoid potentially costly remote lookup.
2400            continue;
2401        }
2402        if let Some(id) = map
2403            .vertex_id_with_max_group(&vertex, opts.highest_group)
2404            .await?
2405        {
2406            update_reserved(&mut reserved, covered, id + 1, opts.reserve_size);
2407        }
2408    }
2409    Ok(reserved)
2410}
2411
2412fn update_reserved(reserved: &mut IdSet, covered: &IdSet, low: Id, reserve_size: u32) {
2413    if reserve_size == 0 {
2414        return;
2415    }
2416    let span = find_free_span(covered, low, reserve_size as _, true);
2417    reserved.push(span);
2418}
2419
2420/// Find a span with constraints:
2421/// - does not overlap with `covered`.
2422/// - `span.low` >= the given `low`.
2423/// - if `shrink_to_fit` is `false`, `span.high - span.low` must be `reserve_size`.
2424/// - if `shrink_to_fit` is `true`, the span can be smaller than `reserve_size` to
2425///   fill up existing gaps in `covered`.
2426///
2427/// `reserve_size` cannot be 0.
2428fn find_free_span(covered: &IdSet, low: Id, reserve_size: u64, shrink_to_fit: bool) -> IdSpan {
2429    assert!(reserve_size > 0);
2430    let mut low = low;
2431    let mut high;
2432    loop {
2433        high = (low + (reserve_size as u64) - 1).min(low.group().max_id());
2434        // Try to reserve id..=id+reserve_size-1
2435        let reserved = IdSet::from_spans(vec![low..=high]);
2436        let intersected = reserved.intersection(covered);
2437        if let Some(span) = intersected.iter_span_asc().next() {
2438            // Overlap with existing covered spans. Decrease `high` so it
2439            // no longer overlap.
2440            let last_free = span.low - 1;
2441            if last_free >= low && shrink_to_fit {
2442                // Use the remaining part of the previous reservation.
2443                //   [----------reserved--------------]
2444                //             [--intersected--]
2445                //   ^                                ^
2446                //   low                              high
2447                //            ^
2448                //            last_free
2449                //   [reserved] <- remaining of the previous reservation
2450                //            ^
2451                //            high
2452                high = last_free;
2453            } else {
2454                // No space on the left side. Try the right side.
2455                //   [--------reserved-------]
2456                //   [--intersected--]
2457                //   ^                       ^
2458                //   low                     high
2459                //        try next -> [------reserved------]
2460                //  ^                 ^
2461                //  last_free         low (try next)
2462                low = span.high + 1;
2463                continue;
2464            }
2465        }
2466        break;
2467    }
2468    let span = IdSpan::new(low, high);
2469    if !shrink_to_fit {
2470        assert_eq!(span.count(), reserve_size);
2471    }
2472    span
2473}
2474
2475fn is_ok_some<T>(value: Result<Option<T>>) -> bool {
2476    match value {
2477        Ok(Some(_)) => true,
2478        _ => false,
2479    }
2480}
2481
2482impl<IS, M, P, S> IdMapSnapshot for AbstractNameDag<IdDag<IS>, M, P, S>
2483where
2484    IS: IdDagStore,
2485    IdDag<IS>: TryClone + 'static,
2486    M: TryClone + IdConvert + Send + Sync + 'static,
2487    P: TryClone + Send + Sync + 'static,
2488    S: TryClone + Send + Sync + 'static,
2489{
2490    fn id_map_snapshot(&self) -> Result<Arc<dyn IdConvert + Send + Sync>> {
2491        Ok(self.try_snapshot()? as Arc<dyn IdConvert + Send + Sync>)
2492    }
2493}
2494
2495impl<IS, M, P, S> fmt::Debug for AbstractNameDag<IdDag<IS>, M, P, S>
2496where
2497    IS: IdDagStore,
2498    M: IdConvert + Send + Sync,
2499    P: Send + Sync,
2500    S: Send + Sync,
2501{
2502    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2503        debug(&self.dag, &self.map, f)
2504    }
2505}
2506
2507pub(crate) fn debug_segments_by_level_group<S: IdDagStore>(
2508    iddag: &IdDag<S>,
2509    idmap: &dyn IdConvert,
2510    level: Level,
2511    group: Group,
2512) -> Vec<String> {
2513    let mut result = Vec::new();
2514    // Show Id, with optional hash.
2515    let show = |id: Id| DebugId {
2516        id,
2517        name: non_blocking_result(idmap.vertex_name(id)).ok(),
2518    };
2519    let show_flags = |flags: SegmentFlags| -> String {
2520        let mut result = Vec::new();
2521        if flags.contains(SegmentFlags::HAS_ROOT) {
2522            result.push("Root");
2523        }
2524        if flags.contains(SegmentFlags::ONLY_HEAD) {
2525            result.push("OnlyHead");
2526        }
2527        result.join(" ")
2528    };
2529
2530    if let Ok(segments) = iddag.next_segments(group.min_id(), level) {
2531        for segment in segments.into_iter().rev() {
2532            if let (Ok(span), Ok(parents), Ok(flags)) =
2533                (segment.span(), segment.parents(), segment.flags())
2534            {
2535                let mut line = format!(
2536                    "{:.12?} : {:.12?} {:.12?}",
2537                    show(span.low),
2538                    show(span.high),
2539                    parents.into_iter().map(show).collect::<Vec<_>>(),
2540                );
2541                let flags = show_flags(flags);
2542                if !flags.is_empty() {
2543                    line += &format!(" {}", flags);
2544                }
2545                result.push(line);
2546            }
2547        }
2548    }
2549    result
2550}
2551
2552fn debug<S: IdDagStore>(
2553    iddag: &IdDag<S>,
2554    idmap: &dyn IdConvert,
2555    f: &mut fmt::Formatter,
2556) -> fmt::Result {
2557    if let Ok(max_level) = iddag.max_level() {
2558        writeln!(f, "Max Level: {}", max_level)?;
2559        for lv in (0..=max_level).rev() {
2560            writeln!(f, " Level {}", lv)?;
2561            for group in Group::ALL.iter().cloned() {
2562                writeln!(f, "  {}:", group)?;
2563                if let Ok(segments) = iddag.next_segments(group.min_id(), lv) {
2564                    writeln!(f, "   Segments: {}", segments.len())?;
2565                    for line in debug_segments_by_level_group(iddag, idmap, lv, group) {
2566                        writeln!(f, "    {}", line)?;
2567                    }
2568                }
2569            }
2570        }
2571    }
2572
2573    Ok(())
2574}
2575
2576struct DebugId {
2577    id: Id,
2578    name: Option<VertexName>,
2579}
2580
2581impl fmt::Debug for DebugId {
2582    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2583        if let Some(name) = &self.name {
2584            fmt::Debug::fmt(&name, f)?;
2585            f.write_str("+")?;
2586        }
2587        write!(f, "{:?}", self.id)?;
2588        Ok(())
2589    }
2590}