1use std::collections::BTreeMap;
13use std::collections::HashMap;
14use std::collections::HashSet;
15use std::env::var;
16use std::fmt;
17use std::io;
18use std::ops::Deref;
19use std::sync::atomic::AtomicUsize;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::RwLock;
24
25use dag_types::FlatSegment;
26use futures::future::BoxFuture;
27use futures::FutureExt;
28use futures::StreamExt;
29use futures::TryStreamExt;
30use nonblocking::non_blocking_result;
31
32use crate::clone::CloneData;
33use crate::default_impl;
34use crate::errors::bug;
35use crate::errors::programming;
36use crate::errors::DagError;
37use crate::errors::NotFoundError;
38use crate::id::Group;
39use crate::id::Id;
40use crate::id::Vertex;
41use crate::iddag::IdDag;
42use crate::iddag::IdDagAlgorithm;
43use crate::iddagstore::IdDagStore;
44use crate::idmap::CoreMemIdMap;
45use crate::idmap::IdMapAssignHead;
46use crate::idmap::IdMapWrite;
47use crate::lifecycle::LifecycleId;
48use crate::ops::CheckIntegrity;
49use crate::ops::DagAddHeads;
50use crate::ops::DagAlgorithm;
51use crate::ops::DagExportCloneData;
52use crate::ops::DagExportPullData;
53use crate::ops::DagImportCloneData;
54use crate::ops::DagImportPullData;
55use crate::ops::DagPersistent;
56use crate::ops::DagStrip;
57use crate::ops::IdConvert;
58use crate::ops::IdMapSnapshot;
59use crate::ops::Open;
60use crate::ops::Parents;
61use crate::ops::Persist;
62use crate::ops::PrefixLookup;
63use crate::ops::StorageVersion;
64use crate::ops::ToIdSet;
65use crate::ops::TryClone;
66use crate::protocol;
67use crate::protocol::is_remote_protocol_disabled;
68use crate::protocol::AncestorPath;
69use crate::protocol::Process;
70use crate::protocol::RemoteIdConvertProtocol;
71use crate::segment::PreparedFlatSegments;
72use crate::segment::SegmentFlags;
73use crate::set::hints::Flags;
74use crate::set::hints::Hints;
75use crate::set::id_static::BasicIterationOrder;
76use crate::set::Set;
77use crate::types_ext::PreparedFlatSegmentsExt;
78use crate::utils;
79use crate::Error::NeedSlowPath;
80use crate::IdSet;
81use crate::IdSpan;
82use crate::Level;
83use crate::Result;
84use crate::VerLink;
85use crate::VertexListWithOptions;
86use crate::VertexOptions;
87
88mod builder;
89#[cfg(any(test, feature = "indexedlog-backend"))]
90mod indexedlog_dag;
91mod mem_dag;
92
93pub use builder::DagBuilder;
94#[cfg(any(test, feature = "indexedlog-backend"))]
95pub use indexedlog_dag::Dag;
96#[cfg(any(test, feature = "indexedlog-backend"))]
97pub use indexedlog_dag::IndexedLogDagPath;
98pub use mem_dag::MemDag;
99pub use mem_dag::MemDagPath;
100
/// A commit graph that pairs an id-based dag (`I`, usually `IdDag<_>`) with
/// an id <-> name map (`M`), plus caches and support for resolving lazy
/// vertexes through a remote protocol.
///
/// Type parameters:
/// - `I`: the id dag (graph over integer `Id`s).
/// - `M`: the `IdMap` translating between `Id`s and `Vertex` names.
/// - `P`: the "path" used by `Open` to construct a fresh instance
///   (used by `flush`, `strip`, `import_pull_data`).
/// - `S`: extra state carrying its own lock and storage version.
pub struct AbstractDag<I, M, P, S>
where
    I: Send + Sync,
    M: Send + Sync,
    P: Send + Sync,
    S: Send + Sync,
{
    pub(crate) dag: I,
    pub(crate) map: M,

    // Cached `Arc<Self>` snapshot; rebuilt by `try_snapshot` when the
    // `IdDag` version changes, cleared by `invalidate_snapshot`.
    snapshot: RwLock<Option<Arc<Self>>>,

    // Heads added by `add_heads` that have not yet been written out by
    // `flush` / `add_heads_and_flush`.
    pending_heads: VertexListWithOptions,

    // How to open a fresh instance of `Self` (see the `Open` trait bound).
    path: P,

    // Extra state persisted alongside the map and dag.
    state: S,

    // Identity string for this dag instance.
    // NOTE(review): its exact meaning is not visible in this chunk; it is
    // only cloned into snapshots here.
    id: String,

    // Ids in the PERSIST groups as of the last reload/persist; used to
    // decide whether caches can be reused across instances.
    persisted_id_set: IdSet,

    // In-memory overlay of the IdMap, shared (via `Arc`) between snapshots
    // and reloaded instances when safe.
    overlay_map: Arc<RwLock<CoreMemIdMap>>,

    // Ids the overlay map is allowed to serve (the MASTER group; see
    // `update_overlay_map_id_set`).
    overlay_map_id_set: IdSet,

    // Remotely-resolved (path, names) pairs waiting to be written back to
    // the on-disk IdMap by `flush_cached_idmap`.
    overlay_map_paths: Arc<Mutex<Vec<(AncestorPath, Vec<Vertex>)>>>,

    // Protocol used to resolve lazy vertexes remotely.
    remote_protocol: Arc<dyn RemoteIdConvertProtocol>,

    // Optional managed (parents, heads) re-created in the VIRTUAL group
    // after graph-rewriting operations (flush, strip, pull).
    managed_virtual_group: Option<Arc<(Box<dyn Parents>, VertexListWithOptions)>>,

    // Vertexes the remote has confirmed as missing, cached to avoid
    // repeated remote lookups.
    missing_vertexes_confirmed_by_remote: Arc<RwLock<HashSet<Vertex>>>,

    // Lifecycle tracking id, used in tracing output.
    lifecycle_id: LifecycleId,

    pub(crate) internal_stats: DagInternalStats,
}
162
/// Internal counters for diagnostics; not part of the public contract.
#[derive(Debug, Default)]
pub struct DagInternalStats {
    // How many times a sort operation fell back to the slow path.
    pub sort_slow_path_count: AtomicUsize,
}
169
170impl<D, M, P, S> AbstractDag<D, M, P, S>
171where
172 D: Send + Sync,
173 M: Send + Sync,
174 P: Send + Sync,
175 S: Send + Sync,
176{
177 pub fn into_idmap_dag(self) -> (M, D) {
179 (self.map, self.dag)
180 }
181
182 pub fn into_idmap_dag_path_state(self) -> (M, D, P, S) {
184 (self.map, self.dag, self.path, self.state)
185 }
186}
187
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: Send + Sync + IdMapWrite + IdMapAssignHead + TryClone + 'static,
    P: Send + Sync + TryClone + 'static,
    S: Send + Sync + TryClone + 'static,
{
    /// Set (or clear, with `None`) the managed virtual group: a list of
    /// `(head, parents)` pairs that this dag keeps in the VIRTUAL group and
    /// re-creates after graph-rewriting operations (flush, strip, pull).
    ///
    /// Takes effect immediately: the VIRTUAL group is rebuilt before
    /// returning.
    pub async fn set_managed_virtual_group(
        &mut self,
        items: Option<Vec<(Vertex, Vec<Vertex>)>>,
    ) -> Result<()> {
        tracing::debug!(target: "dag::set_managed_virtual_group", lifecycle_id=?self.lifecycle_id, ?items);
        self.managed_virtual_group = items.map(|items| {
            // Every managed head goes to the VIRTUAL group, with no id
            // reservation.
            let opts = VertexOptions {
                reserve_size: 0,
                desired_group: Group::VIRTUAL,
            };
            let heads: VertexListWithOptions = items
                .iter()
                .map(|(v, _p)| (v.clone(), opts.clone()))
                .collect::<Vec<_>>()
                .into();
            // The parent function is simply a name -> parents lookup table.
            let parents: HashMap<Vertex, Vec<Vertex>> = items.into_iter().collect();
            let parents: Box<dyn Parents> = Box::new(parents);
            Arc::new((parents, heads))
        });
        self.maybe_recreate_virtual_group().await
    }

    /// Remove every id in the VIRTUAL group from the id dag, and drop the
    /// corresponding entries from the id map.
    pub(crate) async fn clear_virtual_group(&mut self) -> Result<()> {
        let id_set = self.dag.all_ids_in_groups(&[Group::VIRTUAL])?;
        if !id_set.is_empty() {
            let removed = self.dag.strip(id_set)?;
            // Keep the IdMap in sync with the stripped id dag.
            for span in removed.iter_span_desc() {
                self.map.remove_range(span.low, span.high).await?;
            }
        }
        Ok(())
    }

    /// If a managed virtual group is configured, rebuild it from scratch:
    /// strip the current VIRTUAL group, then re-add the managed heads.
    async fn maybe_recreate_virtual_group(&mut self) -> Result<()> {
        if let Some(maintained_virtual_group) = self.managed_virtual_group.as_ref() {
            // Clone the Arc so `self` can be mutably borrowed below.
            let maintained_virtual_group = maintained_virtual_group.clone();
            self.clear_virtual_group().await?;
            let parents = &maintained_virtual_group.0;
            let head_opts = &maintained_virtual_group.1;
            {
                // Pre-mark the virtual heads as known-missing remotely —
                // presumably so `add_heads` does not trigger remote lookups
                // for them (TODO confirm against the remote protocol path).
                let mut cache = self.missing_vertexes_confirmed_by_remote.write().unwrap();
                for v in head_opts.vertexes() {
                    cache.insert(v);
                }
            }
            self.add_heads(parents.as_ref(), head_opts).await?;
        }
        Ok(())
    }
}
269
#[async_trait::async_trait]
impl<IS, M, P, S> DagPersistent for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore + Persist + StorageVersion,
    IdDag<IS>: TryClone + 'static,
    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
    P: Open<OpenTarget = Self> + Send + Sync + 'static,
    S: TryClone + StorageVersion + Persist + Send + Sync + 'static,
{
    /// Add heads and immediately persist them to storage.
    ///
    /// Requires that there are no pending (unflushed) heads; callers with
    /// pending heads should use `flush` instead.
    async fn add_heads_and_flush(
        &mut self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
    ) -> Result<()> {
        if !self.pending_heads.is_empty() {
            return programming(format!(
                "ProgrammingError: add_heads_and_flush called with pending heads ({:?})",
                &self.pending_heads.vertexes(),
            ));
        }
        tracing::debug!(target: "dag::add_heads_and_flush", lifecycle_id=?self.lifecycle_id, ?heads);

        // The VIRTUAL group is never persisted; drop it before writing.
        self.clear_virtual_group().await?;

        // Lock order: state, map, dag. Reload under lock so we build on top
        // of the latest on-disk data.
        let old_version = self.state.storage_version();
        let lock = self.state.lock()?;
        let map_lock = self.map.lock()?;
        let dag_lock = self.dag.lock()?;
        self.state.reload(&lock)?;
        let new_version = self.state.storage_version();
        if old_version != new_version {
            // Another process changed the storage; drop caches derived from
            // the old state.
            self.invalidate_snapshot();
            self.invalidate_missing_vertex_cache();
            self.invalidate_overlay_map()?;
        }

        self.map.reload(&map_lock)?;
        self.dag.reload(&dag_lock)?;

        self.build_with_lock(parents, heads, &map_lock).await?;

        // Persist order: map, dag, state — then release locks in reverse
        // acquisition order.
        self.map.persist(&map_lock)?;
        self.dag.persist(&dag_lock)?;
        self.state.persist(&lock)?;
        drop(dag_lock);
        drop(map_lock);
        drop(lock);

        self.persisted_id_set = self.dag.all_ids_in_groups(&Group::PERSIST)?;
        // The VIRTUAL group was cleared above; rebuild the managed one.
        self.maybe_recreate_virtual_group().await?;

        // Everything should have been written out.
        debug_assert_eq!(self.dirty().await?.count().await?, 0);

        Ok(())
    }

    /// Write the given `heads` (MASTER group) plus any pending non-master
    /// heads to disk, by re-opening a fresh instance and re-adding all
    /// heads on top of the reloaded on-disk state.
    async fn flush(&mut self, heads: &VertexListWithOptions) -> Result<()> {
        tracing::debug!(target: "dag::flush", lifecycle_id=?self.lifecycle_id, ?heads);
        // Fail early if any head cannot be resolved to an id locally.
        for result in self.vertex_id_batch(&heads.vertexes()).await? {
            result?;
        }
        if heads.vertexes_by_group(Group::MASTER).len() != heads.len() {
            return programming(format!(
                "Dag::flush({:?}) is probably misused (group is not master)",
                heads
            ));
        }

        // Write back remotely-resolved idmap entries first.
        self.flush_cached_idmap().await?;

        let mut new_name_dag: Self = self.path.open()?;

        // `self` serves as the parent function: the pending vertexes exist
        // in the current in-memory graph.
        let parents: &(dyn DagAlgorithm + Send + Sync) = self;
        let non_master_heads: VertexListWithOptions = self.pending_heads.clone();
        new_name_dag.inherit_configurations_from(self);
        let heads = heads.clone().chain(non_master_heads);
        new_name_dag.add_heads_and_flush(&parents, &heads).await?;
        new_name_dag.maybe_recreate_virtual_group().await?;

        // Replace self with the freshly persisted instance.
        *self = new_name_dag;
        Ok(())
    }

    /// Write pending remotely-resolved (path, names) entries to the on-disk
    /// IdMap, using a separate freshly-opened instance so `&self` suffices.
    #[tracing::instrument(skip(self))]
    async fn flush_cached_idmap(&self) -> Result<()> {
        // Take the pending paths out; if there is nothing to do, avoid
        // opening/locking entirely.
        let mut to_insert: Vec<(AncestorPath, Vec<Vertex>)> = Vec::new();
        std::mem::swap(&mut to_insert, &mut *self.overlay_map_paths.lock().unwrap());
        if to_insert.is_empty() {
            return Ok(());
        }

        tracing::debug!(target: "dag::cache", "flushing cached idmap ({} items)", to_insert.len());
        let mut new: Self = self.path.open()?;
        // Same lock/reload discipline as add_heads_and_flush.
        let lock = new.state.lock()?;
        let map_lock = new.map.lock()?;
        let dag_lock = new.dag.lock()?;
        new.state.reload(&lock)?;
        new.map.reload(&map_lock)?;
        new.dag.reload(&dag_lock)?;
        new.inherit_configurations_from(self);
        // Hand the taken paths to `new`; flush_cached_idmap_with_lock takes
        // them back out of `new.overlay_map_paths` and writes them.
        std::mem::swap(&mut to_insert, &mut *new.overlay_map_paths.lock().unwrap());
        new.flush_cached_idmap_with_lock(&map_lock).await?;

        new.state.persist(&lock)?;

        Ok(())
    }
}
413
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone + 'static,
    M: TryClone + IdConvert + IdMapWrite + Persist + Send + Sync + 'static,
    P: Send + Sync + 'static,
    S: TryClone + Send + Sync + 'static,
{
    /// Resolve the pending (path, names) entries to concrete `Id`s and
    /// write them into the IdMap, persisting under the provided lock.
    ///
    /// The caller must hold `map_lock` and have reloaded the map.
    async fn flush_cached_idmap_with_lock(&mut self, map_lock: &M::Lock) -> Result<()> {
        // Take the pending entries; nothing to do if empty.
        let mut to_insert: Vec<(AncestorPath, Vec<Vertex>)> = Vec::new();
        std::mem::swap(&mut to_insert, &mut *self.overlay_map_paths.lock().unwrap());
        if to_insert.is_empty() {
            return Ok(());
        }

        // Translate each AncestorPath into (Id, Vertex) pairs against the
        // current graph.
        let id_names = calculate_id_name_from_paths(
            &self.map,
            &*self.dag,
            &self.overlay_map_id_set,
            &to_insert,
        )
        .await?;

        // Test-only escape hatch: vertexes listed in DAG_SKIP_FLUSH_VERTEXES
        // (comma-separated hex) are deliberately not written, to exercise
        // recovery paths.
        let mut skip_vertexes: Option<HashSet<Vertex>> = None;
        if crate::is_testing() {
            if let Ok(s) = var("DAG_SKIP_FLUSH_VERTEXES") {
                skip_vertexes = Some(
                    s.split(',')
                        .filter_map(|s| Vertex::from_hex(s.as_bytes()).ok())
                        .collect(),
                )
            }
        }

        for (id, name) in id_names {
            if let Some(skip) = &skip_vertexes {
                if skip.contains(&name) {
                    tracing::info!(
                        target: "dag::cache",
                        "skip flushing {:?}-{} to IdMap set by DAG_SKIP_FLUSH_VERTEXES",
                        &name,
                        id
                    );
                    continue;
                }
            }
            tracing::debug!(target: "dag::cache", "insert {:?}-{} to IdMap", &name, id);
            self.map.insert(id, name.as_ref()).await?;
        }

        self.map.persist(map_lock)?;
        Ok(())
    }
}
470
471impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
472where
473 IS: Send + Sync + 'static,
474 IdDag<IS>: StorageVersion,
475 M: Send + Sync + 'static,
476 P: Send + Sync + 'static,
477 S: Send + Sync + 'static,
478{
479 fn maybe_reuse_caches_from(&mut self, other: &Self) {
482 let dag_version_mismatch = self.dag.storage_version() != other.dag.storage_version();
486 let persisted_id_mismatch =
487 self.persisted_id_set.as_spans() != other.persisted_id_set.as_spans();
488 if dag_version_mismatch || persisted_id_mismatch {
489 tracing::debug!(target: "dag::cache", "cannot reuse cache");
490 return;
491 }
492 tracing::debug!(
493 target: "dag::cache", "reusing cache ({} missing)",
494 other.missing_vertexes_confirmed_by_remote.read().unwrap().len(),
495 );
496 self.missing_vertexes_confirmed_by_remote =
497 other.missing_vertexes_confirmed_by_remote.clone();
498 self.overlay_map = other.overlay_map.clone();
499 self.overlay_map_paths = other.overlay_map_paths.clone();
500 }
501
502 pub fn set_remote_protocol(&mut self, protocol: Arc<dyn RemoteIdConvertProtocol>) {
507 self.remote_protocol = protocol;
508 }
509
510 fn inherit_configurations_from(&mut self, original: &Self) {
512 let seg_size = original.dag.get_new_segment_size();
513 self.dag.set_new_segment_size(seg_size);
514 self.set_remote_protocol(original.remote_protocol.clone());
515 self.managed_virtual_group = original.managed_virtual_group.clone();
516 self.maybe_reuse_caches_from(original)
517 }
518}
519
#[async_trait::async_trait]
impl<IS, M, P, S> DagAddHeads for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: TryClone + IdMapAssignHead + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Send + Sync + 'static,
{
    /// Add `heads` (and their ancestors, via `parents`) to the in-memory
    /// graph without persisting. Returns `true` if any new segment was
    /// actually inserted.
    async fn add_heads(
        &mut self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
    ) -> Result<bool> {
        tracing::debug!(target: "dag::add_heads", lifecycle_id=?self.lifecycle_id, ?heads);
        // Only consult the remote for non-VIRTUAL heads.
        if heads.min_desired_group().unwrap_or(Group::VIRTUAL) < Group::VIRTUAL {
            self.populate_missing_vertexes_for_add_heads(parents, &heads.vertexes())
                .await?;
        }

        // The graph is about to change; the cached snapshot becomes stale.
        self.invalidate_snapshot();

        let mut outcome = PreparedFlatSegments::default();
        // `covered`: ids already taken; `reserved`: ids set aside for
        // future growth of existing heads (see `calculate_initial_reserved`).
        let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
        let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
        for (head, opts) in heads.vertex_options() {
            let need_assigning = match self.vertex_id_optional(&head).await? {
                Some(id) => {
                    if id.group() > opts.desired_group {
                        // Moving an id to a "lower" group requires a rebuild;
                        // refuse here.
                        return programming(format!(
                            "add_heads: cannot re-assign {:?}:{:?} from {} to {} (desired), use add_heads_and_flush instead",
                            head,
                            id,
                            id.group(),
                            opts.desired_group
                        ));
                    } else {
                        // Known in the IdMap but possibly absent from the
                        // id dag (e.g. map entry without segments).
                        !self.dag.contains_id(id)?
                    }
                }
                None => true,
            };
            if need_assigning {
                let group = opts.desired_group;
                let prepared_segments = self
                    .assign_head(head.clone(), parents, group, &mut covered, &reserved)
                    .await?;
                outcome.merge(prepared_segments);
                if opts.reserve_size > 0 {
                    // Reserve ids right after the newly assigned head.
                    let low = self.map.vertex_id(head.clone()).await? + 1;
                    update_reserved(&mut reserved, &covered, low, opts.reserve_size);
                }
                // VIRTUAL heads are transient and never tracked as pending.
                if group != Group::VIRTUAL {
                    self.pending_heads.push((head, opts));
                }
            }
        }

        self.dag
            .build_segments_from_prepared_flat_segments(&outcome)?;

        Ok(outcome.segment_count() > 0)
    }
}
637
#[async_trait::async_trait]
impl<IS, M, P, S> DagStrip for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore + Persist,
    IdDag<IS>: TryClone + StorageVersion,
    M: TryClone + Persist + IdMapWrite + IdConvert + Send + Sync + 'static,
    P: TryClone + Open<OpenTarget = Self> + Send + Sync + 'static,
    S: TryClone + Persist + Send + Sync + 'static,
{
    /// Remove `set` and its descendants from the graph and persist the
    /// result. Works on a freshly opened instance so `self` is replaced
    /// atomically on success.
    async fn strip(&mut self, set: &Set) -> Result<()> {
        if !self.pending_heads.is_empty() {
            return programming(format!(
                "strip does not support pending heads ({:?})",
                &self.pending_heads.vertexes(),
            ));
        }
        tracing::debug!(target: "dag::strip", lifecycle_id=?self.lifecycle_id, ?set);

        // Open + reload a fresh instance under lock, strip there, persist,
        // then swap it in.
        let mut new: Self = self.path.open()?;
        let (lock, map_lock, dag_lock) = new.reload()?;
        new.inherit_configurations_from(self);

        new.strip_with_lock(set, &map_lock).await?;
        new.persist(lock, map_lock, dag_lock)?;
        // Strip removed the VIRTUAL group; rebuild the managed one.
        new.maybe_recreate_virtual_group().await?;

        *self = new;
        Ok(())
    }
}
670
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: TryClone + Persist + IdMapWrite + IdConvert + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Send + Sync + 'static,
{
    /// Strip `set` and its descendants from the id dag and id map.
    /// The caller must hold `map_lock` (and be working on reloaded data).
    async fn strip_with_lock(&mut self, set: &Set, map_lock: &M::Lock) -> Result<()> {
        if !self.pending_heads.is_empty() {
            return programming(format!(
                "strip does not support pending heads ({:?})",
                &self.pending_heads.vertexes(),
            ));
        }

        let id_set = self.to_id_set(set).await?;

        // Compute the master-group heads that become heads only after the
        // strip (resolved to names while the ids are still resolvable).
        let head_ids: Vec<Id> = {
            let to_strip = self.dag.descendants(id_set.clone())?;
            let master_group = self.dag.master_group()?;
            let master_group_after_strip = master_group.difference(&to_strip);
            let heads_before_strip = self.dag.heads_ancestors(master_group)?;
            let heads_after_strip = self.dag.heads_ancestors(master_group_after_strip)?;
            let new_heads = heads_after_strip.difference(&heads_before_strip);
            new_heads.iter_desc().collect()
        };
        // NOTE(review): currently only used for the debug log below.
        let heads_after_strip = self.vertex_name_batch(&head_ids).await?;
        tracing::debug!(target: "dag::strip", "heads after strip: {:?}", &heads_after_strip);
        // Write back cached idmap entries before ids get removed.
        self.flush_cached_idmap_with_lock(map_lock).await?;

        let removed_id_set = self.dag.strip(id_set)?;
        tracing::debug!(target: "dag::strip", "removed id set: {:?}", &removed_id_set);

        // Keep the IdMap in sync and remember the removed names.
        let mut removed_vertexes = Vec::new();
        for span in removed_id_set.iter_span_desc() {
            let vertexes = self.map.remove_range(span.low, span.high).await?;
            removed_vertexes.extend(vertexes);
        }
        tracing::debug!(target: "dag::strip", "removed vertexes: {:?}", &removed_vertexes);

        // Stripped names no longer exist locally; record them as missing so
        // later lookups don't go to the remote unnecessarily.
        self.missing_vertexes_confirmed_by_remote
            .write()
            .unwrap()
            .extend(removed_vertexes);

        self.invalidate_snapshot();

        Ok(())
    }
}
731
732#[async_trait::async_trait]
733impl<IS, M, P, S> IdMapWrite for AbstractDag<IdDag<IS>, M, P, S>
734where
735 IS: IdDagStore,
736 IdDag<IS>: TryClone,
737 M: TryClone + IdMapAssignHead + Send + Sync,
738 P: TryClone + Send + Sync,
739 S: TryClone + Send + Sync,
740{
741 async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
742 self.map.insert(id, name).await
743 }
744
745 async fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<Vertex>> {
746 self.map.remove_range(low, high).await
747 }
748}
749
#[async_trait::async_trait]
impl<IS, M, P, S> DagImportCloneData for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore + Persist + 'static,
    IdDag<IS>: TryClone,
    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Persist + Send + Sync + 'static,
{
    /// Import server-provided clone data (segments + partial idmap) into an
    /// empty graph, verify universally-known ids are covered, and persist.
    async fn import_clone_data(&mut self, clone_data: CloneData<Vertex>) -> Result<()> {
        // Lock and reload first so the emptiness check below is accurate.
        let (lock, map_lock, dag_lock) = self.reload()?;

        if !self.dag.all()?.is_empty() {
            return programming("Cannot import clone data for non-empty graph");
        }
        for (id, name) in clone_data.idmap {
            tracing::debug!(target: "dag::clone", "insert IdMap: {:?}-{:?}", &name, id);
            self.map.insert(id, name.as_ref()).await?;
        }
        self.dag
            .build_segments_from_prepared_flat_segments(&clone_data.flat_segments)?;

        // Fail (before persisting) if the server left out required names.
        self.verify_missing().await?;

        self.persist(lock, map_lock, dag_lock)
    }
}
779
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore + Persist + 'static,
    IdDag<IS>: TryClone,
    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Persist + Send + Sync + 'static,
{
    /// Error out if any universally-known id lacks a name in the IdMap —
    /// which indicates incomplete server-provided data.
    async fn verify_missing(&self) -> Result<()> {
        let missing: Vec<Id> = self.check_universal_ids().await?;
        if !missing.is_empty() {
            let msg = format!(
                concat!(
                    "Clone data does not contain vertex for {:?}. ",
                    "This is most likely a server-side bug."
                ),
                missing,
            );
            return programming(msg);
        }

        Ok(())
    }

    /// Lock (state, map, dag — in that order) and reload each component.
    /// Returns the locks so the caller can later `persist` under them.
    fn reload(&mut self) -> Result<(S::Lock, M::Lock, IS::Lock)> {
        let lock = self.state.lock()?;
        let map_lock = self.map.lock()?;
        let dag_lock = self.dag.lock()?;
        self.state.reload(&lock)?;
        self.map.reload(&map_lock)?;
        self.dag.reload(&dag_lock)?;

        Ok((lock, map_lock, dag_lock))
    }

    /// Persist map, dag, then state under the locks obtained from `reload`,
    /// and refresh caches derived from the persisted data.
    fn persist(&mut self, lock: S::Lock, map_lock: M::Lock, dag_lock: IS::Lock) -> Result<()> {
        self.map.persist(&map_lock)?;
        self.dag.persist(&dag_lock)?;
        self.state.persist(&lock)?;

        // On-disk content changed; the overlay map and persisted id set
        // must be recomputed.
        self.invalidate_overlay_map()?;
        self.persisted_id_set = self.dag.all_ids_in_groups(&Group::PERSIST)?;

        Ok(())
    }
}
827
#[async_trait::async_trait]
impl<IS, M, P, S> DagImportPullData for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore + Persist,
    IdDag<IS>: TryClone + StorageVersion,
    M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
    P: Open<OpenTarget = Self> + TryClone + Send + Sync + 'static,
    S: TryClone + Persist + Send + Sync + 'static,
{
    /// Fast-path import of server "pull" data (segments in the server's id
    /// space plus a partial idmap) into the local MASTER group.
    ///
    /// Returns `NeedSlowPath` whenever the fast path cannot be taken (e.g.
    /// the data overlaps vertexes that exist locally outside MASTER).
    async fn import_pull_data(
        &mut self,
        clone_data: CloneData<Vertex>,
        heads: &VertexListWithOptions,
    ) -> Result<()> {
        if !self.pending_heads.is_empty() {
            return programming(format!(
                "import_pull_data called with pending heads ({:?})",
                &self.pending_heads.vertexes(),
            ));
        }
        if let Some(group) = heads.max_desired_group() {
            if group != Group::MASTER {
                return programming(concat!(
                    "import_pull_data should only take MASTER group heads. ",
                    "Only MASTER group can contain lazy vertexes like what pull_data uses."
                ));
            }
        }

        // The server must provide names for every boundary id (parents,
        // heads, roots) of the pulled segments.
        for id in clone_data.flat_segments.parents_head_and_roots() {
            if !clone_data.idmap.contains_key(&id) {
                return programming(format!(
                    "server does not provide name for id {:?} in pull data",
                    id
                ));
            }
        }

        // Work on a freshly opened, reloaded instance; swap it in at the end.
        let mut new: Self = self.path.open()?;
        let (lock, map_lock, dag_lock) = new.reload()?;
        new.inherit_configurations_from(self);

        {
            // Classify server segments: a segment whose parents all lie
            // outside the pulled id set contributes a "root"; parents
            // outside the set must already be known ("connected") locally.
            let mut root_ids: Vec<Id> = Vec::new();
            let mut parent_ids: Vec<Id> = Vec::new();
            let segments = &clone_data.flat_segments.segments;
            let id_set = IdSet::from_spans(segments.iter().map(|s| s.low..=s.high));
            for seg in segments {
                let pids: Vec<Id> = seg.parents.to_vec();
                let connected_pids: Vec<Id> = pids
                    .iter()
                    .copied()
                    .filter(|&p| !id_set.contains(p))
                    .collect();
                if connected_pids.len() == pids.len() {
                    // All parents (possibly none) are outside the pull set.
                    root_ids.push(seg.low);
                }
                parent_ids.extend(connected_pids);
            }

            // Translate server ids to names via the server-provided idmap.
            let to_names = |ids: &[Id], hint: &str| -> Result<Vec<Vertex>> {
                let names = ids.iter().map(|i| match clone_data.idmap.get(i) {
                    Some(v) => Ok(v.clone()),
                    None => {
                        programming(format!("server does not provide name for {} {:?}", hint, i))
                    }
                });
                names.collect()
            };

            let parent_names = to_names(&parent_ids, "parent")?;
            let root_names = to_names(&root_ids, "root")?;
            tracing::trace!(
                "pull: connected parents: {:?}, roots: {:?}",
                &parent_names,
                &root_names
            );

            // Resolve all boundary names (this may consult the remote) and
            // cache any successfully resolved lazy entries in the local map.
            let mut names = parent_names
                .iter()
                .chain(root_names.iter())
                .cloned()
                .collect::<Vec<_>>();
            names.sort_unstable();
            names.dedup();
            let resolved = new.vertex_id_batch(&names).await?;
            assert_eq!(resolved.len(), names.len());
            for (id, name) in resolved.into_iter().zip(names) {
                if let Ok(id) = id {
                    if !new.map.contains_vertex_name(&name).await? {
                        tracing::debug!(target: "dag::pull", "insert IdMap: {:?}-{:?}", &name, id);
                        new.map.insert(id, name.as_ref()).await?;
                    }
                }
            }

            // Roots must be new to the local graph — otherwise the pulled
            // data overlaps existing commits and the slow path is required.
            for name in root_names {
                if new.contains_vertex_name(&name).await? {
                    let e = NeedSlowPath(format!("{:?} exists in local graph", name));
                    return Err(e);
                }
            }

            // All connected parents must resolve locally; propagate any error.
            let client_parents = new.vertex_id_batch(&parent_names).await?;
            client_parents.into_iter().collect::<Result<Vec<Id>>>()?;
        }

        // Indexed, mutable view of the server-side pull data.
        struct ServerState<'a> {
            // Flat segments keyed by their `high` id (server id space);
            // segments can be split as heads land mid-segment.
            seg_by_high: BTreeMap<Id, FlatSegment>,
            // Reverse index of the server idmap: name -> server id.
            idmap_by_name: BTreeMap<&'a Vertex, Id>,
            idmap_by_id: &'a BTreeMap<Id, Vertex>,
        }
        let mut server = ServerState {
            seg_by_high: clone_data
                .flat_segments
                .segments
                .iter()
                .map(|s| (s.high, s.clone()))
                .collect(),
            idmap_by_name: clone_data
                .idmap
                .iter()
                .map(|(&id, name)| (name, id))
                .collect(),
            idmap_by_id: &clone_data.idmap,
        };

        impl<'a> ServerState<'a> {
            // Find the segment whose [low, high] span contains `server_id`.
            fn seg_containing_id(&self, server_id: Id) -> Result<&FlatSegment> {
                // The candidate is the first segment with high >= server_id.
                let seg = match self.seg_by_high.range(server_id..).next() {
                    Some((_high, seg)) => {
                        if seg.low <= server_id && seg.high >= server_id {
                            Some(seg)
                        } else {
                            None
                        }
                    }
                    None => None,
                };
                seg.ok_or_else(|| {
                    DagError::Programming(format!(
                        "server does not provide segment covering id {}",
                        server_id
                    ))
                })
            }

            // Split the segment ending at `high` into [low, middle] and
            // [middle + 1, high]; the second half's parent is `middle`.
            fn split_seg(&mut self, high: Id, middle: Id) {
                let seg = self
                    .seg_by_high
                    .remove(&high)
                    .expect("bug: invalid high passed to split_seg");
                assert!(seg.low <= middle);
                assert!(seg.high > middle);
                // `middle` must be a named id or the split would create an
                // unresolvable boundary.
                assert!(self.idmap_by_id.contains_key(&middle));
                let seg1 = FlatSegment {
                    low: seg.low,
                    high: middle,
                    parents: seg.parents,
                };
                let seg2 = FlatSegment {
                    low: middle + 1,
                    high: seg.high,
                    parents: vec![middle],
                };
                self.seg_by_high.insert(seg1.high, seg1);
                self.seg_by_high.insert(seg2.high, seg2);
            }

            // Panics if `id` is unnamed — callers check coverage up-front.
            fn name_by_id(&self, id: Id) -> Vertex {
                self.idmap_by_id
                    .get(&id)
                    .expect("IdMap should contain the `id`. It should be checked before.")
                    .clone()
            }

            fn id_by_name(&self, name: &Vertex) -> Option<Id> {
                self.idmap_by_name.get(name).copied()
            }
        }

        // Client-side ids already in use (MASTER group).
        let mut taken = {
            new.dag().all_ids_in_groups(&[Group::MASTER])?
        };

        let mut prepared_client_segments = PreparedFlatSegments::default();

        // For each head, walk the server segments depth-first (stack of
        // server ids), translating each segment into client id space once
        // all of its parents are known on the client.
        for (head, opts) in heads.vertex_options() {
            let mut stack: Vec<Id> = vec![];
            if let Some(head_server_id) = server.id_by_name(&head) {
                // Validate coverage early; the result itself is unused.
                let _head_server_seg = server.seg_containing_id(head_server_id)?;
                stack.push(head_server_id);
            }

            while let Some(server_high) = stack.pop() {
                let mut server_seg = server.seg_containing_id(server_high)?;
                if server_high < server_seg.high {
                    // The id lands mid-segment: split so `server_high`
                    // becomes a segment boundary.
                    let seg_high = server_seg.high;
                    server.split_seg(seg_high, server_high);
                    server_seg = server.seg_containing_id(server_high)?;
                    assert_eq!(server_high, server_seg.high);
                }
                let high_vertex = server.name_by_id(server_high);
                let client_high_id = new
                    .map
                    .vertex_id_with_max_group(&high_vertex, Group::MAX)
                    .await?;
                match client_high_id {
                    Some(id) if id.group() == Group::MASTER => {
                        // Already present locally in MASTER; nothing to do.
                        continue;
                    }
                    Some(id) => {
                        let e = NeedSlowPath(format!(
                            "{:?} exists in local graph as {:?} - fast path requires MASTER group",
                            &high_vertex, id
                        ));
                        return Err(e);
                    }
                    None => {}
                }

                let parent_server_ids = &server_seg.parents;
                let parent_names: Vec<Vertex> = {
                    let iter = parent_server_ids.iter().map(|id| server.name_by_id(*id));
                    iter.collect()
                };

                // NOTE(review): "missng" is a long-standing typo of
                // "missing" in this local's name.
                let mut parent_client_ids = Vec::new();
                let mut missng_parent_server_ids = Vec::new();

                {
                    // Resolve the segment's parents on the client side.
                    let client_id_res = new.map.vertex_id_batch(&parent_names).await?;
                    assert_eq!(client_id_res.len(), parent_server_ids.len());
                    for (res, &server_id) in client_id_res.into_iter().zip(parent_server_ids) {
                        match res {
                            Ok(id) if id.group() != Group::MASTER => {
                                return Err(NeedSlowPath(format!(
                                    "{:?} exists id in local graph as {:?} - fast path requires MASTER group",
                                    &parent_names, id
                                )));
                            }
                            Ok(id) => {
                                parent_client_ids.push(id);
                            }
                            Err(crate::Error::VertexNotFound(_)) => {
                                missng_parent_server_ids.push(server_id);
                            }
                            Err(e) => return Err(e),
                        }
                    }
                }

                if !missng_parent_server_ids.is_empty() {
                    // Process missing parents first, then revisit this
                    // segment (it is re-pushed below the parents).
                    stack.push(server_high);
                    for &server_id in missng_parent_server_ids.iter().rev() {
                        stack.push(server_id);
                    }
                    continue;
                }

                // Pick client ids: start just past the highest parent (or
                // at the bottom of MASTER for a root) and find a free span.
                let candidate_id = match parent_client_ids.iter().max().copied() {
                    None => Group::MASTER.min_id(),
                    Some(id) => id + 1,
                };
                let size = server_seg.high.0 - server_seg.low.0 + 1;
                let span = find_free_span(&taken, candidate_id, size, false);

                // Copy any server-provided names within the segment into
                // the client map, translated into the chosen span.
                for (&server_id, name) in server.idmap_by_id.range(server_seg.low..=server_seg.high)
                {
                    let client_id = server_id + span.low.0 - server_seg.low.0;
                    if client_id.group() != Group::MASTER {
                        return Err(crate::Error::IdOverflow(Group::MASTER));
                    }
                    new.map.insert(client_id, name.as_ref()).await?;
                }

                prepared_client_segments.push_segment(span.low, span.high, &parent_client_ids);

                taken.push(span);
            }

            // Honor the head's id-reservation request after assignment.
            if opts.reserve_size > 0 {
                let head_client_id = new.map.vertex_id(head).await?;
                let span = find_free_span(&taken, head_client_id + 1, opts.reserve_size as _, true);
                taken.push(span);
            }
        }

        new.dag
            .build_segments_from_prepared_flat_segments(&prepared_client_segments)?;

        // New vertexes exist now; the "confirmed missing" cache is stale.
        new.invalidate_missing_vertex_cache();

        if cfg!(debug_assertions) {
            new.verify_missing().await?;
        }

        new.persist(lock, map_lock, dag_lock)?;

        new.maybe_recreate_virtual_group().await?;

        *self = new;
        Ok(())
    }
}
1211
1212#[async_trait::async_trait]
1213impl<IS, M, P, S> DagExportCloneData for AbstractDag<IdDag<IS>, M, P, S>
1214where
1215 IS: IdDagStore,
1216 IdDag<IS>: TryClone,
1217 M: IdConvert + TryClone + Send + Sync + 'static,
1218 P: TryClone + Send + Sync + 'static,
1219 S: TryClone + Send + Sync + 'static,
1220{
1221 async fn export_clone_data(&self) -> Result<CloneData<Vertex>> {
1222 let idmap: BTreeMap<Id, Vertex> = {
1223 let ids: Vec<Id> = self.dag.universal_ids()?.into_iter().collect();
1224 tracing::debug!("export: {} universally known vertexes", ids.len());
1225 let names = {
1226 let fallible_names = self.vertex_name_batch(&ids).await?;
1227 let mut names = Vec::with_capacity(fallible_names.len());
1228 for name in fallible_names {
1229 names.push(name?);
1230 }
1231 names
1232 };
1233 ids.into_iter().zip(names).collect()
1234 };
1235
1236 let flat_segments: PreparedFlatSegments = {
1237 let segments = self.dag.next_segments(Id::MIN, 0)?;
1238 let mut prepared = Vec::with_capacity(segments.len());
1239 for segment in segments {
1240 let span = segment.span()?;
1241 let parents = segment.parents()?;
1242 prepared.push(FlatSegment {
1243 low: span.low,
1244 high: span.high,
1245 parents,
1246 });
1247 }
1248 PreparedFlatSegments {
1249 segments: prepared.into_iter().collect(),
1250 }
1251 };
1252
1253 let data = CloneData {
1254 flat_segments,
1255 idmap,
1256 };
1257 Ok(data)
1258 }
1259}
1260
1261#[async_trait::async_trait]
1262impl<IS, M, P, S> DagExportPullData for AbstractDag<IdDag<IS>, M, P, S>
1263where
1264 IS: IdDagStore,
1265 IdDag<IS>: TryClone,
1266 M: IdConvert + TryClone + Send + Sync + 'static,
1267 P: TryClone + Send + Sync + 'static,
1268 S: TryClone + Send + Sync + 'static,
1269{
1270 async fn export_pull_data(&self, set: &Set) -> Result<CloneData<Vertex>> {
1271 let id_set = self.to_id_set(set).await?;
1272
1273 let flat_segments = self.dag.idset_to_flat_segments(id_set)?;
1274 let ids: Vec<_> = flat_segments.parents_head_and_roots().into_iter().collect();
1275
1276 let idmap: BTreeMap<Id, Vertex> = {
1277 tracing::debug!("pull: {} vertexes in idmap", ids.len());
1278 let names = {
1279 let fallible_names = self.vertex_name_batch(&ids).await?;
1280 let mut names = Vec::with_capacity(fallible_names.len());
1281 for name in fallible_names {
1282 names.push(name?);
1283 }
1284 names
1285 };
1286 assert_eq!(ids.len(), names.len());
1287 ids.into_iter().zip(names).collect()
1288 };
1289
1290 let data = CloneData {
1291 flat_segments,
1292 idmap,
1293 };
1294 Ok(data)
1295 }
1296}
1297
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: TryClone + Send + Sync,
    P: TryClone + Send + Sync,
    S: TryClone + Send + Sync,
{
    /// Drop the cached snapshot so the next `try_snapshot` rebuilds it.
    fn invalidate_snapshot(&mut self) {
        *self.snapshot.write().unwrap() = None;
    }

    /// Clear the "confirmed missing by remote" negative cache.
    fn invalidate_missing_vertex_cache(&mut self) {
        tracing::debug!(target: "dag::cache", "cleared missing cache");
        *self.missing_vertexes_confirmed_by_remote.write().unwrap() = Default::default();
    }

    /// Reset the overlay (remote-resolved) id<->name cache and recompute
    /// the id range it is allowed to cover.
    fn invalidate_overlay_map(&mut self) -> Result<()> {
        self.overlay_map = Default::default();
        self.update_overlay_map_id_set()?;
        tracing::debug!(target: "dag::cache", "cleared overlay map cache");
        Ok(())
    }

    /// Refresh `overlay_map_id_set` to the current master group. Only ids
    /// in this set may be cached by the overlay map.
    fn update_overlay_map_id_set(&mut self) -> Result<()> {
        self.overlay_map_id_set = self.dag.master_group()?;
        Ok(())
    }

    /// Return a cheap `Arc` snapshot of this dag, reusing the cached one
    /// when its IdDag version still matches. Uses a double-checked
    /// pattern: a read-lock fast path, then re-validation under the
    /// write lock in case another thread raced us.
    pub(crate) fn try_snapshot(&self) -> Result<Arc<Self>> {
        // Fast path: cached snapshot is still current.
        if let Some(s) = self.snapshot.read().unwrap().deref() {
            if s.dag.version() == self.dag.version() {
                return Ok(Arc::clone(s));
            }
        }

        // Slow path: take the write lock and re-check before rebuilding.
        let mut snapshot = self.snapshot.write().unwrap();
        match snapshot.deref() {
            Some(s) if s.dag.version() == self.dag.version() => Ok(s.clone()),
            _ => {
                let cloned = Self {
                    dag: self.dag.try_clone()?,
                    map: self.map.try_clone()?,
                    // The clone does not inherit the snapshot cache.
                    snapshot: Default::default(),
                    pending_heads: self.pending_heads.clone(),
                    persisted_id_set: self.persisted_id_set.clone(),
                    path: self.path.try_clone()?,
                    state: self.state.try_clone()?,
                    id: self.id.clone(),
                    // Caches are shared (Arc), not deep-cloned, so remote
                    // lookups made via the snapshot benefit the original.
                    overlay_map: Arc::clone(&self.overlay_map),
                    overlay_map_id_set: self.overlay_map_id_set.clone(),
                    overlay_map_paths: Arc::clone(&self.overlay_map_paths),
                    remote_protocol: self.remote_protocol.clone(),
                    managed_virtual_group: self.managed_virtual_group.clone(),
                    missing_vertexes_confirmed_by_remote: Arc::clone(
                        &self.missing_vertexes_confirmed_by_remote,
                    ),
                    lifecycle_id: self.lifecycle_id.clone(),
                    // Stats are per-instance; start fresh for the snapshot.
                    internal_stats: Default::default(),
                };
                let result = Arc::new(cloned);
                *snapshot = Some(Arc::clone(&result));
                Ok(result)
            }
        }
    }

    /// Borrow the IdDag (segment structure).
    pub fn dag(&self) -> &IdDag<IS> {
        &self.dag
    }

    /// Borrow the IdMap (id <-> name).
    pub fn map(&self) -> &M {
        &self.map
    }

    /// Shared handle to the remote id-resolution protocol.
    pub(crate) fn get_remote_protocol(&self) -> Arc<dyn RemoteIdConvertProtocol> {
        self.remote_protocol.clone()
    }
}
1386
1387impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
1388where
1389 IS: IdDagStore,
1390 IdDag<IS>: TryClone,
1391 M: TryClone + IdMapAssignHead + Send + Sync + 'static,
1392 P: TryClone + Send + Sync + 'static,
1393 S: TryClone + Send + Sync + 'static,
1394{
1395 async fn populate_missing_vertexes_for_add_heads(
1396 &mut self,
1397 parents: &dyn Parents,
1398 heads: &[Vertex],
1399 ) -> Result<()> {
1400 if self.is_vertex_lazy() {
1401 let unassigned = calculate_definitely_unassigned_vertexes(self, parents, heads).await?;
1402 let mut missing = self.missing_vertexes_confirmed_by_remote.write().unwrap();
1403 for v in unassigned {
1404 if missing.insert(v.clone()) {
1405 tracing::trace!(target: "dag::cache", "cached missing {:?} (definitely missing)", &v);
1406 }
1407 }
1408 }
1409 Ok(())
1410 }
1411}
1412
/// Heuristically compute the subset of the pending insertion (described by
/// `parents` + `heads`) that definitely has no Id assigned yet, so callers
/// can cache them as "known missing" without asking the server.
///
/// Strategy: examine each root of the insertion subdag. A root whose
/// parents are all locally resolvable and clearly outside the lazy range
/// (or whose parents' children are all locally known) cannot already be
/// assigned; everything reachable from such roots is unassigned. The
/// remaining "unclear" vertexes are probed via `vertex_id_batch`.
async fn calculate_definitely_unassigned_vertexes<IS, M, P, S>(
    this: &AbstractDag<IdDag<IS>, M, P, S>,
    parents: &dyn Parents,
    heads: &[Vertex],
) -> Result<Vec<Vertex>>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: TryClone + IdMapAssignHead + Send + Sync + 'static,
    P: TryClone + Send + Sync + 'static,
    S: TryClone + Send + Sync + 'static,
{
    let subdag = parents.hint_subdag_for_insertion(heads).await?;

    // `remaining`: vertexes whose status is still unknown.
    let mut remaining = subdag.all().await?;
    let mut unassigned = Set::empty();

    let mut unassigned_roots = Vec::new();
    if this.is_vertex_lazy() {
        let roots = subdag.roots(remaining.clone()).await?;
        let mut roots_iter = roots.iter().await?;
        while let Some(root) = roots_iter.next().await {
            let root = root?;

            // A root already known locally is assigned; skip it.
            if matches!(
                &this.contains_vertex_name_locally(&[root.clone()]).await?[..],
                [true]
            ) {
                tracing::debug!(target: "dag::definitelymissing", "root {:?} is already known", &root);
                continue;
            }

            let root_parents_id_set = {
                let root_parents = parents.parent_names(root.clone()).await?;
                // `sort` may hit the remote protocol; if the parents
                // cannot be resolved we cannot classify this root.
                let root_parents_set = match this.sort(&Set::from_static_names(root_parents)).await
                {
                    Ok(set) => set,
                    Err(_) => {
                        tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear (parents cannot be resolved)", &root);
                        continue;
                    }
                };
                this.to_id_set(&root_parents_set).await?
            };

            // Parentless roots cannot be classified by parent inspection.
            if root_parents_id_set.is_empty() {
                tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear (no parents)", &root);
                continue;
            }

            // All parents live above the MASTER group, i.e. in the
            // non-lazy range, so the root would be locally known if it
            // were assigned — it is not, hence definitely unassigned.
            if root_parents_id_set
                .iter_desc()
                .all(|i| i.group() > Group::MASTER)
            {
                tracing::debug!(target: "dag::definitelymissing", "root {:?} is not assigned (non-lazy parent)", &root);
                unassigned_roots.push(root);
                continue;
            }

            // If every child of the root's parents is locally known and
            // the root is not among them, the root cannot be assigned.
            let children_ids: Vec<Id> = this
                .dag
                .children(root_parents_id_set)?
                .iter_desc()
                .collect();
            if this
                .map
                .contains_vertex_id_locally(&children_ids)
                .await?
                .iter()
                .all(|b| *b)
            {
                tracing::debug!(target: "dag::definitelymissing", "root {:?} is not assigned (children of parents are known)", &root);
                unassigned_roots.push(root);
                continue;
            }

            tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear", &root);
        }

        if !unassigned_roots.is_empty() {
            // Everything reachable from an unassigned root is unassigned.
            unassigned = subdag
                .descendants(Set::from_static_names(unassigned_roots))
                .await?;
            remaining = remaining.difference(&unassigned);
        }
    }

    // For the still-unclear vertexes, probe assignment directly: a vertex
    // whose id resolves (possibly remotely) is assigned.
    let filter_known = |sample: &[Vertex]| -> BoxFuture<Result<Vec<Vertex>>> {
        let sample = sample.to_vec();
        async {
            let known_bools: Vec<bool> = {
                let ids = this.vertex_id_batch(&sample).await?;
                ids.into_iter().map(|i| i.is_ok()).collect()
            };
            debug_assert_eq!(sample.len(), known_bools.len());
            let known = sample
                .into_iter()
                .zip(known_bools)
                .filter_map(|(v, b)| if b { Some(v) } else { None })
                .collect();
            Ok(known)
        }
        .boxed()
    };
    let assigned = utils::filter_known(remaining.clone(), &filter_known).await?;
    unassigned = unassigned.union(&remaining.difference(&assigned));
    tracing::debug!(target: "dag::definitelymissing", "unassigned (missing): {:?}", &unassigned);

    let unassigned = unassigned.iter().await?.try_collect().await?;
    Ok(unassigned)
}
1573
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone,
    M: IdConvert + TryClone + Send + Sync,
    P: TryClone + Send + Sync,
    S: TryClone + Send + Sync,
{
    /// Ask the remote service to resolve `names` to ids. Resolved pairs
    /// are inserted into the overlay cache; names the server could not
    /// resolve are recorded in the negative cache and reported as `None`.
    async fn resolve_vertexes_remotely(&self, names: &[Vertex]) -> Result<Vec<Option<Id>>> {
        if names.is_empty() {
            return Ok(Vec::new());
        }
        // Honor the global "no remote lookups" switch with a WouldBlock
        // error so callers can distinguish it from a real failure.
        if is_remote_protocol_disabled() {
            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "resolving vertexes remotely disabled",
            )
            .into());
        }
        // Keep logs readable: list names only for small batches.
        if names.len() < 30 {
            tracing::debug!(target: "dag::protocol", "resolve names {:?} remotely", &names);
        } else {
            tracing::debug!(target: "dag::protocol", "resolve names ({}) remotely", names.len());
        }
        crate::failpoint!("dag-resolve-vertexes-remotely");
        let request: protocol::RequestNameToLocation =
            (self.map(), self.dag()).process(names.to_vec()).await?;
        let path_names = self
            .remote_protocol
            .resolve_names_to_relative_paths(request.heads, request.names)
            .await?;
        // Populate the overlay cache, then read results back from it.
        self.insert_relative_paths(path_names).await?;
        let overlay = self.overlay_map.read().unwrap();
        let mut ids = Vec::with_capacity(names.len());
        let mut missing = self.missing_vertexes_confirmed_by_remote.write().unwrap();
        for name in names {
            if let Some(id) = overlay.lookup_vertex_id(name) {
                ids.push(Some(id));
            } else {
                // Not in the overlay after a server round-trip: the
                // server does not know it either.
                tracing::trace!(target: "dag::cache", "cached missing {:?} (server confirmed)", &name);
                missing.insert(name.clone());
                ids.push(None);
            }
        }
        Ok(ids)
    }

    /// Ask the remote service to resolve `ids` to names. Unlike name
    /// resolution, an id the server cannot resolve is an error
    /// (ids in the lazy range are expected to exist server-side).
    async fn resolve_ids_remotely(&self, ids: &[Id]) -> Result<Vec<Vertex>> {
        if ids.is_empty() {
            return Ok(Vec::new());
        }
        if is_remote_protocol_disabled() {
            return Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "resolving ids remotely disabled",
            )
            .into());
        }
        if ids.len() < 30 {
            tracing::debug!(target: "dag::protocol", "resolve ids {:?} remotely", &ids);
        } else {
            tracing::debug!(target: "dag::protocol", "resolve ids ({}) remotely", ids.len());
        }
        crate::failpoint!("dag-resolve-ids-remotely");
        let request: protocol::RequestLocationToName = (self.map(), self.dag())
            .process(IdSet::from_spans(ids.iter().copied()))
            .await?;
        let path_names = self
            .remote_protocol
            .resolve_relative_paths_to_names(request.paths)
            .await?;
        self.insert_relative_paths(path_names).await?;
        let overlay = self.overlay_map.read().unwrap();
        let mut names = Vec::with_capacity(ids.len());
        for &id in ids {
            if let Some(name) = overlay.lookup_vertex_name(id).cloned() {
                names.push(name);
            } else {
                return id.not_found();
            }
        }
        Ok(names)
    }

    /// Convert server-provided `x~n` paths into concrete (Id, Vertex)
    /// pairs and insert them into the overlay cache. Also remembers the
    /// raw paths (for re-applying after map changes — see
    /// `overlay_map_paths` usage elsewhere; TODO confirm).
    async fn insert_relative_paths(
        &self,
        path_names: Vec<(AncestorPath, Vec<Vertex>)>,
    ) -> Result<()> {
        if path_names.is_empty() {
            return Ok(());
        }
        let to_insert: Vec<(Id, Vertex)> = calculate_id_name_from_paths(
            self.map(),
            self.dag().deref(),
            &self.overlay_map_id_set,
            &path_names,
        )
        .await?;

        // Record the raw paths first, and release that lock before
        // taking the overlay write lock.
        let mut paths = self.overlay_map_paths.lock().unwrap();
        paths.extend(path_names);
        drop(paths);

        let mut overlay = self.overlay_map.write().unwrap();
        for (id, name) in to_insert {
            tracing::trace!(target: "dag::cache", "cached mapping {:?} <=> {:?}", id, &name);
            overlay.insert_vertex_id_name(id, name);
        }

        Ok(())
    }
}
1692
/// Translate server responses of the form `(x~n, [name0, name1, ...])`
/// into concrete `(Id, Vertex)` pairs by walking the local IdDag:
/// `name0` is `x~n`, `name1` is `x~(n+1)` (first parent of `name0`), etc.
///
/// Errors are "programming" errors: they indicate a client/server graph
/// mismatch rather than a recoverable condition.
async fn calculate_id_name_from_paths(
    map: &dyn IdConvert,
    dag: &dyn IdDagAlgorithm,
    overlay_map_id_set: &IdSet,
    path_names: &[(AncestorPath, Vec<Vertex>)],
) -> Result<Vec<(Id, Vertex)>> {
    if path_names.is_empty() {
        return Ok(Vec::new());
    }
    let mut to_insert: Vec<(Id, Vertex)> =
        Vec::with_capacity(path_names.iter().map(|(_, ns)| ns.len()).sum());
    for (path, names) in path_names {
        if names.is_empty() {
            continue;
        }
        // `x` must be locally resolvable — it was populated at clone time.
        let x_id = map.vertex_id(path.x.clone()).await.map_err(|e| {
            let msg = format!(
                concat!(
                    "Cannot resolve x ({:?}) in x~n locally. The x is expected to be known ",
                    "locally and is populated at clone time. This x~n is used to convert ",
                    "{:?} to a location in the graph. (Check initial clone logic) ",
                    "(Error: {})",
                ),
                &path.x, &names[0], e
            );
            crate::Error::Programming(msg)
        })?;
        tracing::trace!(
            "resolve path {:?} names {:?} (x = {}) to overlay",
            &path,
            &names,
            x_id
        );
        // The overlay cache only covers `overlay_map_id_set`; an x outside
        // of it means the server answered with an out-of-range location.
        if !overlay_map_id_set.contains(x_id) {
            crate::failpoint!("dag-error-x-n-overflow");
            let msg = format!(
                concat!(
                    "Server returned x~n (x = {:?} {}, n = {}). But x is out of range ",
                    "({:?}). This is not expected and indicates some ",
                    "logic error on the server side."
                ),
                &path.x, x_id, path.n, overlay_map_id_set
            );
            return programming(msg);
        }
        // Walk n first-ancestor steps from x to find the id of names[0].
        let mut id = match dag.first_ancestor_nth(x_id, path.n).map_err(|e| {
            let msg = format!(
                concat!(
                    "Cannot resolve x~n (x = {:?} {}, n = {}): {}. ",
                    "This indicates the client-side graph is somewhat incompatible from the ",
                    "server-side graph. Something (server-side or client-side) was probably ",
                    "seriously wrong before this error."
                ),
                &path.x, x_id, path.n, e
            );
            crate::Error::Programming(msg)
        }) {
            Err(e) => {
                crate::failpoint!("dag-error-x-n-unresolvable");
                return Err(e);
            }
            Ok(id) => id,
        };
        if names.len() < 30 {
            tracing::debug!("resolved {:?} => {} {:?}", &path, id, &names);
        } else {
            tracing::debug!("resolved {:?} => {} {:?} ...", &path, id, &names[0]);
        }
        // names[i] is the first-parent of names[i-1]; step down one
        // first-parent per extra name.
        for (i, name) in names.iter().enumerate() {
            if i > 0 {
                id = match dag.parent_ids(id)?.first().cloned() {
                    Some(id) => id,
                    None => {
                        let msg = format!(
                            concat!(
                                "Cannot resolve x~(n+i) (x = {:?} {}, n = {}, i = {}) locally. ",
                                "This indicates the client-side graph is somewhat incompatible ",
                                "from the server-side graph. Something (server-side or ",
                                "client-side) was probably seriously wrong before this error."
                            ),
                            &path.x, x_id, path.n, i
                        );
                        return programming(msg);
                    }
                }
            }

            tracing::trace!(" resolved {:?} = {:?}", id, &name,);
            to_insert.push((id, name.clone()));
        }
    }
    Ok(to_insert)
}
1789
1790#[async_trait::async_trait]
1793impl<IS, M, P, S> RemoteIdConvertProtocol for AbstractDag<IdDag<IS>, M, P, S>
1794where
1795 IS: IdDagStore,
1796 IdDag<IS>: TryClone,
1797 M: IdConvert + TryClone + Send + Sync + 'static,
1798 P: TryClone + Send + Sync + 'static,
1799 S: TryClone + Send + Sync + 'static,
1800{
1801 async fn resolve_names_to_relative_paths(
1802 &self,
1803 heads: Vec<Vertex>,
1804 names: Vec<Vertex>,
1805 ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
1806 let request = protocol::RequestNameToLocation { names, heads };
1807 let response: protocol::ResponseIdNamePair =
1808 (self.map(), self.dag()).process(request).await?;
1809 Ok(response.path_names)
1810 }
1811
1812 async fn resolve_relative_paths_to_names(
1813 &self,
1814 paths: Vec<AncestorPath>,
1815 ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
1816 let request = protocol::RequestLocationToName { paths };
1817 let response: protocol::ResponseIdNamePair =
1818 (self.map(), self.dag()).process(request).await?;
1819 Ok(response.path_names)
1820 }
1821}
1822
1823#[async_trait::async_trait]
1825impl<IS, M, P, S> RemoteIdConvertProtocol for Arc<AbstractDag<IdDag<IS>, M, P, S>>
1826where
1827 IS: IdDagStore,
1828 IdDag<IS>: TryClone,
1829 M: IdConvert + TryClone + Send + Sync + 'static,
1830 P: TryClone + Send + Sync + 'static,
1831 S: TryClone + Send + Sync + 'static,
1832{
1833 async fn resolve_names_to_relative_paths(
1834 &self,
1835 heads: Vec<Vertex>,
1836 names: Vec<Vertex>,
1837 ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
1838 self.deref()
1839 .resolve_names_to_relative_paths(heads, names)
1840 .await
1841 }
1842
1843 async fn resolve_relative_paths_to_names(
1844 &self,
1845 paths: Vec<AncestorPath>,
1846 ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
1847 self.deref().resolve_relative_paths_to_names(paths).await
1848 }
1849}
1850
#[async_trait::async_trait]
impl<IS, M, P, S> DagAlgorithm for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone + 'static,
    M: TryClone + IdConvert + Sync + Send + 'static,
    P: TryClone + Sync + Send + 'static,
    S: TryClone + Sync + Send + 'static,
{
    /// Sort `set` into topological (descending id) order.
    ///
    /// Fast path 1: the set is already TOPO_DESC and its hint versions are
    /// compatible with this dag/map. Fast path 2: the set can be flattened
    /// to an id set against a compatible dag. Slow path: resolve every
    /// vertex to an id (possibly remotely) and rebuild from the id set.
    async fn sort(&self, set: &Set) -> Result<Set> {
        let hints = set.hints();
        if hints.contains(Flags::TOPO_DESC)
            && matches!(hints.dag_version(), Some(v) if v <= self.dag_version())
            && matches!(hints.id_map_version(), Some(v) if v <= self.map_version())
        {
            tracing::debug!(target: "dag::algo::sort", "sort({:6?}) (fast path)", set);
            return Ok(set.clone());
        } else if let Some(flat_set) = set.specialized_flatten_id() {
            let dag_version = flat_set.dag.dag_version();
            if dag_version <= self.dag_version() {
                let mut flat_set = flat_set.into_owned();
                flat_set.set_iteration_order(BasicIterationOrder::Desc);
                // Re-bind the flattened set to this dag's snapshots.
                flat_set.map = self.id_map_snapshot()?;
                flat_set.dag = self.dag_snapshot()?;
                tracing::debug!(target: "dag::algo::sort", "sort({:6?}) (fast path 2)", set);
                return Ok(Set::from_query(flat_set));
            } else {
                tracing::info!(target: "dag::algo::sort", "sort({:6?}) (cannot use fast path 2 due to mismatched version)", set);
            }
        }
        // Slow path below: logged at warn because it may trigger remote
        // lookups and is worth noticing in traces.
        tracing::warn!(target: "dag::algo::sort", "sort({:6?}) (slow path)", set);
        self.internal_stats
            .sort_slow_path_count
            .fetch_add(1, Ordering::Release);
        let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
        let mut spans = IdSet::empty();
        // Resolve ids in large batches to amortize remote round-trips.
        let mut iter = set.iter().await?.chunks(1 << 17);
        while let Some(names) = iter.next().await {
            let names = names.into_iter().collect::<Result<Vec<_>>>()?;
            let ids = self.vertex_id_batch(&names).await?;
            for id in ids {
                spans.push(id?);
            }
        }
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(flags);
        Ok(result)
    }

    /// Parent vertexes of `name`, in the dag's parent order.
    async fn parent_names(&self, name: Vertex) -> Result<Vec<Vertex>> {
        let id = self.vertex_id(name).await?;
        let parent_ids = self.dag().parent_ids(id)?;
        let mut result = Vec::with_capacity(parent_ids.len());
        for id in parent_ids {
            result.push(self.vertex_name(id).await?);
        }
        Ok(result)
    }

    /// All vertexes in the graph.
    async fn all(&self) -> Result<Set> {
        let spans = self.dag().all()?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(Flags::FULL);
        Ok(result)
    }

    /// Vertexes in the MASTER group.
    async fn master_group(&self) -> Result<Set> {
        let spans = self.dag().master_group()?;
        let result = Set::from_id_set_dag(spans, self)?;
        // The master group is closed under ancestors.
        result.hints().add_flags(Flags::ANCESTORS);
        Ok(result)
    }

    /// Vertexes in the VIRTUAL group.
    async fn virtual_group(&self) -> Result<Set> {
        let spans = self.dag().all_ids_in_groups(&[Group::VIRTUAL])?;
        let result = Set::from_id_set_dag(spans, self)?;
        Ok(result)
    }

    /// Ancestors of `set` (inclusive). No-op if `set` is already
    /// ancestor-closed per its hints and version-compatible.
    async fn ancestors(&self, set: Set) -> Result<Set> {
        if set.hints().contains(Flags::ANCESTORS)
            && set.hints().dag_version() <= Some(self.dag_version())
        {
            return Ok(set);
        }
        let spans = self.to_id_set(&set).await?;
        let spans = self.dag().ancestors(spans)?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(Flags::ANCESTORS);
        Ok(result)
    }

    /// First-parent ancestors of `set`.
    async fn first_ancestors(&self, set: Set) -> Result<Set> {
        if set.hints().contains(Flags::ANCESTORS)
            && set.hints().dag_version() <= Some(self.dag_version())
        {
            return Ok(set);
        }
        let spans = self.to_id_set(&set).await?;
        let spans = self.dag().first_ancestors(spans)?;
        let result = Set::from_id_set_dag(spans, self)?;
        // Cross-check against the slow default implementation in tests.
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::first_ancestors(self, set).await?);
        }
        Ok(result)
    }

    /// Merge (multi-parent) vertexes within `set`.
    async fn merges(&self, set: Set) -> Result<Set> {
        let spans = self.to_id_set(&set).await?;
        let spans = self.dag().merges(spans)?;
        let result = Set::from_id_set_dag(spans, self)?;
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::merges(self, set).await?);
        }
        Ok(result)
    }

    /// Parents of every vertex in `set`.
    async fn parents(&self, set: Set) -> Result<Set> {
        // Parents of an ancestor-closed set stay ancestor-closed; carry
        // the flag over when the hint versions are compatible.
        let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
        let spans = self.dag().parents(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(flags);
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::parents(self, set).await?);
        }
        Ok(result)
    }

    /// The n-th first-parent ancestor of `name` (0 = itself), or `None`
    /// if the first-parent chain is shorter than `n`.
    async fn first_ancestor_nth(&self, name: Vertex, n: u64) -> Result<Option<Vertex>> {
        #[cfg(test)]
        let name2 = name.clone();
        let id = self.vertex_id(name).await?;
        let id = self.dag().try_first_ancestor_nth(id, n)?;
        let result = match id {
            None => None,
            Some(id) => Some(self.vertex_name(id).await?),
        };
        #[cfg(test)]
        {
            let result2 = crate::default_impl::first_ancestor_nth(self, name2, n).await?;
            assert_eq!(result, result2);
        }
        Ok(result)
    }

    /// Heads (vertexes with no child in `set`) of `set`.
    async fn heads(&self, set: Set) -> Result<Set> {
        if set.hints().contains(Flags::ANCESTORS)
            && set.hints().dag_version() <= Some(self.dag_version())
        {
            // For ancestor-closed sets the specialized (faster)
            // heads_ancestors computation gives the same answer.
            return self.heads_ancestors(set).await;
        }
        let spans = self.dag().heads(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::heads(self, set).await?);
        }
        Ok(result)
    }

    /// Children of every vertex in `set`.
    async fn children(&self, set: Set) -> Result<Set> {
        let spans = self.dag().children(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        Ok(result)
    }

    /// Roots (vertexes with no parent in `set`) of `set`.
    async fn roots(&self, set: Set) -> Result<Set> {
        let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
        let spans = self.dag().roots(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(flags);
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::roots(self, set).await?);
        }
        Ok(result)
    }

    /// One greatest common ancestor of `set`, if any.
    async fn gca_one(&self, set: Set) -> Result<Option<Vertex>> {
        let result: Option<Vertex> = match self.dag().gca_one(self.to_id_set(&set).await?)? {
            None => None,
            Some(id) => Some(self.vertex_name(id).await?),
        };
        #[cfg(test)]
        {
            assert_eq!(&result, &crate::default_impl::gca_one(self, set).await?);
        }
        Ok(result)
    }

    /// All greatest common ancestors of `set`.
    async fn gca_all(&self, set: Set) -> Result<Set> {
        let spans = self.dag().gca_all(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::gca_all(self, set).await?);
        }
        Ok(result)
    }

    /// Ancestors common to every vertex in `set`.
    async fn common_ancestors(&self, set: Set) -> Result<Set> {
        let spans = self.dag().common_ancestors(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        result.hints().add_flags(Flags::ANCESTORS);
        #[cfg(test)]
        {
            result.assert_eq(crate::default_impl::common_ancestors(self, set).await?);
        }
        Ok(result)
    }

    /// Whether `ancestor` is an ancestor of `descendant` (inclusive).
    async fn is_ancestor(&self, ancestor: Vertex, descendant: Vertex) -> Result<bool> {
        #[cfg(test)]
        let result2 =
            crate::default_impl::is_ancestor(self, ancestor.clone(), descendant.clone()).await?;
        let ancestor_id = self.vertex_id(ancestor).await?;
        let descendant_id = self.vertex_id(descendant).await?;
        let result = self.dag().is_ancestor(ancestor_id, descendant_id)?;
        #[cfg(test)]
        {
            assert_eq!(&result, &result2);
        }
        Ok(result)
    }

    /// Heads of `ancestors(set)` — a minimal set of heads whose ancestors
    /// cover `ancestors(set)`.
    async fn heads_ancestors(&self, set: Set) -> Result<Set> {
        let spans = self.dag().heads_ancestors(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        #[cfg(test)]
        {
            // When the input is ancestor-closed, heads(set) and
            // heads_ancestors(set) agree; only cross-check otherwise.
            if !set.hints().contains(Flags::ANCESTORS) {
                result.assert_eq(crate::default_impl::heads_ancestors(self, set).await?);
            }
        }
        Ok(result)
    }

    /// Vertexes reachable from `heads` that are descendants of `roots`.
    async fn range(&self, roots: Set, heads: Set) -> Result<Set> {
        let roots = self.to_id_set(&roots).await?;
        let heads = self.to_id_set(&heads).await?;
        let spans = self.dag().range(roots, heads)?;
        let result = Set::from_id_set_dag(spans, self)?;
        Ok(result)
    }

    /// Descendants of `set` (inclusive).
    async fn descendants(&self, set: Set) -> Result<Set> {
        let spans = self.dag().descendants(self.to_id_set(&set).await?)?;
        let result = Set::from_id_set_dag(spans, self)?;
        Ok(result)
    }

    /// Pick a bisection point; delegates to the shared default logic.
    async fn suggest_bisect(
        &self,
        roots: Set,
        heads: Set,
        skip: Set,
    ) -> Result<(Option<Vertex>, Set, Set)> {
        default_impl::suggest_bisect(self, roots, heads, skip).await
    }

    /// Vertexes not yet persisted to disk.
    async fn dirty(&self) -> Result<Set> {
        let all = self.dag().all()?;
        let spans = all.difference(&self.persisted_id_set);
        let set = Set::from_id_set_dag(spans, self)?;
        Ok(set)
    }

    /// True when some vertexes may only be resolvable via the remote
    /// protocol (i.e. the protocol is not purely local).
    fn is_vertex_lazy(&self) -> bool {
        !self.remote_protocol.is_local()
    }

    /// Snapshot of this dag usable as a trait object.
    fn dag_snapshot(&self) -> Result<Arc<dyn DagAlgorithm + Send + Sync>> {
        Ok(self.try_snapshot()? as Arc<dyn DagAlgorithm + Send + Sync>)
    }

    /// Snapshot of just the IdDag store as a trait object.
    fn id_dag_snapshot(&self) -> Result<Arc<dyn IdDagAlgorithm + Send + Sync>> {
        let store = self.dag.try_clone()?.store;
        Ok(Arc::new(store))
    }

    fn dag_id(&self) -> &str {
        &self.id
    }

    fn dag_version(&self) -> &VerLink {
        self.dag.version()
    }
}
2190
2191fn extract_ancestor_flag_if_compatible(hints: &Hints, dag_version: &VerLink) -> Flags {
2194 if hints.dag_version() <= Some(dag_version) {
2195 hints.flags() & Flags::ANCESTORS
2196 } else {
2197 Flags::empty()
2198 }
2199}
2200
2201#[async_trait::async_trait]
2202impl<I, M, P, S> PrefixLookup for AbstractDag<I, M, P, S>
2203where
2204 I: Send + Sync,
2205 M: PrefixLookup + Send + Sync,
2206 P: Send + Sync,
2207 S: Send + Sync,
2208{
2209 async fn vertexes_by_hex_prefix(&self, hex_prefix: &[u8], limit: usize) -> Result<Vec<Vertex>> {
2210 let mut list = self.map.vertexes_by_hex_prefix(hex_prefix, limit).await?;
2211 let overlay_list = self
2212 .overlay_map
2213 .read()
2214 .unwrap()
2215 .lookup_vertexes_by_hex_prefix(hex_prefix, limit)?;
2216 list.extend(overlay_list);
2217 list.sort_unstable();
2218 list.dedup();
2219 list.truncate(limit);
2220 Ok(list)
2221 }
2222}
2223
2224#[async_trait::async_trait]
2225impl<IS, M, P, S> IdConvert for AbstractDag<IdDag<IS>, M, P, S>
2226where
2227 IS: IdDagStore,
2228 IdDag<IS>: TryClone,
2229 M: IdConvert + TryClone + Send + Sync + 'static,
2230 P: TryClone + Send + Sync + 'static,
2231 S: TryClone + Send + Sync + 'static,
2232{
2233 async fn vertex_id(&self, name: Vertex) -> Result<Id> {
2234 match self.map.vertex_id(name.clone()).await {
2235 Ok(id) => Ok(id),
2236 Err(crate::Error::VertexNotFound(_)) if self.is_vertex_lazy() => {
2237 if let Some(id) = self.overlay_map.read().unwrap().lookup_vertex_id(&name) {
2238 return Ok(id);
2239 }
2240 if self
2241 .missing_vertexes_confirmed_by_remote
2242 .read()
2243 .unwrap()
2244 .contains(&name)
2245 {
2246 return name.not_found();
2247 }
2248 let ids = self.resolve_vertexes_remotely(&[name.clone()]).await?;
2249 if let Some(Some(id)) = ids.first() {
2250 Ok(*id)
2251 } else {
2252 name.not_found()
2254 }
2255 }
2256 Err(e) => Err(e),
2257 }
2258 }
2259
2260 async fn vertex_id_with_max_group(
2261 &self,
2262 name: &Vertex,
2263 max_group: Group,
2264 ) -> Result<Option<Id>> {
2265 match self.map.vertex_id_with_max_group(name, max_group).await {
2266 Ok(Some(id)) => Ok(Some(id)),
2267 Err(err) => Err(err),
2268 Ok(None) if self.is_vertex_lazy() => {
2269 if let Some(id) = self.overlay_map.read().unwrap().lookup_vertex_id(name) {
2271 return Ok(Some(id));
2272 }
2273 if self
2274 .missing_vertexes_confirmed_by_remote
2275 .read()
2276 .unwrap()
2277 .contains(name)
2278 {
2279 return Ok(None);
2280 }
2281 if max_group != Group::MAX
2282 && self
2283 .map
2284 .vertex_id_with_max_group(name, Group::MAX)
2285 .await?
2286 .is_some()
2287 {
2288 return Ok(None);
2291 }
2292 match self.resolve_vertexes_remotely(&[name.clone()]).await {
2293 Ok(ids) => match ids.first() {
2294 Some(Some(id)) => Ok(Some(*id)),
2295 Some(None) | None => Ok(None),
2296 },
2297 Err(e) => Err(e),
2298 }
2299 }
2300 Ok(None) => Ok(None),
2301 }
2302 }
2303
2304 async fn vertex_name(&self, id: Id) -> Result<Vertex> {
2305 match self.map.vertex_name(id).await {
2306 Ok(name) => Ok(name),
2307 Err(crate::Error::IdNotFound(_)) if self.is_vertex_lazy() => {
2308 if let Some(name) = self
2309 .overlay_map
2310 .read()
2311 .unwrap()
2312 .lookup_vertex_name(id)
2313 .cloned()
2314 {
2315 return Ok(name);
2316 }
2317 let max_master_id = self.dag.master_group()?.max();
2319 if Some(id) > max_master_id {
2320 return id.not_found();
2321 }
2322 let names = self.resolve_ids_remotely(&[id]).await?;
2323 if let Some(name) = names.into_iter().next() {
2324 Ok(name)
2325 } else {
2326 id.not_found()
2327 }
2328 }
2329 Err(e) => Err(e),
2330 }
2331 }
2332
2333 async fn contains_vertex_name(&self, name: &Vertex) -> Result<bool> {
2334 match self.map.contains_vertex_name(name).await {
2335 Ok(true) => Ok(true),
2336 Ok(false) if self.is_vertex_lazy() => {
2337 if self
2338 .overlay_map
2339 .read()
2340 .unwrap()
2341 .lookup_vertex_id(name)
2342 .is_some()
2343 {
2344 return Ok(true);
2345 }
2346 if self
2347 .missing_vertexes_confirmed_by_remote
2348 .read()
2349 .unwrap()
2350 .contains(name)
2351 {
2352 return Ok(false);
2353 }
2354 match self.resolve_vertexes_remotely(&[name.clone()]).await {
2355 Ok(ids) => match ids.first() {
2356 Some(Some(_)) => Ok(true),
2357 Some(None) | None => Ok(false),
2358 },
2359 Err(e) => Err(e),
2360 }
2361 }
2362 Ok(false) => Ok(false),
2363 Err(e) => Err(e),
2364 }
2365 }
2366
2367 async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
2368 let mut list = self.map.contains_vertex_id_locally(ids).await?;
2369 let map = self.overlay_map.read().unwrap();
2370 for (b, id) in list.iter_mut().zip(ids.iter().copied()) {
2371 if !*b {
2372 *b = *b || map.has_vertex_id(id);
2373 }
2374 }
2375 Ok(list)
2376 }
2377
2378 async fn contains_vertex_name_locally(&self, names: &[Vertex]) -> Result<Vec<bool>> {
2379 tracing::trace!("contains_vertex_name_locally names: {:?}", &names);
2380 let mut list = self.map.contains_vertex_name_locally(names).await?;
2381 tracing::trace!("contains_vertex_name_locally list (local): {:?}", &list);
2382 assert_eq!(list.len(), names.len());
2383 let map = self.overlay_map.read().unwrap();
2384 for (b, name) in list.iter_mut().zip(names.iter()) {
2385 if !*b && map.has_vertex_name(name) {
2386 tracing::trace!("contains_vertex_name_locally overlay has {:?}", &name);
2387 *b = true;
2388 }
2389 }
2390 Ok(list)
2391 }
2392
2393 async fn vertex_name_batch(&self, ids: &[Id]) -> Result<Vec<Result<Vertex>>> {
2394 let mut list = self.map.vertex_name_batch(ids).await?;
2395 if self.is_vertex_lazy() {
2396 {
2398 let map = self.overlay_map.read().unwrap();
2399 for (r, id) in list.iter_mut().zip(ids) {
2400 if let Some(name) = map.lookup_vertex_name(*id).cloned() {
2401 *r = Ok(name);
2402 }
2403 }
2404 }
2405 let missing_indexes: Vec<usize> = {
2407 let max_master_id = self.dag.master_group()?.max();
2408 list.iter()
2409 .enumerate()
2410 .filter_map(|(i, r)| match r {
2411 Err(_) if Some(ids[i]) <= max_master_id => Some(i),
2413 Err(_) | Ok(_) => None,
2414 })
2415 .collect()
2416 };
2417 let missing_ids: Vec<Id> = missing_indexes.iter().map(|i| ids[*i]).collect();
2418 let resolved = self.resolve_ids_remotely(&missing_ids).await?;
2419 for (i, name) in missing_indexes.into_iter().zip(resolved.into_iter()) {
2420 list[i] = Ok(name);
2421 }
2422 }
2423 Ok(list)
2424 }
2425
2426 async fn vertex_id_batch(&self, names: &[Vertex]) -> Result<Vec<Result<Id>>> {
2427 let mut list = self.map.vertex_id_batch(names).await?;
2428 if self.is_vertex_lazy() {
2429 {
2431 let map = self.overlay_map.read().unwrap();
2432 for (r, name) in list.iter_mut().zip(names) {
2433 if let Some(id) = map.lookup_vertex_id(name) {
2434 *r = Ok(id);
2435 }
2436 }
2437 }
2438 let missing_indexes: Vec<usize> = {
2440 let known_missing = self.missing_vertexes_confirmed_by_remote.read().unwrap();
2441 list.iter()
2442 .enumerate()
2443 .filter_map(|(i, r)| {
2444 if r.is_err() && !known_missing.contains(&names[i]) {
2445 Some(i)
2446 } else {
2447 None
2448 }
2449 })
2450 .collect()
2451 };
2452 if !missing_indexes.is_empty() {
2453 let missing_names: Vec<Vertex> =
2454 missing_indexes.iter().map(|i| names[*i].clone()).collect();
2455 let resolved = self.resolve_vertexes_remotely(&missing_names).await?;
2456 for (i, id) in missing_indexes.into_iter().zip(resolved.into_iter()) {
2457 if let Some(id) = id {
2458 list[i] = Ok(id);
2459 }
2460 }
2461 }
2462 }
2463 Ok(list)
2464 }
2465
    /// Identifier of the id map; delegates to the underlying map.
    fn map_id(&self) -> &str {
        self.map.map_id()
    }
2469
    /// Version of the id map; delegates to the underlying map.
    fn map_version(&self) -> &VerLink {
        self.map.map_version()
    }
2473}
2474
impl<IS, M, P, S> AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    IdDag<IS>: TryClone + 'static,
    M: TryClone + Persist + IdMapWrite + IdConvert + Sync + Send + 'static,
    P: TryClone + Sync + Send + 'static,
    S: TryClone + Sync + Send + 'static,
{
    /// Assign ids and build segments for `heads`, with the id map lock held.
    ///
    /// If some already-assigned non-master vertexes turn out to belong in the
    /// master group, the affected portion of the graph is stripped and then
    /// re-inserted; the outer loop therefore runs at most twice.
    ///
    /// NOTE(review): `map_lock` is only forwarded to `strip_with_lock`;
    /// presumably it proves the caller holds the map write lock for the whole
    /// strip + reinsert sequence — confirm against the `Persist` contract.
    async fn build_with_lock(
        &mut self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
        map_lock: &M::Lock,
    ) -> Result<()> {
        // First pass borrows the caller's inputs; a queued re-insert pass
        // (after a strip) must own its inputs.
        enum Input<'a> {
            Borrowed(&'a dyn Parents, &'a VertexListWithOptions),
            Owned(Box<dyn Parents>, VertexListWithOptions),
        }

        let mut stack = vec![Input::Borrowed(parents, heads)];

        let mut loop_count = 0;

        while let Some(input) = stack.pop() {
            loop_count += 1;
            // Guard against unexpected re-queueing: one insertion+strip pass
            // plus one re-insert pass is the maximum.
            if loop_count > 2 {
                return bug("should not loop > 2 times (1st insertion+strip, 2nd reinsert)");
            }

            let (parents, heads) = match &input {
                Input::Borrowed(p, h) => (*p, *h),
                Input::Owned(p, h) => (p.as_ref(), h),
            };

            // For lazy vertexes, consult the remote about missing names
            // before assigning ids locally.
            if self.is_vertex_lazy() {
                let heads: Vec<Vertex> = heads.vertexes();
                self.populate_missing_vertexes_for_add_heads(parents, &heads)
                    .await?;
            }

            // Vertexes with non-master ids that should become master ids
            // cannot be renumbered in place; strip them and re-insert.
            let to_reassign: Set = self.find_vertexes_to_reassign(parents, heads).await?;
            if !to_reassign.is_empty().await? {
                let reinsert_heads: VertexListWithOptions = {
                    // Heads of everything removed by the strip, excluding the
                    // stripped set itself.
                    let heads = self
                        .heads(
                            self.descendants(to_reassign.clone())
                                .await?
                                .difference(&to_reassign),
                        )
                        .await?;
                    tracing::debug!(target: "dag::reassign", "need to rebuild heads: {:?}", &heads);
                    let heads: Vec<Vertex> = heads.iter().await?.try_collect().await?;
                    VertexListWithOptions::from(heads)
                };
                // Snapshot parent information before the strip removes it.
                let reinsert_parents: Box<dyn Parents> = Box::new(self.dag_snapshot()?);
                self.strip_with_lock(&to_reassign, map_lock).await?;

                // Queue the re-insert pass; it runs after this pass finishes.
                stack.push(Input::Owned(reinsert_parents, reinsert_heads));
            };

            // Assign heads group by group, honoring per-head `reserve_size`
            // so ids after a head stay free for its future descendants.
            let mut outcome = PreparedFlatSegments::default();
            let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
            let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
            for group in Group::ALL {
                for (vertex, opts) in heads.vertex_options() {
                    if opts.desired_group != group {
                        continue;
                    }
                    let prepared_segments = self
                        .assign_head(vertex.clone(), parents, group, &mut covered, &reserved)
                        .await?;
                    outcome.merge(prepared_segments);
                    if opts.reserve_size > 0 {
                        // Reserve ids immediately after the head just assigned.
                        let low = self.map.vertex_id(vertex).await? + 1;
                        update_reserved(&mut reserved, &covered, low, opts.reserve_size);
                    }
                }
            }

            // Write the prepared flat segments into the IdDag.
            self.dag
                .build_segments_from_prepared_flat_segments(&outcome)?;

            self.update_overlay_map_id_set()?;
        }

        Ok(())
    }

    /// Find vertexes that currently have non-master ids but are reachable
    /// from the given master-group heads, i.e. vertexes whose ids need to be
    /// re-assigned into the master group.
    async fn find_vertexes_to_reassign(
        &self,
        parents: &dyn Parents,
        heads: &VertexListWithOptions,
    ) -> Result<Set> {
        let master_heads = heads.vertexes_by_group(Group::MASTER);

        let mut id_set = IdSet::empty();
        // Depth-first walk from the master heads through parent edges.
        let mut to_visit: Vec<Vertex> = master_heads;
        let mut visited = HashSet::new();
        while let Some(vertex) = to_visit.pop() {
            if !visited.insert(vertex.clone()) {
                continue;
            }
            let id = self.vertex_id_optional(&vertex).await?;
            if let Some(id) = id {
                if id.group() == Group::MASTER {
                    // Already in the master group: nothing to reassign here,
                    // and the walk stops at this vertex.
                    continue;
                } else {
                    id_set.push(id);
                }
            }
            // Unassigned or non-master: keep walking its parents.
            let parents = parents.parent_names(vertex).await?;
            to_visit.extend(parents);
        }

        let set = Set::from_id_set_dag(id_set, self)?;
        tracing::debug!(target: "dag::reassign", "need to reassign: {:?}", &set);
        Ok(set)
    }
}
2642
2643async fn calculate_initial_reserved(
2649 map: &dyn IdConvert,
2650 covered: &IdSet,
2651 heads: &VertexListWithOptions,
2652) -> Result<IdSet> {
2653 let mut reserved = IdSet::empty();
2654 for (vertex, opts) in heads.vertex_options() {
2655 if opts.reserve_size == 0 {
2656 continue;
2658 }
2659 if let Some(id) = map
2660 .vertex_id_with_max_group(&vertex, opts.desired_group)
2661 .await?
2662 {
2663 update_reserved(&mut reserved, covered, id + 1, opts.reserve_size);
2664 }
2665 }
2666 Ok(reserved)
2667}
2668
2669fn update_reserved(reserved: &mut IdSet, covered: &IdSet, low: Id, reserve_size: u32) {
2670 if reserve_size == 0 {
2671 return;
2672 }
2673 let span = find_free_span(covered, low, reserve_size as _, true);
2674 reserved.push(span);
2675}
2676
/// Find a span of up to `reserve_size` ids, starting at or after `low`, that
/// does not intersect `covered`. The high end is clamped to `low`'s group max.
///
/// With `shrink_to_fit`, a candidate whose start is free may be truncated at
/// the first covered id instead of being relocated past it; without it, the
/// returned span is asserted to contain exactly `reserve_size` ids.
fn find_free_span(covered: &IdSet, low: Id, reserve_size: u64, shrink_to_fit: bool) -> IdSpan {
    assert!(reserve_size > 0);
    let original_low = low;
    let mut low = low;
    let mut high;
    let mut count = 0;
    loop {
        count += 1;
        // If the candidate start is covered, skip past the covering span.
        if let Some(span) = covered.span_contains(low) {
            low = span.high + 1;
        }
        high = (low + reserve_size - 1).min(low.group().max_id());
        // Fast path: a single free id suffices.
        if reserve_size <= 1 && !covered.contains(low) {
            break;
        }
        let reserved = IdSet::from_single_span(IdSpan::new(low, high));
        let intersected = reserved.intersection(covered);
        if let Some(span) = intersected.iter_span_asc().next() {
            if span.low > low && shrink_to_fit {
                // The candidate starts free but hits covered ids before
                // `high`: shrink to the free prefix.
                let last_free = span.low - 1;
                high = last_free;
            } else {
                // The candidate overlaps immediately (or shrinking is not
                // allowed): retry just past the covered span.
                low = span.high + 1;
                continue;
            }
        }
        break;
    }
    // One warning after the fact if the search looped excessively.
    if count >= 4096 {
        tracing::warn!(
            target: "dag::perf",
            count=count,
            low=?original_low,
            reserve_size=reserve_size,
            covered=?covered,
            "PERF: find_free_span took too long",
        );
    }
    let span = IdSpan::new(low, high);
    if !shrink_to_fit {
        // Without shrinking, the result must be exactly the requested size.
        assert_eq!(span.count(), reserve_size);
    }
    span
}
2755
2756fn is_ok_some<T>(value: Result<Option<T>>) -> bool {
2757 match value {
2758 Ok(Some(_)) => true,
2759 _ => false,
2760 }
2761}
2762
2763impl<IS, M, P, S> IdMapSnapshot for AbstractDag<IdDag<IS>, M, P, S>
2764where
2765 IS: IdDagStore,
2766 IdDag<IS>: TryClone + 'static,
2767 M: TryClone + IdConvert + Send + Sync + 'static,
2768 P: TryClone + Send + Sync + 'static,
2769 S: TryClone + Send + Sync + 'static,
2770{
2771 fn id_map_snapshot(&self) -> Result<Arc<dyn IdConvert + Send + Sync>> {
2772 Ok(self.try_snapshot()? as Arc<dyn IdConvert + Send + Sync>)
2773 }
2774}
2775
/// Debug output for the dag: segments per level and group, with vertex names
/// resolved through the id map where possible.
impl<IS, M, P, S> fmt::Debug for AbstractDag<IdDag<IS>, M, P, S>
where
    IS: IdDagStore,
    M: IdConvert + Send + Sync,
    P: Send + Sync,
    S: Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Delegate to the free `debug` helper below.
        debug(&self.dag, &self.map, f)
    }
}
2787
2788pub(crate) fn debug_segments_by_level_group<S: IdDagStore>(
2789 iddag: &IdDag<S>,
2790 idmap: &dyn IdConvert,
2791 level: Level,
2792 group: Group,
2793) -> Vec<String> {
2794 let mut result = Vec::new();
2795 let show = |id: Id| DebugId {
2797 id,
2798 name: non_blocking_result(idmap.vertex_name(id)).ok(),
2799 };
2800 let show_flags = |flags: SegmentFlags| -> String {
2801 let mut result = Vec::new();
2802 if flags.contains(SegmentFlags::HAS_ROOT) {
2803 result.push("Root");
2804 }
2805 if flags.contains(SegmentFlags::ONLY_HEAD) {
2806 result.push("OnlyHead");
2807 }
2808 result.join(" ")
2809 };
2810
2811 if let Ok(segments) = iddag.next_segments(group.min_id(), level) {
2812 for segment in segments.into_iter().rev() {
2813 if let (Ok(span), Ok(parents), Ok(flags)) =
2814 (segment.span(), segment.parents(), segment.flags())
2815 {
2816 let mut line = format!(
2817 "{:.12?} : {:.12?} {:.12?}",
2818 show(span.low),
2819 show(span.high),
2820 parents.into_iter().map(show).collect::<Vec<_>>(),
2821 );
2822 let flags = show_flags(flags);
2823 if !flags.is_empty() {
2824 line += &format!(" {}", flags);
2825 }
2826 result.push(line);
2827 }
2828 }
2829 }
2830 result
2831}
2832
2833fn debug<S: IdDagStore>(
2834 iddag: &IdDag<S>,
2835 idmap: &dyn IdConvert,
2836 f: &mut fmt::Formatter,
2837) -> fmt::Result {
2838 if let Ok(max_level) = iddag.max_level() {
2839 writeln!(f, "Max Level: {}", max_level)?;
2840 for lv in (0..=max_level).rev() {
2841 writeln!(f, " Level {}", lv)?;
2842 for group in Group::ALL.iter().cloned() {
2843 writeln!(f, " {}:", group)?;
2844 if let Ok(segments) = iddag.next_segments(group.min_id(), lv) {
2845 writeln!(f, " Segments: {}", segments.len())?;
2846 for line in debug_segments_by_level_group(iddag, idmap, lv, group) {
2847 writeln!(f, " {}", line)?;
2848 }
2849 }
2850 }
2851 }
2852 }
2853
2854 Ok(())
2855}
2856
/// An `Id` paired with its vertex name (if resolvable), for debug output.
struct DebugId {
    id: Id,
    // `None` when the id could not be resolved to a vertex name.
    name: Option<Vertex>,
}
2861
2862impl fmt::Debug for DebugId {
2863 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2864 if let Some(name) = &self.name {
2865 fmt::Debug::fmt(&name, f)?;
2866 f.write_str("+")?;
2867 }
2868 write!(f, "{:?}", self.id)?;
2869 Ok(())
2870 }
2871}
2872
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_find_free_span_overflow() {
        // Ids 0..=6 are taken; a 2-id reservation requested at 0 must land
        // on 7..=8, with or without shrink_to_fit.
        let covered = IdSet::from(0..=6);
        for shrink_to_fit in [true, false] {
            let got = find_free_span(&covered, Id(0), 2, shrink_to_fit);
            assert_eq!(got, IdSpan::from(7..=8));
        }
    }
}