Skip to main content

dag/
dag.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! # dag
9//!
10//! Combination of IdMap and IdDag.
11
12use std::collections::BTreeMap;
13use std::collections::HashMap;
14use std::collections::HashSet;
15use std::env::var;
16use std::fmt;
17use std::io;
18use std::ops::Deref;
19use std::sync::atomic::AtomicUsize;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::RwLock;
24
25use dag_types::FlatSegment;
26use futures::future::BoxFuture;
27use futures::FutureExt;
28use futures::StreamExt;
29use futures::TryStreamExt;
30use nonblocking::non_blocking_result;
31
32use crate::clone::CloneData;
33use crate::default_impl;
34use crate::errors::bug;
35use crate::errors::programming;
36use crate::errors::DagError;
37use crate::errors::NotFoundError;
38use crate::id::Group;
39use crate::id::Id;
40use crate::id::Vertex;
41use crate::iddag::IdDag;
42use crate::iddag::IdDagAlgorithm;
43use crate::iddagstore::IdDagStore;
44use crate::idmap::CoreMemIdMap;
45use crate::idmap::IdMapAssignHead;
46use crate::idmap::IdMapWrite;
47use crate::lifecycle::LifecycleId;
48use crate::ops::CheckIntegrity;
49use crate::ops::DagAddHeads;
50use crate::ops::DagAlgorithm;
51use crate::ops::DagExportCloneData;
52use crate::ops::DagExportPullData;
53use crate::ops::DagImportCloneData;
54use crate::ops::DagImportPullData;
55use crate::ops::DagPersistent;
56use crate::ops::DagStrip;
57use crate::ops::IdConvert;
58use crate::ops::IdMapSnapshot;
59use crate::ops::Open;
60use crate::ops::Parents;
61use crate::ops::Persist;
62use crate::ops::PrefixLookup;
63use crate::ops::StorageVersion;
64use crate::ops::ToIdSet;
65use crate::ops::TryClone;
66use crate::protocol;
67use crate::protocol::is_remote_protocol_disabled;
68use crate::protocol::AncestorPath;
69use crate::protocol::Process;
70use crate::protocol::RemoteIdConvertProtocol;
71use crate::segment::PreparedFlatSegments;
72use crate::segment::SegmentFlags;
73use crate::set::hints::Flags;
74use crate::set::hints::Hints;
75use crate::set::id_static::BasicIterationOrder;
76use crate::set::Set;
77use crate::types_ext::PreparedFlatSegmentsExt;
78use crate::utils;
79use crate::Error::NeedSlowPath;
80use crate::IdSet;
81use crate::IdSpan;
82use crate::Level;
83use crate::Result;
84use crate::VerLink;
85use crate::VertexListWithOptions;
86use crate::VertexOptions;
87
88mod builder;
89#[cfg(any(test, feature = "indexedlog-backend"))]
90mod indexedlog_dag;
91mod mem_dag;
92
93pub use builder::DagBuilder;
94#[cfg(any(test, feature = "indexedlog-backend"))]
95pub use indexedlog_dag::Dag;
96#[cfg(any(test, feature = "indexedlog-backend"))]
97pub use indexedlog_dag::IndexedLogDagPath;
98pub use mem_dag::MemDag;
99pub use mem_dag::MemDagPath;
100
/// Combination of an `IdDag` (id-based segments) and an `IdMap`
/// (vertex <-> id translation), together with caches and optional
/// remote-lookup support.
///
/// Type parameters: `I` is the IdDag, `M` the IdMap, `P` the path used
/// to open this `Dag`, and `S` extra backend state.
pub struct AbstractDag<I, M, P, S>
where
    I: Send + Sync,
    M: Send + Sync,
    P: Send + Sync,
    S: Send + Sync,
{
    /// The id-based DAG (segments).
    pub(crate) dag: I,
    /// Translation between `Id`s and `Vertex`es.
    pub(crate) map: M,

    /// A read-only snapshot of the `Dag`.
    /// Lazily calculated.
    snapshot: RwLock<Option<Arc<Self>>>,

    /// Non-virtual heads added via `add_heads` that are not flushed yet.
    /// They can be flushed by `flush()`.
    pending_heads: VertexListWithOptions,

    /// Path used to open this `Dag`.
    path: P,

    /// Extra state of the `Dag`.
    state: S,

    /// Identity of the dag. Derived from `path`.
    id: String,

    /// `Id`s that are persisted on disk. Used to answer `dirty()`.
    persisted_id_set: IdSet,

    /// Overlay IdMap. Used to store IdMap results resolved using remote
    /// protocols.
    overlay_map: Arc<RwLock<CoreMemIdMap>>,

    /// `Id`s that are allowed in the `overlay_map`. A protection.
    /// The `overlay_map` is shared (Arc) and its ID should not exceed the
    /// existing maximum ID at `map` open time. The IDs from
    /// 0..overlay_map_next_id are considered immutable, but lazy.
    overlay_map_id_set: IdSet,

    /// The source of `overlay_map`s. This avoids absolute Ids, and is
    /// used to flush overlay_map content in case the IdMap changes on
    /// disk.
    overlay_map_paths: Arc<Mutex<Vec<(AncestorPath, Vec<Vertex>)>>>,

    /// Defines how to communicate with a remote service.
    /// The actual logic probably involves networking like HTTP etc
    /// and is intended to be implemented outside the `dag` crate.
    remote_protocol: Arc<dyn RemoteIdConvertProtocol>,

    /// If set, clear and insert to the VIRTUAL group after reloading.
    /// The `VertexListWithOptions` is derived from the `Parents`.
    managed_virtual_group: Option<Arc<(Box<dyn Parents>, VertexListWithOptions /* derived */)>>,

    /// A negative cache. Vertexes that are looked up remotely, and the remote
    /// confirmed the vertexes are outside the master group.
    missing_vertexes_confirmed_by_remote: Arc<RwLock<HashSet<Vertex>>>,

    /// Identity used in tracing logs to follow this instance's lifetime.
    lifecycle_id: LifecycleId,
    /// Internal stats (for testing and debugging).
    pub(crate) internal_stats: DagInternalStats,
}
162
/// Statistics of dag internals. Useful to check if fast paths are used.
#[derive(Debug, Default)]
pub struct DagInternalStats {
    /// Bumps when sort(set) takes the O(set) slow path instead of a fast
    /// path. Atomic so it can be bumped through a shared reference.
    pub sort_slow_path_count: AtomicUsize,
}
169
170impl<D, M, P, S> AbstractDag<D, M, P, S>
171where
172    D: Send + Sync,
173    M: Send + Sync,
174    P: Send + Sync,
175    S: Send + Sync,
176{
177    /// Extract inner states. Useful for advanced use-cases.
178    pub fn into_idmap_dag(self) -> (M, D) {
179        (self.map, self.dag)
180    }
181
182    /// Extract inner states. Useful for advanced use-cases.
183    pub fn into_idmap_dag_path_state(self) -> (M, D, P, S) {
184        (self.map, self.dag, self.path, self.state)
185    }
186}
187
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: Send + Sync + IdMapWrite + IdMapAssignHead + TryClone + 'static,
    P: Send + Sync + TryClone + 'static,
    S: Send + Sync + TryClone + 'static,
{
    /// Set the content of the VIRTUAL group that survives reloading.
    ///
    /// `items` is a list of vertexes and parents. The vertexes MUST be unique
    /// and not already exist in non-VIRTUAL groups. This assumption is used
    /// as an optimization to avoid remote lookups. Passing `None` disables
    /// the managed VIRTUAL group.
    ///
    /// Existing content of the VIRTUAL group will be cleared before inserting
    /// `items`. So this API feels declarative. As a comparison, `add_heads`
    /// is imperative.
    ///
    /// This function calls `maybe_recreate_virtual_group` immediately to clear
    /// and update contents in the VIRTUAL group. `maybe_recreate_virtual_group`
    /// will be called automatically after graph changing operations:
    /// `add_heads_and_flush`, `strip`, `flush`, `import_pull_data`.
    pub async fn set_managed_virtual_group(
        &mut self,
        items: Option<Vec<(Vertex, Vec<Vertex>)>>,
    ) -> Result<()> {
        tracing::debug!(target: "dag::set_managed_virtual_group", lifecycle_id=?self.lifecycle_id, ?items);
        self.managed_virtual_group = items.map(|items| {
            // Calculate `Parents` and `VertexListWithOptions` so they can be
            // used in `maybe_recreate_virtual_group`.
            let opts = VertexOptions {
                reserve_size: 0,
                desired_group: Group::VIRTUAL,
            };
            let heads: VertexListWithOptions = items
                .iter()
                .map(|(v, _p)| (v.clone(), opts.clone()))
                .collect::<Vec<_>>()
                .into();
            let parents: HashMap<Vertex, Vec<Vertex>> = items.into_iter().collect();
            let parents: Box<dyn Parents> = Box::new(parents);
            Arc::new((parents, heads))
        });
        self.maybe_recreate_virtual_group().await
    }

    /// Clear vertexes in the VIRTUAL group: strip them from the IdDag and
    /// remove the corresponding ranges from the IdMap. No-op if the VIRTUAL
    /// group is already empty.
    pub(crate) async fn clear_virtual_group(&mut self) -> Result<()> {
        let id_set = self.dag.all_ids_in_groups(&[Group::VIRTUAL])?;
        if !id_set.is_empty() {
            let removed = self.dag.strip(id_set)?;
            for span in removed.iter_span_desc() {
                self.map.remove_range(span.low, span.high).await?;
            }
        }
        Ok(())
    }

    /// If `managed_virtual_group` is set, clear the VIRTUAL group and re-insert
    /// based on `managed_virtual_group`.
    async fn maybe_recreate_virtual_group(&mut self) -> Result<()> {
        if let Some(maintained_virtual_group) = self.managed_virtual_group.as_ref() {
            // Clone the Arc so `self` can be borrowed mutably below.
            let maintained_virtual_group = maintained_virtual_group.clone();
            self.clear_virtual_group().await?;
            let parents = &maintained_virtual_group.0;
            let head_opts = &maintained_virtual_group.1;
            // With the assumption (see set_managed_virtual_group docstring) that VIRTUAL group
            // only has unique vertexes, we can pre-populate the negative cache to avoid remote
            // lookup.
            {
                let mut cache = self.missing_vertexes_confirmed_by_remote.write().unwrap();
                for v in head_opts.vertexes() {
                    cache.insert(v);
                }
            }
            // Insert to the VIRTUAL group, using a precalculated insertion order.
            self.add_heads(parents.as_ref(), head_opts).await?;
        }
        Ok(())
    }
}
269
#[async_trait::async_trait]
impl<IS, M, P, S> DagPersistent for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore + Persist + StorageVersion,
    IdDag<IS>: TryClone + 'static,
    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
    P: Open<OpenTarget = Self> + Send + Sync + 'static,
    S: TryClone + StorageVersion + Persist + Send + Sync + 'static,
{
    // See docstring in ops.rs for details.
    async fn add_heads_and_flush(
        &mut self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
    ) -> Result<()> {
        // Pending (unflushed) heads would be silently dropped by the reload
        // below; reject them up-front.
        if !self.pending_heads.is_empty() {
            return programming(format!(
                "ProgrammingError: add_heads_and_flush called with pending heads ({:?})",
                &self.pending_heads.vertexes(),
            ));
        }
        tracing::debug!(target: "dag::add_heads_and_flush", lifecycle_id=?self.lifecycle_id, ?heads);

        // Clear the VIRTUAL group. Their parents might have changed in incompatible ways.
        self.clear_virtual_group().await?;

        // Take lock.
        //
        // Reload meta and logs. This drops in-memory changes, which is fine because we have
        // checked there are no in-memory changes at the beginning.
        //
        // Also see comments in `DagState::lock()`.
        let old_version = self.state.storage_version();
        let lock = self.state.lock()?;
        let map_lock = self.map.lock()?;
        let dag_lock = self.dag.lock()?;
        self.state.reload(&lock)?;
        let new_version = self.state.storage_version();
        // Another process changed the on-disk storage since we opened it;
        // cached snapshots and remote-lookup caches may be stale.
        if old_version != new_version {
            self.invalidate_snapshot();
            self.invalidate_missing_vertex_cache();
            self.invalidate_overlay_map()?;
        }

        self.map.reload(&map_lock)?;
        self.dag.reload(&dag_lock)?;

        // Build.
        self.build_with_lock(parents, heads, &map_lock).await?;

        // Write to disk.
        self.map.persist(&map_lock)?;
        self.dag.persist(&dag_lock)?;
        self.state.persist(&lock)?;
        // Release locks in reverse acquisition order.
        drop(dag_lock);
        drop(map_lock);
        drop(lock);

        self.persisted_id_set = self.dag.all_ids_in_groups(&Group::PERSIST)?;
        // Restore the managed VIRTUAL group that was cleared at the beginning.
        self.maybe_recreate_virtual_group().await?;

        debug_assert_eq!(self.dirty().await?.count().await?, 0);

        Ok(())
    }

    /// Write in-memory DAG to disk. This will also pick up changes to
    /// the DAG by other processes.
    ///
    /// This function re-assigns ids for vertexes. That requires the
    /// pending ids and vertexes to be non-lazy. If you're changing
    /// internal structures (ex. dag and map) directly, or introducing
    /// lazy vertexes, then avoid this function. Instead, lock and
    /// flush directly (see `add_heads_and_flush`, `import_clone_data`).
    ///
    /// `heads` specify additional options for special vertexes. This
    /// overrides the `VertexOptions` provided to `add_head`. If `heads`
    /// is empty, then `VertexOptions` provided to `add_head` will be
    /// used.
    async fn flush(&mut self, heads: &VertexListWithOptions) -> Result<()> {
        tracing::debug!(target: "dag::flush", lifecycle_id=?self.lifecycle_id, ?heads);
        // Sanity check: all requested heads must resolve to ids.
        for result in self.vertex_id_batch(&heads.vertexes()).await? {
            result?;
        }
        // Previous version of the API requires `master_heads: &[Vertex]`.
        // Warn about possible misuses.
        if heads.vertexes_by_group(Group::MASTER).len() != heads.len() {
            return programming(format!(
                "Dag::flush({:?}) is probably misused (group is not master)",
                heads
            ));
        }

        // Write cached IdMap to disk.
        self.flush_cached_idmap().await?;

        // Constructs a new graph so we can copy pending data from the existing graph.
        let mut new_name_dag: Self = self.path.open()?;

        let parents: &(dyn DagAlgorithm + Send + Sync) = self;
        let non_master_heads: VertexListWithOptions = self.pending_heads.clone();
        new_name_dag.inherit_configurations_from(self);
        // Explicit `heads` options take precedence over pending head options.
        let heads = heads.clone().chain(non_master_heads);
        new_name_dag.add_heads_and_flush(&parents, &heads).await?;
        new_name_dag.maybe_recreate_virtual_group().await?;

        // Replace `self` with the freshly flushed graph.
        *self = new_name_dag;
        Ok(())
    }

    /// Write in-memory IdMap paths to disk so the next time we don't need to
    /// ask remote service for IdMap translation.
    #[tracing::instrument(skip(self))]
    async fn flush_cached_idmap(&self) -> Result<()> {
        // The map might have changed on disk. We cannot use the ids in overlay_map
        // directly. Instead, re-translate the paths.

        // Prepare data to insert. Do not hold Mutex across async yield points.
        let mut to_insert: Vec<(AncestorPath, Vec<Vertex>)> = Vec::new();
        std::mem::swap(&mut to_insert, &mut *self.overlay_map_paths.lock().unwrap());
        if to_insert.is_empty() {
            return Ok(());
        }

        // Lock, reload from disk. Use a new state so the existing dag is not affected.
        tracing::debug!(target: "dag::cache", "flushing cached idmap ({} items)", to_insert.len());
        let mut new: Self = self.path.open()?;
        let lock = new.state.lock()?;
        let map_lock = new.map.lock()?;
        let dag_lock = new.dag.lock()?;
        new.state.reload(&lock)?;
        new.map.reload(&map_lock)?;
        new.dag.reload(&dag_lock)?;
        new.inherit_configurations_from(self);
        // Hand the paths to the new instance; it resolves and persists them.
        std::mem::swap(&mut to_insert, &mut *new.overlay_map_paths.lock().unwrap());
        new.flush_cached_idmap_with_lock(&map_lock).await?;

        new.state.persist(&lock)?;

        Ok(())
    }
}
413
414impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
415where
416    IS: IdDagStore,
417    IdDag<IS>: TryClone + 'static,
418    M: TryClone + IdConvert + IdMapWrite + Persist + Send + Sync + 'static,
419    P: Send + Sync + 'static,
420    S: TryClone + Send + Sync + 'static,
421{
422    /// Implementation detail. Must be protected by a lock.
423    async fn flush_cached_idmap_with_lock(&mut self, map_lock: &M::Lock) -> Result<()> {
424        let mut to_insert: Vec<(AncestorPath, Vec<Vertex>)> = Vec::new();
425        std::mem::swap(&mut to_insert, &mut *self.overlay_map_paths.lock().unwrap());
426        if to_insert.is_empty() {
427            return Ok(());
428        }
429
430        let id_names = calculate_id_name_from_paths(
431            &self.map,
432            &*self.dag,
433            &self.overlay_map_id_set,
434            &to_insert,
435        )
436        .await?;
437
438        // For testing purpose, skip inserting certain vertexes.
439        let mut skip_vertexes: Option<HashSet<Vertex>> = None;
440        if crate::is_testing() {
441            if let Ok(s) = var("DAG_SKIP_FLUSH_VERTEXES") {
442                skip_vertexes = Some(
443                    s.split(',')
444                        .filter_map(|s| Vertex::from_hex(s.as_bytes()).ok())
445                        .collect(),
446                )
447            }
448        }
449
450        for (id, name) in id_names {
451            if let Some(skip) = &skip_vertexes {
452                if skip.contains(&name) {
453                    tracing::info!(
454                        target: "dag::cache",
455                        "skip flushing {:?}-{} to IdMap set by DAG_SKIP_FLUSH_VERTEXES",
456                        &name,
457                        id
458                    );
459                    continue;
460                }
461            }
462            tracing::debug!(target: "dag::cache", "insert {:?}-{} to IdMap", &name, id);
463            self.map.insert(id, name.as_ref()).await?;
464        }
465
466        self.map.persist(map_lock)?;
467        Ok(())
468    }
469}
470
471impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
472where
473    IS: Send + Sync + 'static,
474    IdDag<IS>: StorageVersion,
475    M: Send + Sync + 'static,
476    P: Send + Sync + 'static,
477    S: Send + Sync + 'static,
478{
479    /// Attempt to reuse caches from `other` if two `Dag`s are compatible.
480    /// Usually called when `self` is newly created.
481    fn maybe_reuse_caches_from(&mut self, other: &Self) {
482        // No need to check IdMap (or "state" which includes both IdDag and IdMap).
483        // If IdMap is changed (ex. by flush_cached_idmap), the cache states
484        // (missing_vertexes_confirmed_by_remote, overlay_map) are still reusable.
485        let dag_version_mismatch = self.dag.storage_version() != other.dag.storage_version();
486        let persisted_id_mismatch =
487            self.persisted_id_set.as_spans() != other.persisted_id_set.as_spans();
488        if dag_version_mismatch || persisted_id_mismatch {
489            tracing::debug!(target: "dag::cache", "cannot reuse cache");
490            return;
491        }
492        tracing::debug!(
493            target: "dag::cache", "reusing cache ({} missing)",
494            other.missing_vertexes_confirmed_by_remote.read().unwrap().len(),
495        );
496        self.missing_vertexes_confirmed_by_remote =
497            other.missing_vertexes_confirmed_by_remote.clone();
498        self.overlay_map = other.overlay_map.clone();
499        self.overlay_map_paths = other.overlay_map_paths.clone();
500    }
501
502    /// Set the remote protocol for converting between Id and Vertex remotely.
503    ///
504    /// This is usually used on "sparse" ("lazy") Dag where the IdMap is incomplete
505    /// for vertexes in the master groups.
506    pub fn set_remote_protocol(&mut self, protocol: Arc<dyn RemoteIdConvertProtocol>) {
507        self.remote_protocol = protocol;
508    }
509
510    /// Inherit configurations like `managed_virtual_group` from `original`.
511    fn inherit_configurations_from(&mut self, original: &Self) {
512        let seg_size = original.dag.get_new_segment_size();
513        self.dag.set_new_segment_size(seg_size);
514        self.set_remote_protocol(original.remote_protocol.clone());
515        self.managed_virtual_group = original.managed_virtual_group.clone();
516        self.maybe_reuse_caches_from(original)
517    }
518}
519
#[async_trait::async_trait]
impl<IS, M, P, S> DagAddHeads for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: TryClone + IdMapAssignHead + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Send + Sync + 'static,
{
    // See docstring in ops.rs for details.
    async fn add_heads(
        &mut self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
    ) -> Result<bool> {
        tracing::debug!(target: "dag::add_heads", lifecycle_id=?self.lifecycle_id, ?heads);
        // Populate vertex negative cache to reduce round-trips doing remote lookups.
        // Attention: this might have side effect recreating the snapshots!
        // Skip this optimization for virtual group add_heads since the virtual set
        // is usually small and related tracing logs can be noisy.
        if heads.min_desired_group().unwrap_or(Group::VIRTUAL) < Group::VIRTUAL {
            self.populate_missing_vertexes_for_add_heads(parents, &heads.vertexes())
                .await?;
        }

        // When heads are not VIRTUAL, invalidate_snapshot() helps performance, is optional for
        // correctness. invalidate_snapshot() decreases VerLink ref count so VerLink::bump() can
        // use a fast path mutating in place. When heads are VIRTUAL, invalidate_snapshot() is
        // necessary for correctness, since the versions (VerLinks) won't be bumped to avoid
        // excessive cache invalidation.
        self.invalidate_snapshot();

        // This API takes `heads` with "desired_group"s. When a head already exists in a lower
        // group than its "desired_group" we need to remove the higher-group id and re-assign
        // the head and its ancestors to the lower group.
        //
        // For simplicity, add_heads is *append-only* and does not want to deal with the
        // reassignment. So if you have code like:
        //
        //    let set1 = dag.range(x, y); // set1 is associated with the current dag, "dag v1".
        //    dag.add_heads(...);
        //    let set2 = dag.range(p, q); // set2 is associated with the updated dag, "dag v2".
        //    let set3 = set2 & set1;
        //
        // The `set3` understands that the "dag v2" is a superset of "dag v1" (because add_heads
        // does not strip ids), and can use fast paths - it can assume same ids in set2 and set3
        // mean the same vertexes and ensure set3 is associated with "dag v2". If `add_heads`
        // strips out commits, then the fast paths (note: not just for set3, also p and q) cannot
        // be used.
        //
        // Practically, `heads` match one of these patterns:
        // - (This use-case is going away): desired_group = MASTER for all heads. This is used by
        //   old Mononoke server-side logic. The server only indexes the "main" branch. All vertexes
        //   are in the MASTER group. To avoid misuse by the client-side, we check that there
        //   is nothing outside the MASTER group.
        // - desired_group = NON_MASTER for all heads. This is used by Sapling client.
        //   It might use desired_group = MASTER on add_heads_and_flush, but not here.

        // Performance-wise, add_heads + flush is slower than
        // add_heads_and_flush.
        //
        // Practically, the callsite might want to use add_heads + flush
        // instead of add_heads_and_flush, if:
        // - The callsites cannot figure out "master_heads" at the same time
        //   it does the graph change. For example, hg might know commits
        //   before bookmark movements.
        // - The callsite is trying some temporary graph changes, and does
        //   not want to pollute the on-disk DAG. For example, calculating
        //   a preview of a rebase.
        // Update IdMap. Keep track of what heads are added.
        let mut outcome = PreparedFlatSegments::default();
        let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
        // Honor `reserve_size` of previously-added heads so their reserved
        // id ranges are not reused by this call.
        let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
        for (head, opts) in heads.vertex_options() {
            let need_assigning = match self.vertex_id_optional(&head).await? {
                Some(id) => {
                    if id.group() > opts.desired_group {
                        return programming(format!(
                            "add_heads: cannot re-assign {:?}:{:?} from {} to {} (desired), use add_heads_and_flush instead",
                            head,
                            id,
                            id.group(),
                            opts.desired_group
                        ));
                    } else {
                        // In some cases (ex. old Mononoke use-case), the id exists in IdMap but
                        // not IdDag. Still need to assign the id to IdDag.
                        !self.dag.contains_id(id)?
                    }
                }
                None => true,
            };
            if need_assigning {
                let group = opts.desired_group;
                // If any ancestors have incompatible group (ex. desired = MASTER, ancestor has
                // NON_MASTER), then `assign_head` below will report an error.
                let prepared_segments = self
                    .assign_head(head.clone(), parents, group, &mut covered, &reserved)
                    .await?;
                outcome.merge(prepared_segments);
                if opts.reserve_size > 0 {
                    let low = self.map.vertex_id(head.clone()).await? + 1;
                    update_reserved(&mut reserved, &covered, low, opts.reserve_size);
                }
                // VIRTUAL heads are not persisted; do not track them as pending.
                if group != Group::VIRTUAL {
                    self.pending_heads.push((head, opts));
                }
            }
        }

        // Update high level segments from the flat segments just inserted.
        self.dag
            .build_segments_from_prepared_flat_segments(&outcome)?;

        // True if any segment was actually inserted.
        Ok(outcome.segment_count() > 0)
    }
}
637
638#[async_trait::async_trait]
639impl<IS, M, P, S> DagStrip for AbstractDag<IdDag<IS>, M, P, S>
640where
641    IS: IdDagStore + Persist,
642    IdDag<IS>: TryClone + StorageVersion,
643    M: TryClone + Persist + IdMapWrite + IdConvert + Send + Sync + 'static,
644    P: TryClone + Open<OpenTarget = Self> + Send + Sync + 'static,
645    S: TryClone + Persist + Send + Sync + 'static,
646{
647    async fn strip(&mut self, set: &Set) -> Result<()> {
648        if !self.pending_heads.is_empty() {
649            return programming(format!(
650                "strip does not support pending heads ({:?})",
651                &self.pending_heads.vertexes(),
652            ));
653        }
654        tracing::debug!(target: "dag::strip", lifecycle_id=?self.lifecycle_id, ?set);
655
656        // Do strip with a lock to avoid cases where descendants are added to
657        // the stripped segments.
658        let mut new: Self = self.path.open()?;
659        let (lock, map_lock, dag_lock) = new.reload()?;
660        new.inherit_configurations_from(self);
661
662        new.strip_with_lock(set, &map_lock).await?;
663        new.persist(lock, map_lock, dag_lock)?;
664        new.maybe_recreate_virtual_group().await?;
665
666        *self = new;
667        Ok(())
668    }
669}
670
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: TryClone + Persist + IdMapWrite + IdConvert + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Send + Sync + 'static,
{
    /// Internal implementation of "strip". Removes `set` and its descendants
    /// from both the IdDag and the IdMap. Must be protected by `map_lock`.
    async fn strip_with_lock(&mut self, set: &Set, map_lock: &M::Lock) -> Result<()> {
        // Pending (unflushed) heads are not supported here; stripping could
        // invalidate their ids.
        if !self.pending_heads.is_empty() {
            return programming(format!(
                "strip does not support pending heads ({:?})",
                &self.pending_heads.vertexes(),
            ));
        }

        let id_set = self.to_id_set(set).await?;

        // Heads in the master group must be known. Strip might "create" heads that are not
        // currently known. Resolve them to ensure graph integrity.
        let head_ids: Vec<Id> = {
            // strip will include descendants.
            let to_strip = self.dag.descendants(id_set.clone())?;
            // only vertexes in the master group can be lazy.
            let master_group = self.dag.master_group()?;
            let master_group_after_strip = master_group.difference(&to_strip);
            let heads_before_strip = self.dag.heads_ancestors(master_group)?;
            let heads_after_strip = self.dag.heads_ancestors(master_group_after_strip)?;
            let new_heads = heads_after_strip.difference(&heads_before_strip);
            new_heads.iter_desc().collect()
        };
        let heads_after_strip = self.vertex_name_batch(&head_ids).await?;
        tracing::debug!(target: "dag::strip", "heads after strip: {:?}", &heads_after_strip);
        // Write IdMap cache first, they will become problematic to write
        // after "remove" because the `VerLink`s might become incompatible.
        self.flush_cached_idmap_with_lock(map_lock).await?;

        let removed_id_set = self.dag.strip(id_set)?;
        tracing::debug!(target: "dag::strip", "removed id set: {:?}", &removed_id_set);

        // Remove the stripped id ranges from the IdMap, collecting the names.
        let mut removed_vertexes = Vec::new();
        for span in removed_id_set.iter_span_desc() {
            let vertexes = self.map.remove_range(span.low, span.high).await?;
            removed_vertexes.extend(vertexes);
        }
        tracing::debug!(target: "dag::strip", "removed vertexes: {:?}", &removed_vertexes);

        // Add removed names to missing cache.
        self.missing_vertexes_confirmed_by_remote
            .write()
            .unwrap()
            .extend(removed_vertexes);

        // Snapshot cannot be reused.
        self.invalidate_snapshot();

        Ok(())
    }
}
731
732#[async_trait::async_trait]
733impl<IS, M, P, S> IdMapWrite for AbstractDag<IdDag<IS>, M, P, S>
734where
735    IS: IdDagStore,
736    IdDag<IS>: TryClone,
737    M: TryClone + IdMapAssignHead + Send + Sync,
738    P: TryClone + Send + Sync,
739    S: TryClone + Send + Sync,
740{
741    async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
742        self.map.insert(id, name).await
743    }
744
745    async fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<Vertex>> {
746        self.map.remove_range(low, high).await
747    }
748}
749
#[async_trait::async_trait]
impl<IS, M, P, S> DagImportCloneData for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore + Persist + 'static,
    IdDag<IS>: TryClone,
    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Persist + Send + Sync + 'static,
{
    /// Import IdMap entries and flat segments from clone data.
    /// Only valid on an empty graph, since the clone data uses absolute Ids.
    async fn import_clone_data(&mut self, clone_data: CloneData<Vertex>) -> Result<()> {
        // Write directly to disk. Bypassing "flush()" that re-assigns Ids
        // using parent functions.
        let (lock, map_lock, dag_lock) = self.reload()?;

        // Check emptiness after reload so another process's data is visible.
        if !self.dag.all()?.is_empty() {
            return programming("Cannot import clone data for non-empty graph");
        }
        for (id, name) in clone_data.idmap {
            tracing::debug!(target: "dag::clone", "insert IdMap: {:?}-{:?}", &name, id);
            self.map.insert(id, name.as_ref()).await?;
        }
        self.dag
            .build_segments_from_prepared_flat_segments(&clone_data.flat_segments)?;

        // Ensure universally known vertexes resolve before persisting.
        self.verify_missing().await?;

        self.persist(lock, map_lock, dag_lock)
    }
}
779
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore + Persist + 'static,
    IdDag<IS>: TryClone,
    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Persist + Send + Sync + 'static,
{
    /// Verify that universally known vertexes and heads are present in IdMap.
    ///
    /// Returns a `programming` error listing the missing Ids. Used after
    /// importing clone/pull data to catch server-side omissions early.
    async fn verify_missing(&self) -> Result<()> {
        let missing: Vec<Id> = self.check_universal_ids().await?;
        if !missing.is_empty() {
            let msg = format!(
                concat!(
                    "Clone data does not contain vertex for {:?}. ",
                    "This is most likely a server-side bug."
                ),
                missing,
            );
            return programming(msg);
        }

        Ok(())
    }

    /// Acquire exclusive locks on state, map and dag (in that order) and
    /// reload each from disk so the in-memory view matches on-disk state.
    ///
    /// The returned locks must later be handed to `persist` to write
    /// changes back while the locks are still held.
    fn reload(&mut self) -> Result<(S::Lock, M::Lock, IS::Lock)> {
        let lock = self.state.lock()?;
        let map_lock = self.map.lock()?;
        let dag_lock = self.dag.lock()?;
        self.state.reload(&lock)?;
        self.map.reload(&map_lock)?;
        self.dag.reload(&dag_lock)?;

        Ok((lock, map_lock, dag_lock))
    }

    /// Write pending changes to disk using the locks from `reload`, then
    /// refresh caches that depend on persisted content.
    ///
    /// Persists map, then dag, then state. NOTE(review): this order appears
    /// deliberate (names become durable before the segments that reference
    /// them) — confirm before reordering.
    fn persist(&mut self, lock: S::Lock, map_lock: M::Lock, dag_lock: IS::Lock) -> Result<()> {
        self.map.persist(&map_lock)?;
        self.dag.persist(&dag_lock)?;
        self.state.persist(&lock)?;

        // Persisted content changed; the overlay map cache and the
        // persisted Id set snapshot are stale now.
        self.invalidate_overlay_map()?;
        self.persisted_id_set = self.dag.all_ids_in_groups(&Group::PERSIST)?;

        Ok(())
    }
}
827
828#[async_trait::async_trait]
829impl<IS, M, P, S> DagImportPullData for AbstractDag<IdDag<IS>, M, P, S>
830where
831    IS: IdDagStore + Persist,
832    IdDag<IS>: TryClone + StorageVersion,
833    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
834    P: Open<OpenTarget = Self> + TryClone + Send + Sync + 'static,
835    S: TryClone + Persist + Send + Sync + 'static,
836{
837    // See docstring in ops.py for details.
838    async fn import_pull_data(
839        &mut self,
840        clone_data: CloneData<Vertex>,
841        heads: &VertexListWithOptions,
842    ) -> Result<()> {
843        if !self.pending_heads.is_empty() {
844            return programming(format!(
845                "import_pull_data called with pending heads ({:?})",
846                &self.pending_heads.vertexes(),
847            ));
848        }
849        if let Some(group) = heads.max_desired_group() {
850            if group != Group::MASTER {
851                return programming(concat!(
852                    "import_pull_data should only take MASTER group heads. ",
853                    "Only MASTER group can contain lazy vertexes like what pull_data uses."
854                ));
855            }
856        }
857
858        for id in clone_data.flat_segments.parents_head_and_roots() {
859            if !clone_data.idmap.contains_key(&id) {
860                return programming(format!(
861                    "server does not provide name for id {:?} in pull data",
862                    id
863                ));
864            }
865        }
866
867        // Constructs a new graph so we don't expose a broken `self` state on error.
868        let mut new: Self = self.path.open()?;
869        let (lock, map_lock, dag_lock) = new.reload()?;
870        new.inherit_configurations_from(self);
871
872        // Parents that should exist in the local graph. Look them up in 1 round-trip
873        // and insert to the local graph.
874        // Also check that roots of the new segments do not overlap with the local graph.
875        // For example,
876        //
877        //      D          When the client has B (and A, C), and is pulling D,
878        //     /|\         the server provides D, E, F, with parents B and C,
879        //    F B E        and roots F and E.
880        //      |\|        The client must have B and C, and must not have F
881        //      A C        or E.
882        {
883            let mut root_ids: Vec<Id> = Vec::new();
884            let mut parent_ids: Vec<Id> = Vec::new();
885            let segments = &clone_data.flat_segments.segments;
886            let id_set = IdSet::from_spans(segments.iter().map(|s| s.low..=s.high));
887            for seg in segments {
888                let pids: Vec<Id> = seg.parents.to_vec();
889                // Parents that are not part of the pull vertexes should exist
890                // in the local graph.
891                let connected_pids: Vec<Id> = pids
892                    .iter()
893                    .copied()
894                    .filter(|&p| !id_set.contains(p))
895                    .collect();
896                if connected_pids.len() == pids.len() {
897                    // The "low" of the segment is a root (of vertexes to insert).
898                    // It needs an overlap check.
899                    root_ids.push(seg.low);
900                }
901                parent_ids.extend(connected_pids);
902            }
903
904            let to_names = |ids: &[Id], hint: &str| -> Result<Vec<Vertex>> {
905                let names = ids.iter().map(|i| match clone_data.idmap.get(i) {
906                    Some(v) => Ok(v.clone()),
907                    None => {
908                        programming(format!("server does not provide name for {} {:?}", hint, i))
909                    }
910                });
911                names.collect()
912            };
913
914            let parent_names = to_names(&parent_ids, "parent")?;
915            let root_names = to_names(&root_ids, "root")?;
916            tracing::trace!(
917                "pull: connected parents: {:?}, roots: {:?}",
918                &parent_names,
919                &root_names
920            );
921
922            // Pre-lookup in one round-trip.
923            let mut names = parent_names
924                .iter()
925                .chain(root_names.iter())
926                .cloned()
927                .collect::<Vec<_>>();
928            names.sort_unstable();
929            names.dedup();
930            let resolved = new.vertex_id_batch(&names).await?;
931            assert_eq!(resolved.len(), names.len());
932            for (id, name) in resolved.into_iter().zip(names) {
933                if let Ok(id) = id {
934                    if !new.map.contains_vertex_name(&name).await? {
935                        tracing::debug!(target: "dag::pull", "insert IdMap: {:?}-{:?}", &name, id);
936                        new.map.insert(id, name.as_ref()).await?;
937                    }
938                }
939            }
940
941            for name in root_names {
942                if new.contains_vertex_name(&name).await? {
943                    let e = NeedSlowPath(format!("{:?} exists in local graph", name));
944                    return Err(e);
945                }
946            }
947
948            let client_parents = new.vertex_id_batch(&parent_names).await?;
949            client_parents.into_iter().collect::<Result<Vec<Id>>>()?;
950        }
951
952        // Prepare indexes and states used below.
953        /// Query server segments with some indexes.
954        struct ServerState<'a> {
955            seg_by_high: BTreeMap<Id, FlatSegment>,
956            idmap_by_name: BTreeMap<&'a Vertex, Id>,
957            idmap_by_id: &'a BTreeMap<Id, Vertex>,
958        }
959        let mut server = ServerState {
960            seg_by_high: clone_data
961                .flat_segments
962                .segments
963                .iter()
964                .map(|s| (s.high, s.clone()))
965                .collect(),
966            idmap_by_name: clone_data
967                .idmap
968                .iter()
969                .map(|(&id, name)| (name, id))
970                .collect(),
971            idmap_by_id: &clone_data.idmap,
972        };
973
974        impl<'a> ServerState<'a> {
975            /// Find the segment that contains the (server-side) Id.
976            fn seg_containing_id(&self, server_id: Id) -> Result<&FlatSegment> {
977                let seg = match self.seg_by_high.range(server_id..).next() {
978                    Some((_high, seg)) => {
979                        if seg.low <= server_id && seg.high >= server_id {
980                            Some(seg)
981                        } else {
982                            None
983                        }
984                    }
985                    None => None,
986                };
987                seg.ok_or_else(|| {
988                    DagError::Programming(format!(
989                        "server does not provide segment covering id {}",
990                        server_id
991                    ))
992                })
993            }
994
995            /// Split a server segment from `[ low --  middle -- high ]` to
996            /// `[ low -- middle ] [ middle + 1 -- high ]`.
997            fn split_seg(&mut self, high: Id, middle: Id) {
998                // This is useful when "middle" is a "parent" of another segment, like:
999                //    seg 1 (server): 100 -- 115 -- 120
1000                //    seg 2 (server):                    121 -- 130, parents: [115]
1001                // While the task is to ensure seg 2's head H is present, we can split seg 1:
1002                //    seg 1a (server): 110 -- 115
1003                //    seg 1b (server):            116 -- 120, parents: [115]
1004                //    seg 2  (server):                       121 -- 130 (H), parents: [115]
1005                // Then remap and insert seg 1a and seg 2 first to complete the "H" goal:
1006                //    seg 1a (client): 10 -- 15
1007                //    seg 2  (client):          16 -- 20 (H), parents: [15]
1008                // The 10 ... 20 range is now continuous and friendly to merge to high-level
1009                // segments. The rest of seg 1, seg 1b (server) can be picked up later when
1010                // visiting from other heads.
1011                let seg = self
1012                    .seg_by_high
1013                    .remove(&high)
1014                    .expect("bug: invalid high passed to split_seg");
1015                assert!(seg.low <= middle);
1016                assert!(seg.high > middle);
1017                assert!(self.idmap_by_id.contains_key(&middle));
1018                let seg1 = FlatSegment {
1019                    low: seg.low,
1020                    high: middle,
1021                    parents: seg.parents,
1022                };
1023                let seg2 = FlatSegment {
1024                    low: middle + 1,
1025                    high: seg.high,
1026                    parents: vec![middle],
1027                };
1028                self.seg_by_high.insert(seg1.high, seg1);
1029                self.seg_by_high.insert(seg2.high, seg2);
1030            }
1031
1032            fn name_by_id(&self, id: Id) -> Vertex {
1033                self.idmap_by_id
1034                    .get(&id)
1035                    .expect("IdMap should contain the `id`. It should be checked before.")
1036                    .clone()
1037            }
1038
1039            fn id_by_name(&self, name: &Vertex) -> Option<Id> {
1040                self.idmap_by_name.get(name).copied()
1041            }
1042        }
1043
1044        // `taken` is the union of `covered` and `reserved`, mainly used by `find_free_span`.
1045        let mut taken = {
1046            // Normally we would want `calculate_initial_reserved` here. But we calculate head
1047            // reservation for all `heads` in order, instead of just considering heads in the
1048            // `clone_data`. So we're fine without the "initial reserved". In other words, the
1049            // `calculate_initial_reserved` logic is "inlined" into the `for ... in heads`
1050            // loop below.
1051            new.dag().all_ids_in_groups(&[Group::MASTER])?
1052        };
1053
1054        // Output. Remapped segments to insert.
1055        let mut prepared_client_segments = PreparedFlatSegments::default();
1056
1057        // Insert segments by visiting the heads following the `VertexOptions` order.
1058        //
1059        // If a segment is not ready to be inserted (ex. its parents are still missing),
1060        // their parents will be visited recursively. This has the nice effects
1061        // comparing to `import_clone_data` which blindly takes the input as-is:
1062        // - De-fragment `clone_data`: gaps or sub-optional segment order won't hurt.
1063        // - Respect the local `VertexOptions`: respect the order and reserve_size
1064        //   set locally if possible.
1065        // - Ignore "bogus" unrelated sub-graph: if the `clone_data` contains more
1066        //   segments then needed, they will be simply ignored.
1067        //
1068        // The implementation (of using a stack) is similar to `IdMap::assign_head`,
1069        // but insert a segment at a time, not a vertex at a time.
1070        //
1071        // Only the MASTER group supports laziness. So we only care about it.
1072        for (head, opts) in heads.vertex_options() {
1073            let mut stack: Vec<Id> = vec![];
1074            if let Some(head_server_id) = server.id_by_name(&head) {
1075                let _head_server_seg = server.seg_containing_id(head_server_id)?;
1076                stack.push(head_server_id);
1077            }
1078
1079            while let Some(server_high) = stack.pop() {
1080                let mut server_seg = server.seg_containing_id(server_high)?;
1081                if server_high < server_seg.high {
1082                    // Split the segment for more efficient high level segments.
1083                    let seg_high = server_seg.high;
1084                    server.split_seg(seg_high, server_high);
1085                    server_seg = server.seg_containing_id(server_high)?;
1086                    assert_eq!(server_high, server_seg.high);
1087                }
1088                let high_vertex = server.name_by_id(server_high);
1089                let client_high_id = new
1090                    .map
1091                    .vertex_id_with_max_group(&high_vertex, Group::MAX)
1092                    .await?;
1093                match client_high_id {
1094                    Some(id) if id.group() == Group::MASTER => {
1095                        // `server_seg` is present in MASTER group (previously inserted
1096                        // by this loop). No need to insert or visit parents.
1097                        continue;
1098                    }
1099                    Some(id) => {
1100                        // `id` in non-MASTER group. This should not really happen because we have
1101                        // checked all "roots" are missing in the local graph. See `NeedSlowPath`
1102                        // above.
1103                        let e = NeedSlowPath(format!(
1104                            "{:?} exists in local graph as {:?} - fast path requires MASTER group",
1105                            &high_vertex, id
1106                        ));
1107                        return Err(e);
1108                    }
1109                    None => {}
1110                }
1111
1112                let parent_server_ids = &server_seg.parents;
1113                let parent_names: Vec<Vertex> = {
1114                    let iter = parent_server_ids.iter().map(|id| server.name_by_id(*id));
1115                    iter.collect()
1116                };
1117
1118                // The client parent ids in the MASTER group.
1119                let mut parent_client_ids = Vec::new();
1120                let mut missng_parent_server_ids = Vec::new();
1121
1122                // Calculate `parent_client_ids`, and `missng_parent_server_ids`.
1123                // Intentionally using `new.map` not `new` to bypass remote lookups.
1124                {
1125                    let client_id_res = new.map.vertex_id_batch(&parent_names).await?;
1126                    assert_eq!(client_id_res.len(), parent_server_ids.len());
1127                    for (res, &server_id) in client_id_res.into_iter().zip(parent_server_ids) {
1128                        match res {
1129                            Ok(id) if id.group() != Group::MASTER => {
1130                                return Err(NeedSlowPath(format!(
1131                                    "{:?} exists id in local graph as {:?} - fast path requires MASTER group",
1132                                    &parent_names, id
1133                                )));
1134                            }
1135                            Ok(id) => {
1136                                parent_client_ids.push(id);
1137                            }
1138                            Err(crate::Error::VertexNotFound(_)) => {
1139                                missng_parent_server_ids.push(server_id);
1140                            }
1141                            Err(e) => return Err(e),
1142                        }
1143                    }
1144                }
1145
1146                if !missng_parent_server_ids.is_empty() {
1147                    // Parents are not ready. Needs revisit this segment after inserting parents.
1148                    stack.push(server_high);
1149                    // Insert missing parents.
1150                    // First parent, first insertion.
1151                    for &server_id in missng_parent_server_ids.iter().rev() {
1152                        stack.push(server_id);
1153                    }
1154                    continue;
1155                }
1156
1157                // All parents are present. Time to insert this segment.
1158                // Find a suitable low..=high range.
1159                let candidate_id = match parent_client_ids.iter().max().copied() {
1160                    None => Group::MASTER.min_id(),
1161                    Some(id) => id + 1,
1162                };
1163                let size = server_seg.high.0 - server_seg.low.0 + 1;
1164                let span = find_free_span(&taken, candidate_id, size, false);
1165
1166                // Map the server_seg.low..=server_seg.high to client span.low..=span.high.
1167                // Insert to IdMap.
1168                for (&server_id, name) in server.idmap_by_id.range(server_seg.low..=server_seg.high)
1169                {
1170                    let client_id = server_id + span.low.0 - server_seg.low.0;
1171                    if client_id.group() != Group::MASTER {
1172                        return Err(crate::Error::IdOverflow(Group::MASTER));
1173                    }
1174                    new.map.insert(client_id, name.as_ref()).await?;
1175                }
1176
1177                // Prepare insertion to IdDag.
1178                prepared_client_segments.push_segment(span.low, span.high, &parent_client_ids);
1179
1180                // Mark the range as taken.
1181                taken.push(span);
1182            }
1183
1184            // Consider reservation for `head` by updating `taken`.
1185            if opts.reserve_size > 0 {
1186                let head_client_id = new.map.vertex_id(head).await?;
1187                let span = find_free_span(&taken, head_client_id + 1, opts.reserve_size as _, true);
1188                taken.push(span);
1189            }
1190        }
1191
1192        new.dag
1193            .build_segments_from_prepared_flat_segments(&prepared_client_segments)?;
1194
1195        // Some "missing" vertexes might be imported.
1196        new.invalidate_missing_vertex_cache();
1197
1198        if cfg!(debug_assertions) {
1199            new.verify_missing().await?;
1200        }
1201
1202        new.persist(lock, map_lock, dag_lock)?;
1203
1204        // Update maintained VIRTUAL group.
1205        new.maybe_recreate_virtual_group().await?;
1206
1207        *self = new;
1208        Ok(())
1209    }
1210}
1211
1212#[async_trait::async_trait]
1213impl<IS, M, P, S> DagExportCloneData for AbstractDag<IdDag<IS>, M, P, S>
1214where
1215    IS: IdDagStore,
1216    IdDag<IS>: TryClone,
1217    M: IdConvert + TryClone + Send + Sync + 'static,
1218    P: TryClone + Send + Sync + 'static,
1219    S: TryClone + Send + Sync + 'static,
1220{
1221    async fn export_clone_data(&self) -> Result<CloneData<Vertex>> {
1222        let idmap: BTreeMap<Id, Vertex> = {
1223            let ids: Vec<Id> = self.dag.universal_ids()?.into_iter().collect();
1224            tracing::debug!("export: {} universally known vertexes", ids.len());
1225            let names = {
1226                let fallible_names = self.vertex_name_batch(&ids).await?;
1227                let mut names = Vec::with_capacity(fallible_names.len());
1228                for name in fallible_names {
1229                    names.push(name?);
1230                }
1231                names
1232            };
1233            ids.into_iter().zip(names).collect()
1234        };
1235
1236        let flat_segments: PreparedFlatSegments = {
1237            let segments = self.dag.next_segments(Id::MIN, 0)?;
1238            let mut prepared = Vec::with_capacity(segments.len());
1239            for segment in segments {
1240                let span = segment.span()?;
1241                let parents = segment.parents()?;
1242                prepared.push(FlatSegment {
1243                    low: span.low,
1244                    high: span.high,
1245                    parents,
1246                });
1247            }
1248            PreparedFlatSegments {
1249                segments: prepared.into_iter().collect(),
1250            }
1251        };
1252
1253        let data = CloneData {
1254            flat_segments,
1255            idmap,
1256        };
1257        Ok(data)
1258    }
1259}
1260
1261#[async_trait::async_trait]
1262impl<IS, M, P, S> DagExportPullData for AbstractDag<IdDag<IS>, M, P, S>
1263where
1264    IS: IdDagStore,
1265    IdDag<IS>: TryClone,
1266    M: IdConvert + TryClone + Send + Sync + 'static,
1267    P: TryClone + Send + Sync + 'static,
1268    S: TryClone + Send + Sync + 'static,
1269{
1270    async fn export_pull_data(&self, set: &Set) -> Result<CloneData<Vertex>> {
1271        let id_set = self.to_id_set(set).await?;
1272
1273        let flat_segments = self.dag.idset_to_flat_segments(id_set)?;
1274        let ids: Vec<_> = flat_segments.parents_head_and_roots().into_iter().collect();
1275
1276        let idmap: BTreeMap<Id, Vertex> = {
1277            tracing::debug!("pull: {} vertexes in idmap", ids.len());
1278            let names = {
1279                let fallible_names = self.vertex_name_batch(&ids).await?;
1280                let mut names = Vec::with_capacity(fallible_names.len());
1281                for name in fallible_names {
1282                    names.push(name?);
1283                }
1284                names
1285            };
1286            assert_eq!(ids.len(), names.len());
1287            ids.into_iter().zip(names).collect()
1288        };
1289
1290        let data = CloneData {
1291            flat_segments,
1292            idmap,
1293        };
1294        Ok(data)
1295    }
1296}
1297
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: TryClone + Send + Sync,
    P: TryClone + Send + Sync,
    S: TryClone + Send + Sync,
{
    /// Invalidate cached content. Call this before changing the graph
    /// so `version` in `snapshot` is dropped, and `version.bump()` might
    /// have a faster path.
    ///
    /// Forgetting to call this function might hurt performance a bit, but does
    /// not affect correctness.
    fn invalidate_snapshot(&mut self) {
        *self.snapshot.write().unwrap() = None;
    }

    /// Clear the "missing vertexes confirmed by remote" cache. Call after an
    /// operation that might have added previously-missing vertexes to the
    /// graph, so they are no longer treated as known-missing.
    fn invalidate_missing_vertex_cache(&mut self) {
        tracing::debug!(target: "dag::cache", "cleared missing cache");
        *self.missing_vertexes_confirmed_by_remote.write().unwrap() = Default::default();
    }

    /// Reset the overlay map cache and re-derive the Id set it is allowed to
    /// cover from the current dag.
    fn invalidate_overlay_map(&mut self) -> Result<()> {
        self.overlay_map = Default::default();
        self.update_overlay_map_id_set()?;
        tracing::debug!(target: "dag::cache", "cleared overlay map cache");
        Ok(())
    }

    /// Refresh `overlay_map_id_set` to the dag's current master group.
    fn update_overlay_map_id_set(&mut self) -> Result<()> {
        self.overlay_map_id_set = self.dag.master_group()?;
        Ok(())
    }

    /// Attempt to get a snapshot of this graph.
    ///
    /// Fast path: under the read lock, reuse the cached snapshot when its
    /// dag version matches. Slow path: take the write lock, re-check the
    /// version (another thread may have refreshed the cache in between),
    /// and otherwise clone the graph into a new `Arc` and cache it.
    pub(crate) fn try_snapshot(&self) -> Result<Arc<Self>> {
        if let Some(s) = self.snapshot.read().unwrap().deref() {
            if s.dag.version() == self.dag.version() {
                return Ok(Arc::clone(s));
            }
        }

        let mut snapshot = self.snapshot.write().unwrap();
        match snapshot.deref() {
            Some(s) if s.dag.version() == self.dag.version() => Ok(s.clone()),
            _ => {
                let cloned = Self {
                    dag: self.dag.try_clone()?,
                    map: self.map.try_clone()?,
                    // The snapshot does not carry a nested snapshot cache.
                    snapshot: Default::default(),
                    pending_heads: self.pending_heads.clone(),
                    persisted_id_set: self.persisted_id_set.clone(),
                    path: self.path.try_clone()?,
                    state: self.state.try_clone()?,
                    id: self.id.clone(),
                    // If we do deep clone here we can remove `overlay_map_next_id`
                    // protection. However that could be too expensive.
                    overlay_map: Arc::clone(&self.overlay_map),
                    overlay_map_id_set: self.overlay_map_id_set.clone(),
                    overlay_map_paths: Arc::clone(&self.overlay_map_paths),
                    remote_protocol: self.remote_protocol.clone(),
                    managed_virtual_group: self.managed_virtual_group.clone(),
                    missing_vertexes_confirmed_by_remote: Arc::clone(
                        &self.missing_vertexes_confirmed_by_remote,
                    ),
                    lifecycle_id: self.lifecycle_id.clone(),
                    internal_stats: Default::default(),
                };
                let result = Arc::new(cloned);
                *snapshot = Some(Arc::clone(&result));
                Ok(result)
            }
        }
    }

    /// Borrow the inner `IdDag`.
    pub fn dag(&self) -> &IdDag<IS> {
        &self.dag
    }

    /// Borrow the inner IdMap.
    pub fn map(&self) -> &M {
        &self.map
    }

    /// The protocol used to resolve vertexes with the remote server.
    pub(crate) fn get_remote_protocol(&self) -> Arc<dyn RemoteIdConvertProtocol> {
        self.remote_protocol.clone()
    }
}
1386
1387impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
1388where
1389    IS: IdDagStore,
1390    IdDag<IS>: TryClone,
1391    M: TryClone + IdMapAssignHead + Send + Sync + 'static,
1392    P: TryClone + Send + Sync + 'static,
1393    S: TryClone + Send + Sync + 'static,
1394{
1395    async fn populate_missing_vertexes_for_add_heads(
1396        &mut self,
1397        parents: &dyn Parents,
1398        heads: &[Vertex],
1399    ) -> Result<()> {
1400        if self.is_vertex_lazy() {
1401            let unassigned = calculate_definitely_unassigned_vertexes(self, parents, heads).await?;
1402            let mut missing = self.missing_vertexes_confirmed_by_remote.write().unwrap();
1403            for v in unassigned {
1404                if missing.insert(v.clone()) {
1405                    tracing::trace!(target: "dag::cache", "cached missing {:?} (definitely missing)", &v);
1406                }
1407            }
1408        }
1409        Ok(())
1410    }
1411}
1412
1413/// Calculate vertexes that are definitely not assigned (not in the IdMap,
1414/// and not in the lazy part of the IdMap) according to
1415/// `hint_pending_subdag`. This does not report all unassigned vertexes.
1416/// But the reported vertexes are guaranteed not assigned.
1417///
1418/// If X is assigned, then X's parents must have been assigned.
1419/// If X is not assigned, then all X's descendants are not assigned.
1420///
1421/// This function visits the "roots" of "parents", and if they are not assigned,
1422/// then add their descendants to the "unassigned" result set.
1423async fn calculate_definitely_unassigned_vertexes<IS, M, P, S>(
1424    this: &AbstractDag<IdDag<IS>, M, P, S>,
1425    parents: &dyn Parents,
1426    heads: &[Vertex],
1427) -> Result<Vec<Vertex>>
1428where
1429    IS: IdDagStore,
1430    IdDag<IS>: TryClone,
1431    M: TryClone + IdMapAssignHead + Send + Sync + 'static,
1432    P: TryClone + Send + Sync + 'static,
1433    S: TryClone + Send + Sync + 'static,
1434{
1435    // subdag: vertexes to insert
1436    //
1437    // For example, when adding C---D to the graph A---B:
1438    //
1439    //      A---B
1440    //           \
1441    //            C---D
1442    //
1443    // The subdag is C---D (C does not have parent).
1444    //
1445    // Extra checks are needed because upon reload, the main graph
1446    // A---B might already contain part of the subdag to be added.
1447    let subdag = parents.hint_subdag_for_insertion(heads).await?;
1448
1449    let mut remaining = subdag.all().await?;
1450    let mut unassigned = Set::empty();
1451
1452    // For lazy graph, avoid some remote lookups by figuring out
1453    // some definitely unassigned (missing) vertexes. For example,
1454    //
1455    //      A---B---C
1456    //           \
1457    //            D---E
1458    //
1459    // When adding D---E (subdag, new vertex that might trigger remote
1460    // lookup) with parent B to the main graph (A--B--C),
1461    // 1. If B exists, and is not in the master group, then B and its
1462    //    descendants cannot be not lazy, and there is no need to lookup
1463    //    D remotely.
1464    // 2. If B exists, and is in the master group, and all its children
1465    //    except D (i.e. C) are known locally, and the vertex name of D
1466    //    does not match other children (C), we know that D cannot be
1467    //    in the lazy part of the main graph, and can skip the remote
1468    //    lookup.
1469    let mut unassigned_roots = Vec::new();
1470    if this.is_vertex_lazy() {
1471        let roots = subdag.roots(remaining.clone()).await?;
1472        let mut roots_iter = roots.iter().await?;
1473        while let Some(root) = roots_iter.next().await {
1474            let root = root?;
1475
1476            // Do a local "contains" check.
1477            if matches!(
1478                &this.contains_vertex_name_locally(&[root.clone()]).await?[..],
1479                [true]
1480            ) {
1481                tracing::debug!(target: "dag::definitelymissing", "root {:?} is already known", &root);
1482                continue;
1483            }
1484
1485            let root_parents_id_set = {
1486                let root_parents = parents.parent_names(root.clone()).await?;
1487                let root_parents_set = match this.sort(&Set::from_static_names(root_parents)).await
1488                {
1489                    Ok(set) => set,
1490                    Err(_) => {
1491                        tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear (parents cannot be resolved)", &root);
1492                        continue;
1493                    }
1494                };
1495                this.to_id_set(&root_parents_set).await?
1496            };
1497
1498            // If there are no parents of `root`, we cannot confidently test
1499            // whether `root` is missing or not.
1500            if root_parents_id_set.is_empty() {
1501                tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear (no parents)", &root);
1502                continue;
1503            }
1504
1505            // All parents of `root` are non-lazy.
1506            // So `root` is non-lazy and the local "contains" check is the same
1507            // as a remote "contains" check.
1508            if root_parents_id_set
1509                .iter_desc()
1510                .all(|i| i.group() > Group::MASTER)
1511            {
1512                tracing::debug!(target: "dag::definitelymissing", "root {:?} is not assigned (non-lazy parent)", &root);
1513                unassigned_roots.push(root);
1514                continue;
1515            }
1516
1517            // All children of lazy parents of `root` are known locally.
1518            // So `root` cannot match an existing vertex in the lazy graph.
1519            let children_ids: Vec<Id> = this
1520                .dag
1521                .children(root_parents_id_set)?
1522                .iter_desc()
1523                .collect();
1524            if this
1525                .map
1526                .contains_vertex_id_locally(&children_ids)
1527                .await?
1528                .iter()
1529                .all(|b| *b)
1530            {
1531                tracing::debug!(target: "dag::definitelymissing", "root {:?} is not assigned (children of parents are known)", &root);
1532                unassigned_roots.push(root);
1533                continue;
1534            }
1535
1536            tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear", &root);
1537        }
1538
1539        if !unassigned_roots.is_empty() {
1540            unassigned = subdag
1541                .descendants(Set::from_static_names(unassigned_roots))
1542                .await?;
1543            remaining = remaining.difference(&unassigned);
1544        }
1545    }
1546
1547    // Figure out unassigned (missing) vertexes that do need to be inserted.
1548    // This is done via utils::filter_known.
1549    let filter_known = |sample: &[Vertex]| -> BoxFuture<Result<Vec<Vertex>>> {
1550        let sample = sample.to_vec();
1551        async {
1552            let known_bools: Vec<bool> = {
1553                let ids = this.vertex_id_batch(&sample).await?;
1554                ids.into_iter().map(|i| i.is_ok()).collect()
1555            };
1556            debug_assert_eq!(sample.len(), known_bools.len());
1557            let known = sample
1558                .into_iter()
1559                .zip(known_bools)
1560                .filter_map(|(v, b)| if b { Some(v) } else { None })
1561                .collect();
1562            Ok(known)
1563        }
1564        .boxed()
1565    };
1566    let assigned = utils::filter_known(remaining.clone(), &filter_known).await?;
1567    unassigned = unassigned.union(&remaining.difference(&assigned));
1568    tracing::debug!(target: "dag::definitelymissing", "unassigned (missing): {:?}", &unassigned);
1569
1570    let unassigned = unassigned.iter().await?.try_collect().await?;
1571    Ok(unassigned)
1572}
1573
// The "client" Dag. Using a remote protocol to fill lazy part of the vertexes.
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: IdConvert + TryClone + Send + Sync,
    P: TryClone + Send + Sync,
    S: TryClone + Send + Sync,
{
    /// Resolve vertexes remotely and cache the result in the overlay map.
    /// Return the resolved ids in the given order. Not all names are resolved.
    ///
    /// Names the server confirms as missing are recorded in
    /// `missing_vertexes_confirmed_by_remote` so later lookups can fail
    /// locally without another round-trip.
    async fn resolve_vertexes_remotely(&self, names: &[Vertex]) -> Result<Vec<Option<Id>>> {
        if names.is_empty() {
            return Ok(Vec::new());
        }
        // Remote resolution can be globally disabled; surface that as a
        // `WouldBlock` I/O error rather than silently returning nothing.
        if is_remote_protocol_disabled() {
            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "resolving vertexes remotely disabled",
            )
            .into());
        }
        // Keep logs readable: dump the full list only when it is short.
        if names.len() < 30 {
            tracing::debug!(target: "dag::protocol", "resolve names {:?} remotely", &names);
        } else {
            tracing::debug!(target: "dag::protocol", "resolve names ({}) remotely", names.len());
        }
        crate::failpoint!("dag-resolve-vertexes-remotely");
        // Build a (heads, names) request locally, ask the remote protocol for
        // the corresponding x~n paths, then fold the answers into the overlay.
        let request: protocol::RequestNameToLocation =
            (self.map(), self.dag()).process(names.to_vec()).await?;
        let path_names = self
            .remote_protocol
            .resolve_names_to_relative_paths(request.heads, request.names)
            .await?;
        self.insert_relative_paths(path_names).await?;
        // Read back the overlay. Any name still unresolved after the
        // round-trip was confirmed missing by the server; cache that fact.
        let overlay = self.overlay_map.read().unwrap();
        let mut ids = Vec::with_capacity(names.len());
        let mut missing = self.missing_vertexes_confirmed_by_remote.write().unwrap();
        for name in names {
            if let Some(id) = overlay.lookup_vertex_id(name) {
                ids.push(Some(id));
            } else {
                tracing::trace!(target: "dag::cache", "cached missing {:?} (server confirmed)", &name);
                missing.insert(name.clone());
                ids.push(None);
            }
        }
        Ok(ids)
    }

    /// Resolve ids remotely and cache the result in the overlay map.
    /// Return the resolved ids in the given order. All ids must be resolved.
    ///
    /// Unlike `resolve_vertexes_remotely`, an id that the server cannot name
    /// is an error (`not_found`), not a `None` entry.
    async fn resolve_ids_remotely(&self, ids: &[Id]) -> Result<Vec<Vertex>> {
        if ids.is_empty() {
            return Ok(Vec::new());
        }
        // Same "remote disabled" treatment as `resolve_vertexes_remotely`.
        if is_remote_protocol_disabled() {
            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "resolving ids remotely disabled",
            )
            .into());
        }
        if ids.len() < 30 {
            tracing::debug!(target: "dag::protocol", "resolve ids {:?} remotely", &ids);
        } else {
            tracing::debug!(target: "dag::protocol", "resolve ids ({}) remotely", ids.len());
        }
        crate::failpoint!("dag-resolve-ids-remotely");
        // Translate the ids to x~n paths locally, ask the server for names,
        // then cache the answers in the overlay map.
        let request: protocol::RequestLocationToName = (self.map(), self.dag())
            .process(IdSet::from_spans(ids.iter().copied()))
            .await?;
        let path_names = self
            .remote_protocol
            .resolve_relative_paths_to_names(request.paths)
            .await?;
        self.insert_relative_paths(path_names).await?;
        let overlay = self.overlay_map.read().unwrap();
        let mut names = Vec::with_capacity(ids.len());
        for &id in ids {
            if let Some(name) = overlay.lookup_vertex_name(id).cloned() {
                names.push(name);
            } else {
                // Every requested id must resolve; report the first failure.
                return id.not_found();
            }
        }
        Ok(names)
    }

    /// Insert `x~n` relative paths to the overlay IdMap.
    async fn insert_relative_paths(
        &self,
        path_names: Vec<(AncestorPath, Vec<Vertex>)>,
    ) -> Result<()> {
        if path_names.is_empty() {
            return Ok(());
        }
        // Convert the x~n paths to concrete (Id, Vertex) pairs first, before
        // taking any write locks.
        let to_insert: Vec<(Id, Vertex)> = calculate_id_name_from_paths(
            self.map(),
            self.dag().deref(),
            &self.overlay_map_id_set,
            &path_names,
        )
        .await?;

        // Record the raw paths; the lock is released promptly via `drop`.
        let mut paths = self.overlay_map_paths.lock().unwrap();
        paths.extend(path_names);
        drop(paths);

        // Publish the resolved pairs into the overlay map.
        let mut overlay = self.overlay_map.write().unwrap();
        for (id, name) in to_insert {
            tracing::trace!(target: "dag::cache", "cached mapping {:?} <=> {:?}", id, &name);
            overlay.insert_vertex_id_name(id, name);
        }

        Ok(())
    }
}
1692
/// Calculate (id, name) pairs to insert from (path, [name]) pairs.
///
/// Each `AncestorPath` encodes an `x~n` location: start at the vertex `x`
/// (expected to be known locally) and follow first parents `n` times.
/// `names[i]` corresponds to `x~(n+i)` — each later name is one more
/// first-parent step from the previous one.
async fn calculate_id_name_from_paths(
    map: &dyn IdConvert,
    dag: &dyn IdDagAlgorithm,
    overlay_map_id_set: &IdSet,
    path_names: &[(AncestorPath, Vec<Vertex>)],
) -> Result<Vec<(Id, Vertex)>> {
    if path_names.is_empty() {
        return Ok(Vec::new());
    }
    // Pre-size the output: one entry per name across all paths.
    let mut to_insert: Vec<(Id, Vertex)> =
        Vec::with_capacity(path_names.iter().map(|(_, ns)| ns.len()).sum());
    for (path, names) in path_names {
        if names.is_empty() {
            continue;
        }
        // Resolve x~n to id. x is "universally known" so it should exist locally.
        let x_id = map.vertex_id(path.x.clone()).await.map_err(|e| {
            let msg = format!(
                concat!(
                    "Cannot resolve x ({:?}) in x~n locally. The x is expected to be known ",
                    "locally and is populated at clone time. This x~n is used to convert ",
                    "{:?} to a location in the graph. (Check initial clone logic) ",
                    "(Error: {})",
                ),
                &path.x, &names[0], e
            );
            crate::Error::Programming(msg)
        })?;
        tracing::trace!(
            "resolve path {:?} names {:?} (x = {}) to overlay",
            &path,
            &names,
            x_id
        );
        // Reject server answers whose `x` falls outside the id range the
        // overlay map covers — treated as a server-side logic error.
        if !overlay_map_id_set.contains(x_id) {
            crate::failpoint!("dag-error-x-n-overflow");
            let msg = format!(
                concat!(
                    "Server returned x~n (x = {:?} {}, n = {}). But x is out of range ",
                    "({:?}). This is not expected and indicates some ",
                    "logic error on the server side."
                ),
                &path.x, x_id, path.n, overlay_map_id_set
            );
            return programming(msg);
        }
        // Walk n first-parent steps from x using the local IdDag. Failure
        // means the client and server graphs disagree.
        let mut id = match dag.first_ancestor_nth(x_id, path.n).map_err(|e| {
            let msg = format!(
                concat!(
                    "Cannot resolve x~n (x = {:?} {}, n = {}): {}. ",
                    "This indicates the client-side graph is somewhat incompatible from the ",
                    "server-side graph. Something (server-side or client-side) was probably ",
                    "seriously wrong before this error."
                ),
                &path.x, x_id, path.n, e
            );
            crate::Error::Programming(msg)
        }) {
            Err(e) => {
                crate::failpoint!("dag-error-x-n-unresolvable");
                return Err(e);
            }
            Ok(id) => id,
        };
        // Log the full name list only when it is short.
        if names.len() < 30 {
            tracing::debug!("resolved {:?} => {} {:?}", &path, id, &names);
        } else {
            tracing::debug!("resolved {:?} => {} {:?} ...", &path, id, &names[0]);
        }
        for (i, name) in names.iter().enumerate() {
            if i > 0 {
                // Follow id's first parent.
                id = match dag.parent_ids(id)?.first().cloned() {
                    Some(id) => id,
                    None => {
                        // Ran out of ancestors before consuming all names:
                        // client/server graph mismatch.
                        let msg = format!(
                            concat!(
                                "Cannot resolve x~(n+i) (x = {:?} {}, n = {}, i = {}) locally. ",
                                "This indicates the client-side graph is somewhat incompatible ",
                                "from the server-side graph. Something (server-side or ",
                                "client-side) was probably seriously wrong before this error."
                            ),
                            &path.x, x_id, path.n, i
                        );
                        return programming(msg);
                    }
                }
            }

            tracing::trace!(" resolved {:?} = {:?}", id, &name,);
            to_insert.push((id, name.clone()));
        }
    }
    Ok(to_insert)
}
1789
1790// The server Dag. IdMap is complete. Provide APIs for client Dag to resolve vertexes.
1791// Currently mainly used for testing purpose.
1792#[async_trait::async_trait]
1793impl<IS, M, P, S> RemoteIdConvertProtocol for AbstractDag<IdDag<IS>, M, P, S>
1794where
1795    IS: IdDagStore,
1796    IdDag<IS>: TryClone,
1797    M: IdConvert + TryClone + Send + Sync + 'static,
1798    P: TryClone + Send + Sync + 'static,
1799    S: TryClone + Send + Sync + 'static,
1800{
1801    async fn resolve_names_to_relative_paths(
1802        &self,
1803        heads: Vec<Vertex>,
1804        names: Vec<Vertex>,
1805    ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
1806        let request = protocol::RequestNameToLocation { names, heads };
1807        let response: protocol::ResponseIdNamePair =
1808            (self.map(), self.dag()).process(request).await?;
1809        Ok(response.path_names)
1810    }
1811
1812    async fn resolve_relative_paths_to_names(
1813        &self,
1814        paths: Vec<AncestorPath>,
1815    ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
1816        let request = protocol::RequestLocationToName { paths };
1817        let response: protocol::ResponseIdNamePair =
1818            (self.map(), self.dag()).process(request).await?;
1819        Ok(response.path_names)
1820    }
1821}
1822
1823// On "snapshot".
1824#[async_trait::async_trait]
1825impl<IS, M, P, S> RemoteIdConvertProtocol for Arc<AbstractDag<IdDag<IS>, M, P, S>>
1826where
1827    IS: IdDagStore,
1828    IdDag<IS>: TryClone,
1829    M: IdConvert + TryClone + Send + Sync + 'static,
1830    P: TryClone + Send + Sync + 'static,
1831    S: TryClone + Send + Sync + 'static,
1832{
1833    async fn resolve_names_to_relative_paths(
1834        &self,
1835        heads: Vec<Vertex>,
1836        names: Vec<Vertex>,
1837    ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
1838        self.deref()
1839            .resolve_names_to_relative_paths(heads, names)
1840            .await
1841    }
1842
1843    async fn resolve_relative_paths_to_names(
1844        &self,
1845        paths: Vec<AncestorPath>,
1846    ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
1847        self.deref().resolve_relative_paths_to_names(paths).await
1848    }
1849}
1850
1851// Dag operations. Those are just simple wrappers around [`IdDag`].
1852// See [`IdDag`] for the actual implementations of these algorithms.
1853
/// DAG related read-only algorithms.
///
/// These are thin wrappers: convert `Set` <-> `IdSet`, call the
/// corresponding `IdDag` algorithm, and wrap the result back into a `Set`.
/// Several methods carry `#[cfg(test)]` blocks that cross-check the result
/// against the naive `default_impl` versions.
#[async_trait::async_trait]
impl<IS, M, P, S> DagAlgorithm for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone + 'static,
    M: TryClone + IdConvert + Sync + Send + 'static,
    P: TryClone + Sync + Send + 'static,
    S: TryClone + Sync + Send + 'static,
{
    /// Sort a `Set` topologically.
    ///
    /// Tries two fast paths before falling back to resolving every vertex to
    /// an id (which, for a lazy graph, may involve remote lookups).
    async fn sort(&self, set: &Set) -> Result<Set> {
        let hints = set.hints();
        // Fast path 1: the set is already in topological descending order and
        // was produced by a dag/map no newer than ours — reuse it as-is.
        if hints.contains(Flags::TOPO_DESC)
            && matches!(hints.dag_version(), Some(v) if v <= self.dag_version())
            && matches!(hints.id_map_version(), Some(v) if v <= self.map_version())
        {
            tracing::debug!(target: "dag::algo::sort", "sort({:6?}) (fast path)", set);
            return Ok(set.clone());
        } else if let Some(flat_set) = set.specialized_flatten_id() {
            // Fast path 2: the set flattens to an id-based set; rebind it to
            // this dag's snapshots and force descending iteration order.
            let dag_version = flat_set.dag.dag_version();
            if dag_version <= self.dag_version() {
                let mut flat_set = flat_set.into_owned();
                flat_set.set_iteration_order(BasicIterationOrder::Desc);
                flat_set.map = self.id_map_snapshot()?;
                flat_set.dag = self.dag_snapshot()?;
                tracing::debug!(target: "dag::algo::sort", "sort({:6?}) (fast path 2)", set);
                return Ok(Set::from_query(flat_set));
            } else {
                tracing::info!(target: "dag::algo::sort", "sort({:6?}) (cannot use fast path 2 due to mismatched version)", set);
            }
        }
        // Slow path: resolve every vertex to an id. Counted for diagnostics.
        tracing::warn!(target: "dag::algo::sort", "sort({:6?}) (slow path)", set);
        self.internal_stats
            .sort_slow_path_count
            .fetch_add(1, Ordering::Release);
        let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
        let mut spans = IdSet::empty();
        // Resolve in chunks (128K names at a time) to batch id lookups.
        let mut iter = set.iter().await?.chunks(1 << 17);
        while let Some(names) = iter.next().await {
            let names = names.into_iter().collect::<Result<Vec<_>>>()?;
            let ids = self.vertex_id_batch(&names).await?;
            for id in ids {
                spans.push(id?);
            }
        }
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(flags);
        Ok(result)
    }

    /// Get ordered parent vertexes.
    async fn parent_names(&self, name: Vertex) -> Result<Vec<Vertex>> {
        let id = self.vertex_id(name).await?;
        let parent_ids = self.dag().parent_ids(id)?;
        let mut result = Vec::with_capacity(parent_ids.len());
        for id in parent_ids {
            result.push(self.vertex_name(id).await?);
        }
        Ok(result)
    }

    /// Returns a set that covers all vertexes tracked by this DAG.
    /// Excluding the virtual group.
    async fn all(&self) -> Result<Set> {
        let spans = self.dag().all()?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(Flags::FULL);
        Ok(result)
    }

    /// Returns a set that covers all vertexes in the master group.
    async fn master_group(&self) -> Result<Set> {
        let spans = self.dag().master_group()?;
        let result = Set::from_id_set_dag(spans, self)?;
        // The master group is closed under ancestors, hence the flag.
        result.hints().add_flags(Flags::ANCESTORS);
        Ok(result)
    }

    /// Returns a set that covers all vertexes in the virtual group.
    async fn virtual_group(&self) -> Result<Set> {
        let spans = self.dag().all_ids_in_groups(&[Group::VIRTUAL])?;
        let result = Set::from_id_set_dag(spans, self)?;
        Ok(result)
    }

    /// Calculates all ancestors reachable from any name from the given set.
    async fn ancestors(&self, set: Set) -> Result<Set> {
        // Fast path: an ancestor-closed set from a compatible dag is its own
        // ancestors.
        if set.hints().contains(Flags::ANCESTORS)
            && set.hints().dag_version() <= Some(self.dag_version())
        {
            return Ok(set);
        }
        let spans = self.to_id_set(&set).await?;
        let spans = self.dag().ancestors(spans)?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(Flags::ANCESTORS);
        Ok(result)
    }

    /// Like `ancestors` but follows only the first parents.
    async fn first_ancestors(&self, set: Set) -> Result<Set> {
        // If set == ancestors(set), then first_ancestors(set) == set.
        if set.hints().contains(Flags::ANCESTORS)
            && set.hints().dag_version() <= Some(self.dag_version())
        {
            return Ok(set);
        }
        let spans = self.to_id_set(&set).await?;
        let spans = self.dag().first_ancestors(spans)?;
        let result = Set::from_id_set_dag(spans, self)?;
        // Cross-check against the naive default implementation in tests.
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::first_ancestors(self, set).await?);
        }
        Ok(result)
    }

    /// Calculate merges within the given set.
    async fn merges(&self, set: Set) -> Result<Set> {
        let spans = self.to_id_set(&set).await?;
        let spans = self.dag().merges(spans)?;
        let result = Set::from_id_set_dag(spans, self)?;
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::merges(self, set).await?);
        }
        Ok(result)
    }

    /// Calculates parents of the given set.
    ///
    /// Note: Parent order is not preserved. Use [`Dag::parent_names`]
    /// to preserve order.
    async fn parents(&self, set: Set) -> Result<Set> {
        // Preserve ANCESTORS flag. If ancestors(x) == x, then ancestors(parents(x)) == parents(x).
        let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
        let spans = self.dag().parents(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(flags);
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::parents(self, set).await?);
        }
        Ok(result)
    }

    /// Calculates the n-th first ancestor.
    async fn first_ancestor_nth(&self, name: Vertex, n: u64) -> Result<Option<Vertex>> {
        // Keep a copy of the input for the test-only cross-check below.
        #[cfg(test)]
        let name2 = name.clone();
        let id = self.vertex_id(name).await?;
        let id = self.dag().try_first_ancestor_nth(id, n)?;
        let result = match id {
            None => None,
            Some(id) => Some(self.vertex_name(id).await?),
        };
        #[cfg(test)]
        {
            let result2 = crate::default_impl::first_ancestor_nth(self, name2, n).await?;
            assert_eq!(result, result2);
        }
        Ok(result)
    }

    /// Calculates heads of the given set.
    async fn heads(&self, set: Set) -> Result<Set> {
        if set.hints().contains(Flags::ANCESTORS)
            && set.hints().dag_version() <= Some(self.dag_version())
        {
            // heads_ancestors is faster.
            return self.heads_ancestors(set).await;
        }
        let spans = self.dag().heads(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::heads(self, set).await?);
        }
        Ok(result)
    }

    /// Calculates children of the given set.
    async fn children(&self, set: Set) -> Result<Set> {
        let spans = self.dag().children(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        Ok(result)
    }

    /// Calculates roots of the given set.
    async fn roots(&self, set: Set) -> Result<Set> {
        // roots(x) of an ancestor-closed x is still ancestor-closed? No —
        // but the flag extraction below only keeps ANCESTORS when the hints
        // are bound to a compatible dag, mirroring `parents`.
        let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
        let spans = self.dag().roots(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(flags);
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::roots(self, set).await?);
        }
        Ok(result)
    }

    /// Calculates one "greatest common ancestor" of the given set.
    ///
    /// If there are no common ancestors, return None.
    /// If there are multiple greatest common ancestors, pick one arbitrarily.
    /// Use `gca_all` to get all of them.
    async fn gca_one(&self, set: Set) -> Result<Option<Vertex>> {
        let result: Option<Vertex> = match self.dag().gca_one(self.to_id_set(&set).await?)? {
            None => None,
            Some(id) => Some(self.vertex_name(id).await?),
        };
        #[cfg(test)]
        {
            assert_eq!(&result, &crate::default_impl::gca_one(self, set).await?);
        }
        Ok(result)
    }

    /// Calculates all "greatest common ancestor"s of the given set.
    /// `gca_one` is faster if an arbitrary answer is ok.
    async fn gca_all(&self, set: Set) -> Result<Set> {
        let spans = self.dag().gca_all(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::gca_all(self, set).await?);
        }
        Ok(result)
    }

    /// Calculates all common ancestors of the given set.
    async fn common_ancestors(&self, set: Set) -> Result<Set> {
        let spans = self.dag().common_ancestors(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        // A set of common ancestors is ancestor-closed by construction.
        result.hints().add_flags(Flags::ANCESTORS);
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::common_ancestors(self, set).await?);
        }
        Ok(result)
    }

    /// Tests if `ancestor` is an ancestor of `descendant`.
    async fn is_ancestor(&self, ancestor: Vertex, descendant: Vertex) -> Result<bool> {
        #[cfg(test)]
        let result2 =
            crate::default_impl::is_ancestor(self, ancestor.clone(), descendant.clone()).await?;
        let ancestor_id = self.vertex_id(ancestor).await?;
        let descendant_id = self.vertex_id(descendant).await?;
        let result = self.dag().is_ancestor(ancestor_id, descendant_id)?;
        #[cfg(test)]
        {
            assert_eq!(&result, &result2);
        }
        Ok(result)
    }

    /// Calculates "heads" of the ancestors of the given set. That is,
    /// Find Y, which is the smallest subset of set X, where `ancestors(Y)` is
    /// `ancestors(X)`.
    ///
    /// This is faster than calculating `heads(ancestors(set))`.
    ///
    /// This is different from `heads`. In case set contains X and Y, and Y is
    /// an ancestor of X, but not the immediate ancestor, `heads` will include
    /// Y while this function won't.
    async fn heads_ancestors(&self, set: Set) -> Result<Set> {
        let spans = self.dag().heads_ancestors(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        #[cfg(test)]
        {
            // default_impl::heads_ancestors calls `heads` if `Flags::ANCESTORS`
            // is set. Prevent infinite loop.
            if !set.hints().contains(Flags::ANCESTORS) {
                result.assert_eq(crate::default_impl::heads_ancestors(self, set).await?);
            }
        }
        Ok(result)
    }

    /// Calculates the "dag range" - vertexes reachable from both sides.
    async fn range(&self, roots: Set, heads: Set) -> Result<Set> {
        let roots = self.to_id_set(&roots).await?;
        let heads = self.to_id_set(&heads).await?;
        let spans = self.dag().range(roots, heads)?;
        let result = Set::from_id_set_dag(spans, self)?;
        Ok(result)
    }

    /// Calculates the descendants of the given set.
    async fn descendants(&self, set: Set) -> Result<Set> {
        let spans = self.dag().descendants(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        Ok(result)
    }

    /// Suggest a vertex to test next during bisect. No specialized fast
    /// path here — delegates to the shared default implementation.
    async fn suggest_bisect(
        &self,
        roots: Set,
        heads: Set,
        skip: Set,
    ) -> Result<(Option<Vertex>, Set, Set)> {
        default_impl::suggest_bisect(self, roots, heads, skip).await
    }

    /// Vertexes buffered in memory, not yet written to disk.
    async fn dirty(&self) -> Result<Set> {
        // Everything minus the ids known to be persisted.
        let all = self.dag().all()?;
        let spans = all.difference(&self.persisted_id_set);
        let set = Set::from_id_set_dag(spans, self)?;
        Ok(set)
    }

    // A dag backed by a non-local remote protocol may contain lazy
    // (not-yet-resolved) vertexes.
    fn is_vertex_lazy(&self) -> bool {
        !self.remote_protocol.is_local()
    }

    /// Get a snapshot of the current graph.
    fn dag_snapshot(&self) -> Result<Arc<dyn DagAlgorithm + Send + Sync>> {
        Ok(self.try_snapshot()? as Arc<dyn DagAlgorithm + Send + Sync>)
    }

    // Snapshot of just the id-based dag (no IdMap), via a store clone.
    fn id_dag_snapshot(&self) -> Result<Arc<dyn IdDagAlgorithm + Send + Sync>> {
        let store = self.dag.try_clone()?.store;
        Ok(Arc::new(store))
    }

    fn dag_id(&self) -> &str {
        &self.id
    }

    fn dag_version(&self) -> &VerLink {
        self.dag.version()
    }
}
2190
2191/// Extract the ANCESTORS flag if the set with the `hints` is bound to a
2192/// compatible DAG.
2193fn extract_ancestor_flag_if_compatible(hints: &Hints, dag_version: &VerLink) -> Flags {
2194    if hints.dag_version() <= Some(dag_version) {
2195        hints.flags() & Flags::ANCESTORS
2196    } else {
2197        Flags::empty()
2198    }
2199}
2200
2201#[async_trait::async_trait]
2202impl<I, M, P, S> PrefixLookup for AbstractDag<I, M, P, S>
2203where
2204    I: Send + Sync,
2205    M: PrefixLookup + Send + Sync,
2206    P: Send + Sync,
2207    S: Send + Sync,
2208{
2209    async fn vertexes_by_hex_prefix(&self, hex_prefix: &[u8], limit: usize) -> Result<Vec<Vertex>> {
2210        let mut list = self.map.vertexes_by_hex_prefix(hex_prefix, limit).await?;
2211        let overlay_list = self
2212            .overlay_map
2213            .read()
2214            .unwrap()
2215            .lookup_vertexes_by_hex_prefix(hex_prefix, limit)?;
2216        list.extend(overlay_list);
2217        list.sort_unstable();
2218        list.dedup();
2219        list.truncate(limit);
2220        Ok(list)
2221    }
2222}
2223
2224#[async_trait::async_trait]
2225impl<IS, M, P, S> IdConvert for AbstractDag<IdDag<IS>, M, P, S>
2226where
2227    IS: IdDagStore,
2228    IdDag<IS>: TryClone,
2229    M: IdConvert + TryClone + Send + Sync + 'static,
2230    P: TryClone + Send + Sync + 'static,
2231    S: TryClone + Send + Sync + 'static,
2232{
2233    async fn vertex_id(&self, name: Vertex) -> Result<Id> {
2234        match self.map.vertex_id(name.clone()).await {
2235            Ok(id) => Ok(id),
2236            Err(crate::Error::VertexNotFound(_)) if self.is_vertex_lazy() => {
2237                if let Some(id) = self.overlay_map.read().unwrap().lookup_vertex_id(&name) {
2238                    return Ok(id);
2239                }
2240                if self
2241                    .missing_vertexes_confirmed_by_remote
2242                    .read()
2243                    .unwrap()
2244                    .contains(&name)
2245                {
2246                    return name.not_found();
2247                }
2248                let ids = self.resolve_vertexes_remotely(&[name.clone()]).await?;
2249                if let Some(Some(id)) = ids.first() {
2250                    Ok(*id)
2251                } else {
2252                    // ids is empty.
2253                    name.not_found()
2254                }
2255            }
2256            Err(e) => Err(e),
2257        }
2258    }
2259
2260    async fn vertex_id_with_max_group(
2261        &self,
2262        name: &Vertex,
2263        max_group: Group,
2264    ) -> Result<Option<Id>> {
2265        match self.map.vertex_id_with_max_group(name, max_group).await {
2266            Ok(Some(id)) => Ok(Some(id)),
2267            Err(err) => Err(err),
2268            Ok(None) if self.is_vertex_lazy() => {
2269                // Not exist in max_group from local data.
2270                if let Some(id) = self.overlay_map.read().unwrap().lookup_vertex_id(name) {
2271                    return Ok(Some(id));
2272                }
2273                if self
2274                    .missing_vertexes_confirmed_by_remote
2275                    .read()
2276                    .unwrap()
2277                    .contains(name)
2278                {
2279                    return Ok(None);
2280                }
2281                if max_group != Group::MAX
2282                    && self
2283                        .map
2284                        .vertex_id_with_max_group(name, Group::MAX)
2285                        .await?
2286                        .is_some()
2287                {
2288                    // If the vertex exists in the non-master groups. Then it must be missing in the
2289                    // master group.
2290                    return Ok(None);
2291                }
2292                match self.resolve_vertexes_remotely(&[name.clone()]).await {
2293                    Ok(ids) => match ids.first() {
2294                        Some(Some(id)) => Ok(Some(*id)),
2295                        Some(None) | None => Ok(None),
2296                    },
2297                    Err(e) => Err(e),
2298                }
2299            }
2300            Ok(None) => Ok(None),
2301        }
2302    }
2303
2304    async fn vertex_name(&self, id: Id) -> Result<Vertex> {
2305        match self.map.vertex_name(id).await {
2306            Ok(name) => Ok(name),
2307            Err(crate::Error::IdNotFound(_)) if self.is_vertex_lazy() => {
2308                if let Some(name) = self
2309                    .overlay_map
2310                    .read()
2311                    .unwrap()
2312                    .lookup_vertex_name(id)
2313                    .cloned()
2314                {
2315                    return Ok(name);
2316                }
2317                // Only ids <= max(MASTER group) can be lazy.
2318                let max_master_id = self.dag.master_group()?.max();
2319                if Some(id) > max_master_id {
2320                    return id.not_found();
2321                }
2322                let names = self.resolve_ids_remotely(&[id]).await?;
2323                if let Some(name) = names.into_iter().next() {
2324                    Ok(name)
2325                } else {
2326                    id.not_found()
2327                }
2328            }
2329            Err(e) => Err(e),
2330        }
2331    }
2332
2333    async fn contains_vertex_name(&self, name: &Vertex) -> Result<bool> {
2334        match self.map.contains_vertex_name(name).await {
2335            Ok(true) => Ok(true),
2336            Ok(false) if self.is_vertex_lazy() => {
2337                if self
2338                    .overlay_map
2339                    .read()
2340                    .unwrap()
2341                    .lookup_vertex_id(name)
2342                    .is_some()
2343                {
2344                    return Ok(true);
2345                }
2346                if self
2347                    .missing_vertexes_confirmed_by_remote
2348                    .read()
2349                    .unwrap()
2350                    .contains(name)
2351                {
2352                    return Ok(false);
2353                }
2354                match self.resolve_vertexes_remotely(&[name.clone()]).await {
2355                    Ok(ids) => match ids.first() {
2356                        Some(Some(_)) => Ok(true),
2357                        Some(None) | None => Ok(false),
2358                    },
2359                    Err(e) => Err(e),
2360                }
2361            }
2362            Ok(false) => Ok(false),
2363            Err(e) => Err(e),
2364        }
2365    }
2366
2367    async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
2368        let mut list = self.map.contains_vertex_id_locally(ids).await?;
2369        let map = self.overlay_map.read().unwrap();
2370        for (b, id) in list.iter_mut().zip(ids.iter().copied()) {
2371            if !*b {
2372                *b = *b || map.has_vertex_id(id);
2373            }
2374        }
2375        Ok(list)
2376    }
2377
2378    async fn contains_vertex_name_locally(&self, names: &[Vertex]) -> Result<Vec<bool>> {
2379        tracing::trace!("contains_vertex_name_locally names: {:?}", &names);
2380        let mut list = self.map.contains_vertex_name_locally(names).await?;
2381        tracing::trace!("contains_vertex_name_locally list (local): {:?}", &list);
2382        assert_eq!(list.len(), names.len());
2383        let map = self.overlay_map.read().unwrap();
2384        for (b, name) in list.iter_mut().zip(names.iter()) {
2385            if !*b && map.has_vertex_name(name) {
2386                tracing::trace!("contains_vertex_name_locally overlay has {:?}", &name);
2387                *b = true;
2388            }
2389        }
2390        Ok(list)
2391    }
2392
2393    async fn vertex_name_batch(&self, ids: &[Id]) -> Result<Vec<Result<Vertex>>> {
2394        let mut list = self.map.vertex_name_batch(ids).await?;
2395        if self.is_vertex_lazy() {
2396            // Read from overlay map cache.
2397            {
2398                let map = self.overlay_map.read().unwrap();
2399                for (r, id) in list.iter_mut().zip(ids) {
2400                    if let Some(name) = map.lookup_vertex_name(*id).cloned() {
2401                        *r = Ok(name);
2402                    }
2403                }
2404            }
2405            // Read from missing_vertexes_confirmed_by_remote cache.
2406            let missing_indexes: Vec<usize> = {
2407                let max_master_id = self.dag.master_group()?.max();
2408                list.iter()
2409                    .enumerate()
2410                    .filter_map(|(i, r)| match r {
2411                        // Only resolve ids that are <= max(master) remotely.
2412                        Err(_) if Some(ids[i]) <= max_master_id => Some(i),
2413                        Err(_) | Ok(_) => None,
2414                    })
2415                    .collect()
2416            };
2417            let missing_ids: Vec<Id> = missing_indexes.iter().map(|i| ids[*i]).collect();
2418            let resolved = self.resolve_ids_remotely(&missing_ids).await?;
2419            for (i, name) in missing_indexes.into_iter().zip(resolved.into_iter()) {
2420                list[i] = Ok(name);
2421            }
2422        }
2423        Ok(list)
2424    }
2425
2426    async fn vertex_id_batch(&self, names: &[Vertex]) -> Result<Vec<Result<Id>>> {
2427        let mut list = self.map.vertex_id_batch(names).await?;
2428        if self.is_vertex_lazy() {
2429            // Read from overlay map cache.
2430            {
2431                let map = self.overlay_map.read().unwrap();
2432                for (r, name) in list.iter_mut().zip(names) {
2433                    if let Some(id) = map.lookup_vertex_id(name) {
2434                        *r = Ok(id);
2435                    }
2436                }
2437            }
2438            // Read from missing_vertexes_confirmed_by_remote cache.
2439            let missing_indexes: Vec<usize> = {
2440                let known_missing = self.missing_vertexes_confirmed_by_remote.read().unwrap();
2441                list.iter()
2442                    .enumerate()
2443                    .filter_map(|(i, r)| {
2444                        if r.is_err() && !known_missing.contains(&names[i]) {
2445                            Some(i)
2446                        } else {
2447                            None
2448                        }
2449                    })
2450                    .collect()
2451            };
2452            if !missing_indexes.is_empty() {
2453                let missing_names: Vec<Vertex> =
2454                    missing_indexes.iter().map(|i| names[*i].clone()).collect();
2455                let resolved = self.resolve_vertexes_remotely(&missing_names).await?;
2456                for (i, id) in missing_indexes.into_iter().zip(resolved.into_iter()) {
2457                    if let Some(id) = id {
2458                        list[i] = Ok(id);
2459                    }
2460                }
2461            }
2462        }
2463        Ok(list)
2464    }
2465
2466    fn map_id(&self) -> &str {
2467        self.map.map_id()
2468    }
2469
2470    fn map_version(&self) -> &VerLink {
2471        self.map.map_version()
2472    }
2473}
2474
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone + 'static,
    M: TryClone + Persist + IdMapWrite + IdConvert + Sync + Send + 'static,
    P: TryClone + Sync + Send + 'static,
    S: TryClone + Sync + Send + 'static,
{
    /// Build IdMap and Segments for the given heads.
    /// Update IdMap and IdDag to include the given heads and their ancestors.
    ///
    /// Handle "reassign" cases. For example, when adding P to the master group
    /// and one of its parent N2 is in the non-master group:
    ///
    /// ```plain,ignore
    ///     1--2--3             3---P
    ///         \                  /
    ///          N1-N2-N3        N2
    /// ```
    ///
    /// To maintain topological order, N2 need to be re-assigned to the master
    /// group. This is done by temporarily removing N1-N2-N3, re-insert N1-N2
    /// as 4-5 to be able to insert P, then re-insert N3 in the non-master
    /// group:
    ///
    /// ```plain,ignore
    ///     1--2--3 --6 (P)
    ///         \    /
    ///          4--5 --N1
    /// ```
    async fn build_with_lock(
        &mut self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
        map_lock: &M::Lock,
    ) -> Result<()> {
        // `std::borrow::Cow` without `Clone` constraint.
        enum Input<'a> {
            Borrowed(&'a dyn Parents, &'a VertexListWithOptions),
            Owned(Box<dyn Parents>, VertexListWithOptions),
        }

        // Manual recursion. async fn does not support recursion.
        let mut stack = vec![Input::Borrowed(parents, heads)];

        // Avoid infinite loop (buggy logic).
        let mut loop_count = 0;

        while let Some(input) = stack.pop() {
            loop_count += 1;
            if loop_count > 2 {
                return bug("should not loop > 2 times (1st insertion+strip, 2nd reinsert)");
            }

            let (parents, heads) = match &input {
                Input::Borrowed(p, h) => (*p, *h),
                Input::Owned(p, h) => (p.as_ref(), h),
            };

            // Populate vertex negative cache to reduce round-trips doing remote lookups.
            if self.is_vertex_lazy() {
                let heads: Vec<Vertex> = heads.vertexes();
                self.populate_missing_vertexes_for_add_heads(parents, &heads)
                    .await?;
            }

            // Backup, then remove vertexes that need to be reassigned. Actual reassignment
            // happens in the next loop iteration.
            let to_reassign: Set = self.find_vertexes_to_reassign(parents, heads).await?;
            if !to_reassign.is_empty().await? {
                let reinsert_heads: VertexListWithOptions = {
                    // Heads of the descendants of the stripped set, excluding
                    // the stripped set itself; these are re-inserted on the
                    // next iteration.
                    let heads = self
                        .heads(
                            self.descendants(to_reassign.clone())
                                .await?
                                .difference(&to_reassign),
                        )
                        .await?;
                    tracing::debug!(target: "dag::reassign", "need to rebuild heads: {:?}", &heads);
                    let heads: Vec<Vertex> = heads.iter().await?.try_collect().await?;
                    VertexListWithOptions::from(heads)
                };
                // Snapshot taken before stripping so the re-insert pass can
                // still resolve parents of stripped vertexes.
                let reinsert_parents: Box<dyn Parents> = Box::new(self.dag_snapshot()?);
                self.strip_with_lock(&to_reassign, map_lock).await?;

                // Rebuild non-master ids and segments on the next iteration.
                stack.push(Input::Owned(reinsert_parents, reinsert_heads));
            };

            // Update IdMap.
            let mut outcome = PreparedFlatSegments::default();
            // Ids already assigned in any group; new assignments avoid them.
            let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
            let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
            // Assign ids group by group so lower (master) groups are filled first.
            for group in Group::ALL {
                for (vertex, opts) in heads.vertex_options() {
                    if opts.desired_group != group {
                        continue;
                    }
                    // Important: do not call self.map.assign_head. It does not trigger
                    // remote protocol properly. Call self.assign_head instead.
                    let prepared_segments = self
                        .assign_head(vertex.clone(), parents, group, &mut covered, &reserved)
                        .await?;
                    outcome.merge(prepared_segments);
                    // Update reserved.
                    if opts.reserve_size > 0 {
                        let low = self.map.vertex_id(vertex).await? + 1;
                        update_reserved(&mut reserved, &covered, low, opts.reserve_size);
                    }
                }
            }

            // Update segments.
            self.dag
                .build_segments_from_prepared_flat_segments(&outcome)?;

            // The master group might have new vertexes inserted, which will
            // affect the `overlay_map_id_set`.
            self.update_overlay_map_id_set()?;
        }

        Ok(())
    }

    /// Find vertexes that need to be reassigned from the non-master group
    /// to the master group. That is,
    /// `ancestors(master_heads_to_insert) & existing_non_master_group`
    ///
    /// Assume pre-fetching (populate_missing_vertexes_for_add_heads)
    /// was done, so this function can just use naive DFS without worrying
    /// about excessive remote lookups.
    async fn find_vertexes_to_reassign(
        &self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
    ) -> Result<Set> {
        // Heads that need to be inserted to the master group.
        let master_heads = heads.vertexes_by_group(Group::MASTER);

        // Visit vertexes recursively.
        let mut id_set = IdSet::empty();
        let mut to_visit: Vec<Vertex> = master_heads;
        let mut visited = HashSet::new();
        while let Some(vertex) = to_visit.pop() {
            // Skip vertexes already visited (graphs can share ancestors).
            if !visited.insert(vertex.clone()) {
                continue;
            }
            let id = self.vertex_id_optional(&vertex).await?;
            // None: The vertex/id is not yet inserted to IdMap.
            if let Some(id) = id {
                if id.group() == Group::MASTER {
                    // Already exist in the master group. Stop visiting.
                    continue;
                } else {
                    // Need reassign. Need to continue visiting.
                    id_set.push(id);
                }
            }
            let parents = parents.parent_names(vertex).await?;
            to_visit.extend(parents);
        }

        let set = Set::from_id_set_dag(id_set, self)?;
        tracing::debug!(target: "dag::reassign", "need to reassign: {:?}", &set);
        Ok(set)
    }
}
2642
/// Calculate the initial "reserved" set used before inserting new vertexes.
/// Only heads that have non-zero reserve_size and are present in the graph
/// take effect. In other words, heads that are known to be not present in
/// the local graph (ex. being added), or have zero reserve_size can be
/// skipped as an optimization.
async fn calculate_initial_reserved(
    map: &dyn IdConvert,
    covered: &IdSet,
    heads: &VertexListWithOptions,
) -> Result<IdSet> {
    let mut reserved = IdSet::empty();
    for (vertex, opts) in heads.vertex_options() {
        if opts.reserve_size == 0 {
            // Avoid potentially costly remote lookup.
            continue;
        }
        // Only heads already present (within their desired group) take effect.
        if let Some(id) = map
            .vertex_id_with_max_group(&vertex, opts.desired_group)
            .await?
        {
            // Reserve ids immediately after the existing head.
            update_reserved(&mut reserved, covered, id + 1, opts.reserve_size);
        }
    }
    Ok(reserved)
}
2668
2669fn update_reserved(reserved: &mut IdSet, covered: &IdSet, low: Id, reserve_size: u32) {
2670    if reserve_size == 0 {
2671        return;
2672    }
2673    let span = find_free_span(covered, low, reserve_size as _, true);
2674    reserved.push(span);
2675}
2676
/// Find a span with constraints:
/// - does not overlap with `covered`.
/// - `span.low` >= the given `low`.
/// - if `shrink_to_fit` is `false`, `span.high - span.low` must be `reserve_size`.
/// - if `shrink_to_fit` is `true`, the span can be smaller than `reserve_size` to
///   fill up existing gaps in `covered`.
///
/// `reserve_size` cannot be 0.
fn find_free_span(covered: &IdSet, low: Id, reserve_size: u64, shrink_to_fit: bool) -> IdSpan {
    assert!(reserve_size > 0);
    // Kept only for the perf warning message below.
    let original_low = low;
    let mut low = low;
    let mut high;
    // Iteration counter for detecting pathological (fragmented) inputs.
    let mut count = 0;
    loop {
        count += 1;
        // First, bump 'low' to not overlap with a conflicted `span`.
        //   [----covered_span----]
        //      ^                  ^
        //      original_low       bump to here
        if let Some(span) = covered.span_contains(low) {
            low = span.high + 1;
        }
        // Clamp `high` so the span never crosses the group's id boundary.
        high = (low + reserve_size - 1).min(low.group().max_id());
        if reserve_size <= 1 && !covered.contains(low) {
            // No need to go through complex (maybe O(N)) logic below.
            break;
        }
        // Try to reserve id..=id+reserve_size-1
        let reserved = IdSet::from_single_span(IdSpan::new(low, high));
        let intersected = reserved.intersection(covered);
        if let Some(span) = intersected.iter_span_asc().next() {
            // Overlap with existing covered spans. Decrease `high` so it
            // no longer overlap.
            if span.low > low && shrink_to_fit {
                // Use the remaining part of the previous reservation.
                //   [----------reserved--------------]
                //             [--intersected--]
                //   ^                                ^
                //   low                              high
                //            ^
                //            last_free
                //   [reserved] <- remaining of the previous reservation
                //            ^
                //            high
                let last_free = span.low - 1;
                high = last_free;
            } else {
                // No space on the left side. Try the right side.
                //   [--------reserved-------]
                //   [--intersected--]
                //   ^                       ^
                //   low                     high
                //        try next -> [------reserved------]
                //  ^                 ^
                //  last_free         low (try next)
                low = span.high + 1;
                continue;
            }
        }
        break;
    }
    // Heuristic threshold: a heavily fragmented `covered` set can make the
    // loop above effectively O(N); surface that as a perf warning.
    if count >= 4096 {
        tracing::warn!(
            target: "dag::perf",
            count=count,
            low=?original_low,
            reserve_size=reserve_size,
            covered=?covered,
            "PERF: find_free_span took too long",
        );
    }
    let span = IdSpan::new(low, high);
    if !shrink_to_fit {
        // Callers that disallow shrinking require the exact requested size.
        assert_eq!(span.count(), reserve_size);
    }
    span
}
2755
2756fn is_ok_some<T>(value: Result<Option<T>>) -> bool {
2757    match value {
2758        Ok(Some(_)) => true,
2759        _ => false,
2760    }
2761}
2762
2763impl<IS, M, P, S> IdMapSnapshot for AbstractDag<IdDag<IS>, M, P, S>
2764where
2765    IS: IdDagStore,
2766    IdDag<IS>: TryClone + 'static,
2767    M: TryClone + IdConvert + Send + Sync + 'static,
2768    P: TryClone + Send + Sync + 'static,
2769    S: TryClone + Send + Sync + 'static,
2770{
2771    fn id_map_snapshot(&self) -> Result<Arc<dyn IdConvert + Send + Sync>> {
2772        Ok(self.try_snapshot()? as Arc<dyn IdConvert + Send + Sync>)
2773    }
2774}
2775
impl<IS, M, P, S> fmt::Debug for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    M: IdConvert + Send + Sync,
    P: Send + Sync,
    S: Send + Sync,
{
    /// Render the IdDag segments (all levels and groups), resolving ids to
    /// vertex names via the IdMap where possible. Delegates to the
    /// free-standing `debug` helper.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        debug(&self.dag, &self.map, f)
    }
}
2787
2788pub(crate) fn debug_segments_by_level_group<S: IdDagStore>(
2789    iddag: &IdDag<S>,
2790    idmap: &dyn IdConvert,
2791    level: Level,
2792    group: Group,
2793) -> Vec<String> {
2794    let mut result = Vec::new();
2795    // Show Id, with optional hash.
2796    let show = |id: Id| DebugId {
2797        id,
2798        name: non_blocking_result(idmap.vertex_name(id)).ok(),
2799    };
2800    let show_flags = |flags: SegmentFlags| -> String {
2801        let mut result = Vec::new();
2802        if flags.contains(SegmentFlags::HAS_ROOT) {
2803            result.push("Root");
2804        }
2805        if flags.contains(SegmentFlags::ONLY_HEAD) {
2806            result.push("OnlyHead");
2807        }
2808        result.join(" ")
2809    };
2810
2811    if let Ok(segments) = iddag.next_segments(group.min_id(), level) {
2812        for segment in segments.into_iter().rev() {
2813            if let (Ok(span), Ok(parents), Ok(flags)) =
2814                (segment.span(), segment.parents(), segment.flags())
2815            {
2816                let mut line = format!(
2817                    "{:.12?} : {:.12?} {:.12?}",
2818                    show(span.low),
2819                    show(span.high),
2820                    parents.into_iter().map(show).collect::<Vec<_>>(),
2821                );
2822                let flags = show_flags(flags);
2823                if !flags.is_empty() {
2824                    line += &format!(" {}", flags);
2825                }
2826                result.push(line);
2827            }
2828        }
2829    }
2830    result
2831}
2832
2833fn debug<S: IdDagStore>(
2834    iddag: &IdDag<S>,
2835    idmap: &dyn IdConvert,
2836    f: &mut fmt::Formatter,
2837) -> fmt::Result {
2838    if let Ok(max_level) = iddag.max_level() {
2839        writeln!(f, "Max Level: {}", max_level)?;
2840        for lv in (0..=max_level).rev() {
2841            writeln!(f, " Level {}", lv)?;
2842            for group in Group::ALL.iter().cloned() {
2843                writeln!(f, "  {}:", group)?;
2844                if let Ok(segments) = iddag.next_segments(group.min_id(), lv) {
2845                    writeln!(f, "   Segments: {}", segments.len())?;
2846                    for line in debug_segments_by_level_group(iddag, idmap, lv, group) {
2847                        writeln!(f, "    {}", line)?;
2848                    }
2849                }
2850            }
2851        }
2852    }
2853
2854    Ok(())
2855}
2856
/// Helper for debug-printing an `Id` together with its resolved vertex name.
struct DebugId {
    id: Id,
    // Resolved vertex name; `None` when the id could not be resolved locally.
    name: Option<Vertex>,
}
2861
2862impl fmt::Debug for DebugId {
2863    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2864        if let Some(name) = &self.name {
2865            fmt::Debug::fmt(&name, f)?;
2866            f.write_str("+")?;
2867        }
2868        write!(f, "{:?}", self.id)?;
2869        Ok(())
2870    }
2871}
2872
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_find_free_span_overflow() {
        // Ids 0..=6 are all taken, so a 2-id reservation starting from 0
        // must land at 7..=8 whether or not shrinking is allowed.
        let covered = IdSet::from(0..=6);
        for &shrink_to_fit in &[true, false] {
            let span = find_free_span(&covered, Id(0), 2, shrink_to_fit);
            assert_eq!(span, IdSpan::from(7..=8));
        }
    }
}