1use std::collections::BTreeMap;
13use std::collections::HashSet;
14use std::env::var;
15use std::fmt;
16use std::io;
17use std::ops::Deref;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::RwLock;
21
22use dag_types::FlatSegment;
23use futures::future::BoxFuture;
24use futures::FutureExt;
25use futures::StreamExt;
26use futures::TryStreamExt;
27use nonblocking::non_blocking_result;
28
29use crate::clone::CloneData;
30use crate::errors::bug;
31use crate::errors::programming;
32use crate::errors::DagError;
33use crate::errors::NotFoundError;
34use crate::id::Group;
35use crate::id::Id;
36use crate::id::VertexName;
37use crate::iddag::IdDag;
38use crate::iddag::IdDagAlgorithm;
39use crate::iddagstore::IdDagStore;
40use crate::idmap::CoreMemIdMap;
41use crate::idmap::IdMapAssignHead;
42use crate::idmap::IdMapWrite;
43use crate::nameset::hints::Flags;
44use crate::nameset::hints::Hints;
45use crate::nameset::NameSet;
46use crate::ops::CheckIntegrity;
47use crate::ops::DagAddHeads;
48use crate::ops::DagAlgorithm;
49use crate::ops::DagExportCloneData;
50use crate::ops::DagExportPullData;
51use crate::ops::DagImportCloneData;
52use crate::ops::DagImportPullData;
53use crate::ops::DagPersistent;
54use crate::ops::DagStrip;
55use crate::ops::IdConvert;
56use crate::ops::IdMapSnapshot;
57use crate::ops::IntVersion;
58use crate::ops::Open;
59use crate::ops::Parents;
60use crate::ops::Persist;
61use crate::ops::PrefixLookup;
62use crate::ops::ToIdSet;
63use crate::ops::TryClone;
64use crate::protocol;
65use crate::protocol::is_remote_protocol_disabled;
66use crate::protocol::AncestorPath;
67use crate::protocol::Process;
68use crate::protocol::RemoteIdConvertProtocol;
69use crate::segment::PreparedFlatSegments;
70use crate::segment::SegmentFlags;
71use crate::types_ext::PreparedFlatSegmentsExt;
72use crate::utils;
73use crate::Error::NeedSlowPath;
74use crate::IdSet;
75use crate::IdSpan;
76use crate::Level;
77use crate::Result;
78use crate::VerLink;
79use crate::VertexListWithOptions;
80
81mod builder;
82#[cfg(any(test, feature = "indexedlog-backend"))]
83mod indexedlog_namedag;
84mod mem_namedag;
85
86pub use builder::NameDagBuilder;
87#[cfg(any(test, feature = "indexedlog-backend"))]
88pub use indexedlog_namedag::IndexedLogNameDagPath;
89#[cfg(any(test, feature = "indexedlog-backend"))]
90pub use indexedlog_namedag::NameDag;
91pub use mem_namedag::MemNameDag;
92pub use mem_namedag::MemNameDagPath;
93
94pub struct AbstractNameDag<I, M, P, S>
95where
96 I: Send + Sync,
97 M: Send + Sync,
98 P: Send + Sync,
99 S: Send + Sync,
100{
101 pub(crate) dag: I,
102 pub(crate) map: M,
103
104 snapshot: RwLock<Option<Arc<Self>>>,
107
108 pending_heads: VertexListWithOptions,
110
111 path: P,
113
114 state: S,
116
117 id: String,
119
120 persisted_id_set: IdSet,
122
123 overlay_map: Arc<RwLock<CoreMemIdMap>>,
126
127 overlay_map_id_set: IdSet,
132
133 overlay_map_paths: Arc<Mutex<Vec<(AncestorPath, Vec<VertexName>)>>>,
137
138 remote_protocol: Arc<dyn RemoteIdConvertProtocol>,
142
143 missing_vertexes_confirmed_by_remote: Arc<RwLock<HashSet<VertexName>>>,
146}
147
148impl<D, M, P, S> AbstractNameDag<D, M, P, S>
149where
150 D: Send + Sync,
151 M: Send + Sync,
152 P: Send + Sync,
153 S: Send + Sync,
154{
155 pub fn into_idmap_dag(self) -> (M, D) {
157 (self.map, self.dag)
158 }
159
160 pub fn into_idmap_dag_path_state(self) -> (M, D, P, S) {
162 (self.map, self.dag, self.path, self.state)
163 }
164}
165
166#[async_trait::async_trait]
167impl<IS, M, P, S> DagPersistent for AbstractNameDag<IdDag<IS>, M, P, S>
168where
169 IS: IdDagStore + Persist,
170 IdDag<IS>: TryClone + 'static,
171 M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
172 P: Open<OpenTarget = Self> + Send + Sync + 'static,
173 S: TryClone + IntVersion + Persist + Send + Sync + 'static,
174{
175 async fn add_heads_and_flush(
180 &mut self,
181 parents: &dyn Parents,
182 heads: &VertexListWithOptions,
183 ) -> Result<()> {
184 if !self.pending_heads.is_empty() {
185 return programming(format!(
186 "ProgrammingError: add_heads_and_flush called with pending heads ({:?})",
187 &self.pending_heads.vertexes(),
188 ));
189 }
190
191 let old_version = self.state.int_version();
198 let lock = self.state.lock()?;
199 let map_lock = self.map.lock()?;
200 let dag_lock = self.dag.lock()?;
201 self.state.reload(&lock)?;
202 let new_version = self.state.int_version();
203 if old_version != new_version {
204 self.invalidate_snapshot();
205 self.invalidate_missing_vertex_cache();
206 self.invalidate_overlay_map()?;
207 }
208
209 self.map.reload(&map_lock)?;
210 self.dag.reload(&dag_lock)?;
211
212 self.build_with_lock(parents, heads, &map_lock).await?;
214
215 self.map.persist(&map_lock)?;
217 self.dag.persist(&dag_lock)?;
218 self.state.persist(&lock)?;
219 drop(dag_lock);
220 drop(map_lock);
221 drop(lock);
222
223 self.persisted_id_set = self.dag.all_ids_in_groups(&Group::ALL)?;
224 debug_assert_eq!(self.dirty().await?.count().await?, 0);
225 Ok(())
226 }
227
228 async fn flush(&mut self, heads: &VertexListWithOptions) -> Result<()> {
242 for result in self.vertex_id_batch(&heads.vertexes()).await? {
244 result?;
245 }
246 if !heads.vertexes_by_group(Group::NON_MASTER).is_empty() {
249 return programming(format!(
250 "NameDag::flush({:?}) is probably misused (group is not master)",
251 heads
252 ));
253 }
254
255 self.flush_cached_idmap().await?;
257
258 let mut new_name_dag: Self = self.path.open()?;
260
261 let parents: &(dyn DagAlgorithm + Send + Sync) = self;
262 let non_master_heads: VertexListWithOptions = self.pending_heads.clone();
263 let seg_size = self.dag.get_new_segment_size();
264 new_name_dag.dag.set_new_segment_size(seg_size);
265 new_name_dag.set_remote_protocol(self.remote_protocol.clone());
266 new_name_dag.maybe_reuse_caches_from(self);
267 let heads = heads.clone().chain(non_master_heads);
268 new_name_dag.add_heads_and_flush(&parents, &heads).await?;
269 *self = new_name_dag;
270 Ok(())
271 }
272
273 #[tracing::instrument(skip(self))]
276 async fn flush_cached_idmap(&self) -> Result<()> {
277 let mut to_insert: Vec<(AncestorPath, Vec<VertexName>)> = Vec::new();
282 std::mem::swap(&mut to_insert, &mut *self.overlay_map_paths.lock().unwrap());
283 if to_insert.is_empty() {
284 return Ok(());
285 }
286
287 tracing::debug!(target: "dag::cache", "flushing cached idmap ({} items)", to_insert.len());
289 let mut new: Self = self.path.open()?;
290 let lock = new.state.lock()?;
291 let map_lock = new.map.lock()?;
292 let dag_lock = new.dag.lock()?;
293 new.state.reload(&lock)?;
294 new.map.reload(&map_lock)?;
295 new.dag.reload(&dag_lock)?;
296 new.maybe_reuse_caches_from(self);
297 std::mem::swap(&mut to_insert, &mut *new.overlay_map_paths.lock().unwrap());
298 new.flush_cached_idmap_with_lock(&map_lock).await?;
299
300 new.state.persist(&lock)?;
301
302 Ok(())
303 }
304}
305
306impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
307where
308 IS: IdDagStore,
309 IdDag<IS>: TryClone + 'static,
310 M: TryClone + IdConvert + IdMapWrite + Persist + Send + Sync + 'static,
311 P: Send + Sync + 'static,
312 S: TryClone + Send + Sync + 'static,
313{
314 async fn flush_cached_idmap_with_lock(&mut self, map_lock: &M::Lock) -> Result<()> {
316 let mut to_insert: Vec<(AncestorPath, Vec<VertexName>)> = Vec::new();
317 std::mem::swap(&mut to_insert, &mut *self.overlay_map_paths.lock().unwrap());
318 if to_insert.is_empty() {
319 return Ok(());
320 }
321
322 let id_names = calculate_id_name_from_paths(
323 &self.map,
324 &*self.dag,
325 &self.overlay_map_id_set,
326 &to_insert,
327 )
328 .await?;
329
330 let mut skip_vertexes: Option<HashSet<VertexName>> = None;
332 if crate::is_testing() {
333 if let Ok(s) = var("DAG_SKIP_FLUSH_VERTEXES") {
334 skip_vertexes = Some(
335 s.split(",")
336 .filter_map(|s| VertexName::from_hex(s.as_bytes()).ok())
337 .collect(),
338 )
339 }
340 }
341
342 for (id, name) in id_names {
343 if let Some(skip) = &skip_vertexes {
344 if skip.contains(&name) {
345 tracing::info!(
346 target: "dag::cache",
347 "skip flushing {:?}-{} to IdMap set by DAG_SKIP_FLUSH_VERTEXES",
348 &name,
349 id
350 );
351 continue;
352 }
353 }
354 tracing::debug!(target: "dag::cache", "insert {:?}-{} to IdMap", &name, id);
355 self.map.insert(id, name.as_ref()).await?;
356 }
357
358 self.map.persist(map_lock)?;
359 Ok(())
360 }
361}
362
363impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
364where
365 IS: Send + Sync + 'static,
366 M: Send + Sync + 'static,
367 P: Send + Sync + 'static,
368 S: IntVersion + Send + Sync + 'static,
369{
370 fn maybe_reuse_caches_from(&mut self, other: &Self) {
373 if self.state.int_version() != other.state.int_version()
374 || self.persisted_id_set.as_spans() != other.persisted_id_set.as_spans()
375 {
376 tracing::debug!(target: "dag::cache", "cannot reuse cache");
377 return;
378 }
379 tracing::debug!(
380 target: "dag::cache", "reusing cache ({} missing)",
381 other.missing_vertexes_confirmed_by_remote.read().unwrap().len(),
382 );
383 self.missing_vertexes_confirmed_by_remote =
384 other.missing_vertexes_confirmed_by_remote.clone();
385 self.overlay_map = other.overlay_map.clone();
386 self.overlay_map_paths = other.overlay_map_paths.clone();
387 }
388}
389
390#[async_trait::async_trait]
391impl<IS, M, P, S> DagAddHeads for AbstractNameDag<IdDag<IS>, M, P, S>
392where
393 IS: IdDagStore,
394 IdDag<IS>: TryClone,
395 M: TryClone + IdMapAssignHead + Send + Sync + 'static,
396 P: TryClone + Send + Sync + 'static,
397 S: TryClone + Send + Sync + 'static,
398{
399 async fn add_heads(
410 &mut self,
411 parents: &dyn Parents,
412 heads: &VertexListWithOptions,
413 ) -> Result<bool> {
414 self.invalidate_snapshot();
415
416 self.populate_missing_vertexes_for_add_heads(parents, &heads.vertexes())
418 .await?;
419
420 let master_heads = heads.vertexes_by_group(Group::MASTER);
433 if !master_heads.is_empty() {
434 let all = self.dag.all()?;
435 let has_non_master = match all.max() {
436 Some(id) => id.group() == Group::NON_MASTER,
437 None => false,
438 };
439 if has_non_master {
440 return programming(concat!(
441 "add_heads() called with highest_group = MASTER but NON_MASTER group is not empty. ",
442 "To avoid id reassignment this is not supported. ",
443 "Pass highest_group = NON_MASTER, and call flush() (common on client use-case), ",
444 "or avoid inserting to NON_MASTER group (common on server use-case).",
445 ));
446 }
447 }
448
449 let mut outcome = PreparedFlatSegments::default();
462 let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
463 let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
464 for (head, opts) in heads.vertex_options() {
465 let need_assigning = match self
466 .vertex_id_with_max_group(&head, opts.highest_group)
467 .await?
468 {
469 Some(id) => !self.dag.contains_id(id)?,
470 None => true,
471 };
472 if need_assigning {
473 let group = opts.highest_group;
474 let prepared_segments = self
475 .assign_head(head.clone(), parents, group, &mut covered, &reserved)
476 .await?;
477 outcome.merge(prepared_segments);
478 if opts.reserve_size > 0 {
479 let low = self.map.vertex_id(head.clone()).await? + 1;
480 update_reserved(&mut reserved, &covered, low, opts.reserve_size);
481 }
482 self.pending_heads.push((head, opts));
483 }
484 }
485
486 self.dag
488 .build_segments_from_prepared_flat_segments(&outcome)?;
489
490 Ok(outcome.segment_count() > 0)
491 }
492}
493
494#[async_trait::async_trait]
495impl<IS, M, P, S> DagStrip for AbstractNameDag<IdDag<IS>, M, P, S>
496where
497 IS: IdDagStore + Persist,
498 IdDag<IS>: TryClone,
499 M: TryClone + Persist + IdMapWrite + IdConvert + Send + Sync + 'static,
500 P: TryClone + Open<OpenTarget = Self> + Send + Sync + 'static,
501 S: TryClone + IntVersion + Persist + Send + Sync + 'static,
502{
503 async fn strip(&mut self, set: &NameSet) -> Result<()> {
504 if !self.pending_heads.is_empty() {
505 return programming(format!(
506 "strip does not support pending heads ({:?})",
507 &self.pending_heads.vertexes(),
508 ));
509 }
510
511 let mut new: Self = self.path.open()?;
514 let (lock, map_lock, dag_lock) = new.reload()?;
515 new.set_remote_protocol(self.remote_protocol.clone());
516 new.maybe_reuse_caches_from(self);
517
518 new.strip_with_lock(set, &map_lock).await?;
519 new.persist(lock, map_lock, dag_lock)?;
520
521 *self = new;
522 Ok(())
523 }
524}
525
526impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
527where
528 IS: IdDagStore,
529 IdDag<IS>: TryClone,
530 M: TryClone + Persist + IdMapWrite + IdConvert + Send + Sync + 'static,
531 P: TryClone + Send + Sync + 'static,
532 S: TryClone + Send + Sync + 'static,
533{
534 async fn strip_with_lock(&mut self, set: &NameSet, map_lock: &M::Lock) -> Result<()> {
536 if !self.pending_heads.is_empty() {
537 return programming(format!(
538 "strip does not support pending heads ({:?})",
539 &self.pending_heads.vertexes(),
540 ));
541 }
542
543 let id_set = self.to_id_set(set).await?;
544
545 let head_ids: Vec<Id> = {
548 let to_strip = self.dag.descendants(id_set.clone())?;
550 let master_group = self.dag.master_group()?;
552 let master_group_after_strip = master_group.difference(&to_strip);
553 let heads_before_strip = self.dag.heads_ancestors(master_group)?;
554 let heads_after_strip = self.dag.heads_ancestors(master_group_after_strip)?;
555 let new_heads = heads_after_strip.difference(&heads_before_strip);
556 new_heads.iter_desc().collect()
557 };
558 let heads_after_strip = self.vertex_name_batch(&head_ids).await?;
559 tracing::debug!(target: "dag::strip", "heads after strip: {:?}", &heads_after_strip);
560 self.flush_cached_idmap_with_lock(map_lock).await?;
563
564 let removed_id_set = self.dag.strip(id_set)?;
565 tracing::debug!(target: "dag::strip", "removed id set: {:?}", &removed_id_set);
566
567 let mut removed_vertexes = Vec::new();
568 for span in removed_id_set.iter_span_desc() {
569 let vertexes = self.map.remove_range(span.low, span.high).await?;
570 removed_vertexes.extend(vertexes);
571 }
572 tracing::debug!(target: "dag::strip", "removed vertexes: {:?}", &removed_vertexes);
573
574 self.missing_vertexes_confirmed_by_remote
576 .write()
577 .unwrap()
578 .extend(removed_vertexes);
579
580 self.invalidate_snapshot();
582
583 Ok(())
584 }
585}
586
587#[async_trait::async_trait]
588impl<IS, M, P, S> IdMapWrite for AbstractNameDag<IdDag<IS>, M, P, S>
589where
590 IS: IdDagStore,
591 IdDag<IS>: TryClone,
592 M: TryClone + IdMapAssignHead + Send + Sync,
593 P: TryClone + Send + Sync,
594 S: TryClone + Send + Sync,
595{
596 async fn insert(&mut self, id: Id, name: &[u8]) -> Result<()> {
597 self.map.insert(id, name).await
598 }
599
600 async fn remove_range(&mut self, low: Id, high: Id) -> Result<Vec<VertexName>> {
601 self.map.remove_range(low, high).await
602 }
603}
604
605#[async_trait::async_trait]
606impl<IS, M, P, S> DagImportCloneData for AbstractNameDag<IdDag<IS>, M, P, S>
607where
608 IS: IdDagStore + Persist + 'static,
609 IdDag<IS>: TryClone,
610 M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
611 P: TryClone + Send + Sync + 'static,
612 S: TryClone + Persist + Send + Sync + 'static,
613{
614 async fn import_clone_data(&mut self, clone_data: CloneData<VertexName>) -> Result<()> {
615 let (lock, map_lock, dag_lock) = self.reload()?;
618
619 if !self.dag.all()?.is_empty() {
620 return programming("Cannot import clone data for non-empty graph");
621 }
622 for (id, name) in clone_data.idmap {
623 tracing::debug!(target: "dag::clone", "insert IdMap: {:?}-{:?}", &name, id);
624 self.map.insert(id, name.as_ref()).await?;
625 }
626 self.dag
627 .build_segments_from_prepared_flat_segments(&clone_data.flat_segments)?;
628
629 self.verify_missing().await?;
630
631 self.persist(lock, map_lock, dag_lock)
632 }
633}
634
635impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
636where
637 IS: IdDagStore + Persist + 'static,
638 IdDag<IS>: TryClone,
639 M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
640 P: TryClone + Send + Sync + 'static,
641 S: TryClone + Persist + Send + Sync + 'static,
642{
643 async fn verify_missing(&self) -> Result<()> {
645 let missing: Vec<Id> = self.check_universal_ids().await?;
646 if !missing.is_empty() {
647 let msg = format!(
648 concat!(
649 "Clone data does not contain vertex for {:?}. ",
650 "This is most likely a server-side bug."
651 ),
652 missing,
653 );
654 return programming(msg);
655 }
656
657 Ok(())
658 }
659
660 fn reload(&mut self) -> Result<(S::Lock, M::Lock, IS::Lock)> {
661 let lock = self.state.lock()?;
662 let map_lock = self.map.lock()?;
663 let dag_lock = self.dag.lock()?;
664 self.state.reload(&lock)?;
665 self.map.reload(&map_lock)?;
666 self.dag.reload(&dag_lock)?;
667
668 Ok((lock, map_lock, dag_lock))
669 }
670
671 fn persist(&mut self, lock: S::Lock, map_lock: M::Lock, dag_lock: IS::Lock) -> Result<()> {
672 self.map.persist(&map_lock)?;
673 self.dag.persist(&dag_lock)?;
674 self.state.persist(&lock)?;
675
676 self.invalidate_overlay_map()?;
677 self.persisted_id_set = self.dag.all_ids_in_groups(&Group::ALL)?;
678
679 Ok(())
680 }
681}
682
683#[async_trait::async_trait]
684impl<IS, M, P, S> DagImportPullData for AbstractNameDag<IdDag<IS>, M, P, S>
685where
686 IS: IdDagStore + Persist,
687 IdDag<IS>: TryClone,
688 M: TryClone + IdMapAssignHead + Persist + Send + Sync + 'static,
689 P: Open<OpenTarget = Self> + TryClone + Send + Sync + 'static,
690 S: IntVersion + TryClone + Persist + Send + Sync + 'static,
691{
692 async fn import_pull_data(
693 &mut self,
694 clone_data: CloneData<VertexName>,
695 heads: &VertexListWithOptions,
696 ) -> Result<()> {
697 if !self.pending_heads.is_empty() {
698 return programming(format!(
699 "import_pull_data called with pending heads ({:?})",
700 &self.pending_heads.vertexes(),
701 ));
702 }
703 let non_master_heads = heads.vertexes_by_group(Group::NON_MASTER);
704 if !non_master_heads.is_empty() {
705 return programming(format!(
706 concat!(
707 "import_pull_data called with non-master heads ({:?}). ",
708 "This is unsupported because the pull data is lazy and can only be inserted to the master group.",
709 ),
710 non_master_heads
711 ));
712 }
713
714 for id in clone_data.flat_segments.parents_head_and_roots() {
715 if !clone_data.idmap.contains_key(&id) {
716 return programming(format!(
717 "server does not provide name for id {:?} in pull data",
718 id
719 ));
720 }
721 }
722
723 let mut new: Self = self.path.open()?;
725 let (lock, map_lock, dag_lock) = new.reload()?;
726 new.set_remote_protocol(self.remote_protocol.clone());
727 new.maybe_reuse_caches_from(self);
728
729 {
740 let mut root_ids: Vec<Id> = Vec::new();
741 let mut parent_ids: Vec<Id> = Vec::new();
742 let segments = &clone_data.flat_segments.segments;
743 let id_set = IdSet::from_spans(segments.iter().map(|s| s.low..=s.high));
744 for seg in segments {
745 let pids: Vec<Id> = seg.parents.iter().copied().collect();
746 let connected_pids: Vec<Id> = pids
749 .iter()
750 .copied()
751 .filter(|&p| !id_set.contains(p))
752 .collect();
753 if connected_pids.len() == pids.len() {
754 root_ids.push(seg.low);
757 }
758 parent_ids.extend(connected_pids);
759 }
760
761 let to_names = |ids: &[Id], hint: &str| -> Result<Vec<VertexName>> {
762 let names = ids.iter().map(|i| match clone_data.idmap.get(&i) {
763 Some(v) => Ok(v.clone()),
764 None => {
765 programming(format!("server does not provide name for {} {:?}", hint, i))
766 }
767 });
768 names.collect()
769 };
770
771 let parent_names = to_names(&parent_ids, "parent")?;
772 let root_names = to_names(&root_ids, "root")?;
773 tracing::trace!(
774 "pull: connected parents: {:?}, roots: {:?}",
775 &parent_names,
776 &root_names
777 );
778
779 let mut names = parent_names
781 .iter()
782 .chain(root_names.iter())
783 .cloned()
784 .collect::<Vec<_>>();
785 names.sort_unstable();
786 names.dedup();
787 let resolved = new.vertex_id_batch(&names).await?;
788 assert_eq!(resolved.len(), names.len());
789 for (id, name) in resolved.into_iter().zip(names) {
790 if let Ok(id) = id {
791 if !new.map.contains_vertex_name(&name).await? {
792 tracing::debug!(target: "dag::pull", "insert IdMap: {:?}-{:?}", &name, id);
793 new.map.insert(id, name.as_ref()).await?;
794 }
795 }
796 }
797
798 for name in root_names {
799 if new.contains_vertex_name(&name).await? {
800 let e = NeedSlowPath(format!("{:?} exists in local graph", name));
801 return Err(e);
802 }
803 }
804
805 let client_parents = new.vertex_id_batch(&parent_names).await?;
806 client_parents.into_iter().collect::<Result<Vec<Id>>>()?;
807 }
808
809 let mut prepared_client_segments = PreparedFlatSegments::default();
811 let server_idmap = &clone_data.idmap;
812 let server_idmap_by_name: BTreeMap<&VertexName, Id> =
813 server_idmap.iter().map(|(&id, name)| (name, id)).collect();
814 let mut taken = {
816 let covered = new.dag().all_ids_in_groups(&[Group::MASTER])?;
817 covered
823 };
824
825 let server_seg_by_high: BTreeMap<Id, &FlatSegment> = clone_data
827 .flat_segments
828 .segments
829 .iter()
830 .map(|s| (s.high, s))
831 .collect();
832 let find_server_seg_contains_server_id = |server_id: Id| -> Result<&FlatSegment> {
833 let seg = match server_seg_by_high.range(server_id..).next() {
834 Some((_high, &seg)) => {
835 if seg.low <= server_id && seg.high >= server_id {
836 Some(seg)
837 } else {
838 None
839 }
840 }
841 None => None,
842 };
843 seg.ok_or_else(|| {
844 DagError::Programming(format!(
845 "server does not provide segment covering id {}",
846 server_id
847 ))
848 })
849 };
850
851 for (head, opts) in heads.vertex_options() {
858 let mut stack: Vec<&FlatSegment> = vec![];
859 if let Some(&head_server_id) = server_idmap_by_name.get(&head) {
860 let head_server_seg = find_server_seg_contains_server_id(head_server_id)?;
861 stack.push(head_server_seg);
862 }
863
864 while let Some(server_seg) = stack.pop() {
865 let high_vertex = server_idmap[&server_seg.high].clone();
866 let client_high_id = new
867 .map
868 .vertex_id_with_max_group(&high_vertex, Group::NON_MASTER)
869 .await?;
870 match client_high_id {
871 Some(id) if id.group() == Group::MASTER => {
872 continue;
875 }
876 Some(id) => {
877 let e = NeedSlowPath(format!(
881 "{:?} exists in local graph as {:?} - fast path requires MASTER group",
882 &high_vertex, id
883 ));
884 return Err(e);
885 }
886 None => {}
887 }
888
889 let parent_server_ids = &server_seg.parents;
890 let parent_names: Vec<VertexName> = {
891 let iter = parent_server_ids.iter().map(|id| server_idmap[id].clone());
892 iter.collect()
893 };
894
895 let mut parent_client_ids = Vec::new();
897 let mut missng_parent_server_ids = Vec::new();
898
899 {
902 let client_id_res = new.map.vertex_id_batch(&parent_names).await?;
903 assert_eq!(client_id_res.len(), parent_server_ids.len());
904 for (res, &server_id) in client_id_res.into_iter().zip(parent_server_ids) {
905 match res {
906 Ok(id) if id.group() != Group::MASTER => {
907 return Err(NeedSlowPath(format!(
908 "{:?} exists id in local graph as {:?} - fast path requires MASTER group",
909 &parent_names, id
910 )));
911 }
912 Ok(id) => {
913 parent_client_ids.push(id);
914 }
915 Err(crate::Error::VertexNotFound(_)) => {
916 missng_parent_server_ids.push(server_id);
917 }
918 Err(e) => return Err(e),
919 }
920 }
921 }
922
923 if !missng_parent_server_ids.is_empty() {
924 stack.push(server_seg);
926 for &server_id in missng_parent_server_ids.iter().rev() {
929 let parent_server_seg = find_server_seg_contains_server_id(server_id)?;
930 stack.push(parent_server_seg);
931 }
932 continue;
933 }
934
935 let candidate_id = parent_client_ids
938 .iter()
939 .max()
940 .copied()
941 .unwrap_or(Group::MASTER.min_id())
942 + 1;
943 let size = server_seg.high.0 - server_seg.low.0 + 1;
944 let span = find_free_span(&taken, candidate_id, size, false);
945
946 for (&server_id, name) in server_idmap.range(server_seg.low..=server_seg.high) {
949 let client_id = server_id + span.low.0 - server_seg.low.0;
950 if client_id.group() != Group::MASTER {
951 return Err(crate::Error::IdOverflow(Group::MASTER));
952 }
953 new.map.insert(client_id, name.as_ref()).await?;
954 }
955
956 prepared_client_segments.push_segment(span.low, span.high, &parent_client_ids);
958
959 taken.push(span);
961 }
962
963 if opts.reserve_size > 0 {
965 let head_client_id = new.map.vertex_id(head).await?;
966 let span = find_free_span(&taken, head_client_id + 1, opts.reserve_size as _, true);
967 taken.push(span);
968 }
969 }
970
971 new.dag
972 .build_segments_from_prepared_flat_segments(&prepared_client_segments)?;
973
974 if cfg!(debug_assertions) {
975 new.verify_missing().await?;
976 }
977
978 new.persist(lock, map_lock, dag_lock)?;
979 *self = new;
980 Ok(())
981 }
982}
983
984#[async_trait::async_trait]
985impl<IS, M, P, S> DagExportCloneData for AbstractNameDag<IdDag<IS>, M, P, S>
986where
987 IS: IdDagStore,
988 IdDag<IS>: TryClone,
989 M: IdConvert + TryClone + Send + Sync + 'static,
990 P: TryClone + Send + Sync + 'static,
991 S: TryClone + Send + Sync + 'static,
992{
993 async fn export_clone_data(&self) -> Result<CloneData<VertexName>> {
994 let idmap: BTreeMap<Id, VertexName> = {
995 let ids: Vec<Id> = self.dag.universal_ids()?.into_iter().collect();
996 tracing::debug!("export: {} universally known vertexes", ids.len());
997 let names = {
998 let fallible_names = self.vertex_name_batch(&ids).await?;
999 let mut names = Vec::with_capacity(fallible_names.len());
1000 for name in fallible_names {
1001 names.push(name?);
1002 }
1003 names
1004 };
1005 ids.into_iter().zip(names).collect()
1006 };
1007
1008 let flat_segments: PreparedFlatSegments = {
1009 let segments = self.dag.next_segments(Id::MIN, 0)?;
1010 let mut prepared = Vec::with_capacity(segments.len());
1011 for segment in segments {
1012 let span = segment.span()?;
1013 let parents = segment.parents()?;
1014 prepared.push(FlatSegment {
1015 low: span.low,
1016 high: span.high,
1017 parents,
1018 });
1019 }
1020 PreparedFlatSegments {
1021 segments: prepared.into_iter().collect(),
1022 }
1023 };
1024
1025 let data = CloneData {
1026 flat_segments,
1027 idmap,
1028 };
1029 Ok(data)
1030 }
1031}
1032
1033#[async_trait::async_trait]
1034impl<IS, M, P, S> DagExportPullData for AbstractNameDag<IdDag<IS>, M, P, S>
1035where
1036 IS: IdDagStore,
1037 IdDag<IS>: TryClone,
1038 M: IdConvert + TryClone + Send + Sync + 'static,
1039 P: TryClone + Send + Sync + 'static,
1040 S: TryClone + Send + Sync + 'static,
1041{
1042 async fn export_pull_data(&self, set: &NameSet) -> Result<CloneData<VertexName>> {
1043 let id_set = self.to_id_set(&set).await?;
1044
1045 let flat_segments = self.dag.idset_to_flat_segments(id_set)?;
1046 let ids: Vec<_> = flat_segments.parents_head_and_roots().into_iter().collect();
1047
1048 let idmap: BTreeMap<Id, VertexName> = {
1049 tracing::debug!("pull: {} vertexes in idmap", ids.len());
1050 let names = {
1051 let fallible_names = self.vertex_name_batch(&ids).await?;
1052 let mut names = Vec::with_capacity(fallible_names.len());
1053 for name in fallible_names {
1054 names.push(name?);
1055 }
1056 names
1057 };
1058 assert_eq!(ids.len(), names.len());
1059 ids.into_iter().zip(names).collect()
1060 };
1061
1062 let data = CloneData {
1063 flat_segments,
1064 idmap,
1065 };
1066 Ok(data)
1067 }
1068}
1069
1070impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
1071where
1072 IS: IdDagStore,
1073 IdDag<IS>: TryClone,
1074 M: TryClone + Send + Sync,
1075 P: TryClone + Send + Sync,
1076 S: TryClone + Send + Sync,
1077{
1078 fn invalidate_snapshot(&mut self) {
1085 *self.snapshot.write().unwrap() = None;
1086 }
1087
1088 fn invalidate_missing_vertex_cache(&mut self) {
1089 tracing::debug!(target: "dag::cache", "cleared missing cache");
1090 *self.missing_vertexes_confirmed_by_remote.write().unwrap() = Default::default();
1091 }
1092
1093 fn invalidate_overlay_map(&mut self) -> Result<()> {
1094 self.overlay_map = Default::default();
1095 self.update_overlay_map_id_set()?;
1096 tracing::debug!(target: "dag::cache", "cleared overlay map cache");
1097 Ok(())
1098 }
1099
1100 fn update_overlay_map_id_set(&mut self) -> Result<()> {
1101 self.overlay_map_id_set = self.dag.master_group()?;
1102 Ok(())
1103 }
1104
1105 pub(crate) fn try_snapshot(&self) -> Result<Arc<Self>> {
1107 if let Some(s) = self.snapshot.read().unwrap().deref() {
1108 if s.dag.version() == self.dag.version() {
1109 return Ok(Arc::clone(s));
1110 }
1111 }
1112
1113 let mut snapshot = self.snapshot.write().unwrap();
1114 match snapshot.deref() {
1115 Some(s) if s.dag.version() == self.dag.version() => Ok(s.clone()),
1116 _ => {
1117 let cloned = Self {
1118 dag: self.dag.try_clone()?,
1119 map: self.map.try_clone()?,
1120 snapshot: Default::default(),
1121 pending_heads: self.pending_heads.clone(),
1122 persisted_id_set: self.persisted_id_set.clone(),
1123 path: self.path.try_clone()?,
1124 state: self.state.try_clone()?,
1125 id: self.id.clone(),
1126 overlay_map: Arc::clone(&self.overlay_map),
1129 overlay_map_id_set: self.overlay_map_id_set.clone(),
1130 overlay_map_paths: Arc::clone(&self.overlay_map_paths),
1131 remote_protocol: self.remote_protocol.clone(),
1132 missing_vertexes_confirmed_by_remote: Arc::clone(
1133 &self.missing_vertexes_confirmed_by_remote,
1134 ),
1135 };
1136 let result = Arc::new(cloned);
1137 *snapshot = Some(Arc::clone(&result));
1138 Ok(result)
1139 }
1140 }
1141 }
1142
1143 pub fn dag(&self) -> &IdDag<IS> {
1144 &self.dag
1145 }
1146
1147 pub fn map(&self) -> &M {
1148 &self.map
1149 }
1150
1151 pub fn set_remote_protocol(&mut self, protocol: Arc<dyn RemoteIdConvertProtocol>) {
1156 self.remote_protocol = protocol;
1157 }
1158
1159 pub(crate) fn get_remote_protocol(&self) -> Arc<dyn RemoteIdConvertProtocol> {
1160 self.remote_protocol.clone()
1161 }
1162}
1163
1164impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
1165where
1166 IS: IdDagStore,
1167 IdDag<IS>: TryClone,
1168 M: TryClone + IdMapAssignHead + Send + Sync + 'static,
1169 P: TryClone + Send + Sync + 'static,
1170 S: TryClone + Send + Sync + 'static,
1171{
1172 async fn populate_missing_vertexes_for_add_heads(
1173 &mut self,
1174 parents: &dyn Parents,
1175 heads: &[VertexName],
1176 ) -> Result<()> {
1177 if self.is_vertex_lazy() {
1178 let unassigned = calculate_definitely_unassigned_vertexes(self, parents, heads).await?;
1179 let mut missing = self.missing_vertexes_confirmed_by_remote.write().unwrap();
1180 for v in unassigned {
1181 if missing.insert(v.clone()) {
1182 tracing::trace!(target: "dag::cache", "cached missing {:?} (definitely missing)", &v);
1183 }
1184 }
1185 }
1186 Ok(())
1187 }
1188}
1189
1190async fn calculate_definitely_unassigned_vertexes<IS, M, P, S>(
1201 this: &AbstractNameDag<IdDag<IS>, M, P, S>,
1202 parents: &dyn Parents,
1203 heads: &[VertexName],
1204) -> Result<Vec<VertexName>>
1205where
1206 IS: IdDagStore,
1207 IdDag<IS>: TryClone,
1208 M: TryClone + IdMapAssignHead + Send + Sync + 'static,
1209 P: TryClone + Send + Sync + 'static,
1210 S: TryClone + Send + Sync + 'static,
1211{
1212 let subdag = parents.hint_subdag_for_insertion(heads).await?;
1225
1226 let mut remaining = subdag.all().await?;
1227 let mut unassigned = NameSet::empty();
1228
1229 let mut unassigned_roots = Vec::new();
1247 if this.is_vertex_lazy() {
1248 let roots = subdag.roots(remaining.clone()).await?;
1249 let mut roots_iter = roots.iter().await?;
1250 while let Some(root) = roots_iter.next().await {
1251 let root = root?;
1252
1253 if matches!(
1255 &this.contains_vertex_name_locally(&[root.clone()]).await?[..],
1256 [true]
1257 ) {
1258 tracing::debug!(target: "dag::definitelymissing", "root {:?} is already known", &root);
1259 continue;
1260 }
1261
1262 let root_parents_id_set = {
1263 let root_parents = parents.parent_names(root.clone()).await?;
1264 let root_parents_set = match this
1265 .sort(&NameSet::from_static_names(root_parents))
1266 .await
1267 {
1268 Ok(set) => set,
1269 Err(_) => {
1270 tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear (parents cannot be resolved)", &root);
1271 continue;
1272 }
1273 };
1274 this.to_id_set(&root_parents_set).await?
1275 };
1276
1277 if root_parents_id_set.is_empty() {
1280 tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear (no parents)", &root);
1281 continue;
1282 }
1283
1284 if root_parents_id_set
1288 .iter_desc()
1289 .all(|i| i.group() == Group::NON_MASTER)
1290 {
1291 tracing::debug!(target: "dag::definitelymissing", "root {:?} is not assigned (non-lazy parent)", &root);
1292 unassigned_roots.push(root);
1293 continue;
1294 }
1295
1296 let children_ids: Vec<Id> = this
1299 .dag
1300 .children(root_parents_id_set)?
1301 .iter_desc()
1302 .collect();
1303 if this
1304 .map
1305 .contains_vertex_id_locally(&children_ids)
1306 .await?
1307 .iter()
1308 .all(|b| *b)
1309 {
1310 tracing::debug!(target: "dag::definitelymissing", "root {:?} is not assigned (children of parents are known)", &root);
1311 unassigned_roots.push(root);
1312 continue;
1313 }
1314
1315 tracing::trace!(target: "dag::definitelymissing", "root {:?} is unclear", &root);
1316 }
1317
1318 if !unassigned_roots.is_empty() {
1319 unassigned = subdag
1320 .descendants(NameSet::from_static_names(unassigned_roots))
1321 .await?;
1322 remaining = remaining.difference(&unassigned);
1323 }
1324 }
1325
1326 let filter_known = |sample: &[VertexName]| -> BoxFuture<Result<Vec<VertexName>>> {
1329 let sample = sample.to_vec();
1330 async {
1331 let known_bools: Vec<bool> = {
1332 let ids = this.vertex_id_batch(&sample).await?;
1333 ids.into_iter().map(|i| i.is_ok()).collect()
1334 };
1335 debug_assert_eq!(sample.len(), known_bools.len());
1336 let known = sample
1337 .into_iter()
1338 .zip(known_bools)
1339 .filter_map(|(v, b)| if b { Some(v) } else { None })
1340 .collect();
1341 Ok(known)
1342 }
1343 .boxed()
1344 };
1345 let assigned = utils::filter_known(remaining.clone(), &filter_known).await?;
1346 unassigned = unassigned.union(&remaining.difference(&assigned));
1347 tracing::debug!(target: "dag::definitelymissing", "unassigned (missing): {:?}", &unassigned);
1348
1349 let unassigned = unassigned.iter().await?.try_collect().await?;
1350 Ok(unassigned)
1351}
1352
1353impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
1355where
1356 IS: IdDagStore,
1357 IdDag<IS>: TryClone,
1358 M: IdConvert + TryClone + Send + Sync,
1359 P: TryClone + Send + Sync,
1360 S: TryClone + Send + Sync,
1361{
1362 async fn resolve_vertexes_remotely(&self, names: &[VertexName]) -> Result<Vec<Option<Id>>> {
1365 if names.is_empty() {
1366 return Ok(Vec::new());
1367 }
1368 if is_remote_protocol_disabled() {
1369 return Err(io::Error::new(
1370 io::ErrorKind::WouldBlock,
1371 "resolving vertexes remotely disabled",
1372 )
1373 .into());
1374 }
1375 if names.len() < 30 {
1376 tracing::debug!(target: "dag::protocol", "resolve names {:?} remotely", &names);
1377 } else {
1378 tracing::debug!(target: "dag::protocol", "resolve names ({}) remotely", names.len());
1379 }
1380 crate::failpoint!("dag-resolve-vertexes-remotely");
1381 let request: protocol::RequestNameToLocation =
1382 (self.map(), self.dag()).process(names.to_vec()).await?;
1383 let path_names = self
1384 .remote_protocol
1385 .resolve_names_to_relative_paths(request.heads, request.names)
1386 .await?;
1387 self.insert_relative_paths(path_names).await?;
1388 let overlay = self.overlay_map.read().unwrap();
1389 let mut ids = Vec::with_capacity(names.len());
1390 let mut missing = self.missing_vertexes_confirmed_by_remote.write().unwrap();
1391 for name in names {
1392 if let Some(id) = overlay.lookup_vertex_id(name) {
1393 ids.push(Some(id));
1394 } else {
1395 tracing::trace!(target: "dag::cache", "cached missing {:?} (server confirmed)", &name);
1396 missing.insert(name.clone());
1397 ids.push(None);
1398 }
1399 }
1400 Ok(ids)
1401 }
1402
1403 async fn resolve_ids_remotely(&self, ids: &[Id]) -> Result<Vec<VertexName>> {
1406 if ids.is_empty() {
1407 return Ok(Vec::new());
1408 }
1409 if is_remote_protocol_disabled() {
1410 return Err(io::Error::new(
1411 io::ErrorKind::WouldBlock,
1412 "resolving ids remotely disabled",
1413 )
1414 .into());
1415 }
1416 if ids.len() < 30 {
1417 tracing::debug!(target: "dag::protocol", "resolve ids {:?} remotely", &ids);
1418 } else {
1419 tracing::debug!(target: "dag::protocol", "resolve ids ({}) remotely", ids.len());
1420 }
1421 crate::failpoint!("dag-resolve-ids-remotely");
1422 let request: protocol::RequestLocationToName = (self.map(), self.dag())
1423 .process(IdSet::from_spans(ids.iter().copied()))
1424 .await?;
1425 let path_names = self
1426 .remote_protocol
1427 .resolve_relative_paths_to_names(request.paths)
1428 .await?;
1429 self.insert_relative_paths(path_names).await?;
1430 let overlay = self.overlay_map.read().unwrap();
1431 let mut names = Vec::with_capacity(ids.len());
1432 for &id in ids {
1433 if let Some(name) = overlay.lookup_vertex_name(id) {
1434 names.push(name);
1435 } else {
1436 return id.not_found();
1437 }
1438 }
1439 Ok(names)
1440 }
1441
1442 async fn insert_relative_paths(
1444 &self,
1445 path_names: Vec<(AncestorPath, Vec<VertexName>)>,
1446 ) -> Result<()> {
1447 if path_names.is_empty() {
1448 return Ok(());
1449 }
1450 let to_insert: Vec<(Id, VertexName)> = calculate_id_name_from_paths(
1451 self.map(),
1452 self.dag().deref(),
1453 &self.overlay_map_id_set,
1454 &path_names,
1455 )
1456 .await?;
1457
1458 let mut paths = self.overlay_map_paths.lock().unwrap();
1459 paths.extend(path_names);
1460 drop(paths);
1461
1462 let mut overlay = self.overlay_map.write().unwrap();
1463 for (id, name) in to_insert {
1464 tracing::trace!(target: "dag::cache", "cached mapping {:?} <=> {:?}", id, &name);
1465 overlay.insert_vertex_id_name(id, name);
1466 }
1467
1468 Ok(())
1469 }
1470}
1471
1472async fn calculate_id_name_from_paths(
1474 map: &dyn IdConvert,
1475 dag: &dyn IdDagAlgorithm,
1476 overlay_map_id_set: &IdSet,
1477 path_names: &[(AncestorPath, Vec<VertexName>)],
1478) -> Result<Vec<(Id, VertexName)>> {
1479 if path_names.is_empty() {
1480 return Ok(Vec::new());
1481 }
1482 let mut to_insert: Vec<(Id, VertexName)> =
1483 Vec::with_capacity(path_names.iter().map(|(_, ns)| ns.len()).sum());
1484 for (path, names) in path_names {
1485 if names.is_empty() {
1486 continue;
1487 }
1488 let x_id = map.vertex_id(path.x.clone()).await.map_err(|e| {
1490 let msg = format!(
1491 concat!(
1492 "Cannot resolve x ({:?}) in x~n locally. The x is expected to be known ",
1493 "locally and is populated at clone time. This x~n is used to convert ",
1494 "{:?} to a location in the graph. (Check initial clone logic) ",
1495 "(Error: {})",
1496 ),
1497 &path.x, &names[0], e
1498 );
1499 crate::Error::Programming(msg)
1500 })?;
1501 tracing::trace!(
1502 "resolve path {:?} names {:?} (x = {}) to overlay",
1503 &path,
1504 &names,
1505 x_id
1506 );
1507 if !overlay_map_id_set.contains(x_id) {
1508 crate::failpoint!("dag-error-x-n-overflow");
1509 let msg = format!(
1510 concat!(
1511 "Server returned x~n (x = {:?} {}, n = {}). But x is out of range ",
1512 "({:?}). This is not expected and indicates some ",
1513 "logic error on the server side."
1514 ),
1515 &path.x, x_id, path.n, overlay_map_id_set
1516 );
1517 return programming(msg);
1518 }
1519 let mut id = match dag.first_ancestor_nth(x_id, path.n).map_err(|e| {
1520 let msg = format!(
1521 concat!(
1522 "Cannot resolve x~n (x = {:?} {}, n = {}): {}. ",
1523 "This indicates the client-side graph is somewhat incompatible from the ",
1524 "server-side graph. Something (server-side or client-side) was probably ",
1525 "seriously wrong before this error."
1526 ),
1527 &path.x, x_id, path.n, e
1528 );
1529 crate::Error::Programming(msg)
1530 }) {
1531 Err(e) => {
1532 crate::failpoint!("dag-error-x-n-unresolvable");
1533 return Err(e);
1534 }
1535 Ok(id) => id,
1536 };
1537 if names.len() < 30 {
1538 tracing::debug!("resolved {:?} => {} {:?}", &path, id, &names);
1539 } else {
1540 tracing::debug!("resolved {:?} => {} {:?} ...", &path, id, &names[0]);
1541 }
1542 for (i, name) in names.into_iter().enumerate() {
1543 if i > 0 {
1544 id = match dag.parent_ids(id)?.first().cloned() {
1546 Some(id) => id,
1547 None => {
1548 let msg = format!(
1549 concat!(
1550 "Cannot resolve x~(n+i) (x = {:?} {}, n = {}, i = {}) locally. ",
1551 "This indicates the client-side graph is somewhat incompatible ",
1552 "from the server-side graph. Something (server-side or ",
1553 "client-side) was probably seriously wrong before this error."
1554 ),
1555 &path.x, x_id, path.n, i
1556 );
1557 return programming(msg);
1558 }
1559 }
1560 }
1561
1562 tracing::trace!(" resolved {:?} = {:?}", id, &name,);
1563 to_insert.push((id, name.clone()));
1564 }
1565 }
1566 Ok(to_insert)
1567}
1568
1569#[async_trait::async_trait]
1572impl<IS, M, P, S> RemoteIdConvertProtocol for AbstractNameDag<IdDag<IS>, M, P, S>
1573where
1574 IS: IdDagStore,
1575 IdDag<IS>: TryClone,
1576 M: IdConvert + TryClone + Send + Sync + 'static,
1577 P: TryClone + Send + Sync + 'static,
1578 S: TryClone + Send + Sync + 'static,
1579{
1580 async fn resolve_names_to_relative_paths(
1581 &self,
1582 heads: Vec<VertexName>,
1583 names: Vec<VertexName>,
1584 ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
1585 let request = protocol::RequestNameToLocation { names, heads };
1586 let response: protocol::ResponseIdNamePair =
1587 (self.map(), self.dag()).process(request).await?;
1588 Ok(response.path_names)
1589 }
1590
1591 async fn resolve_relative_paths_to_names(
1592 &self,
1593 paths: Vec<AncestorPath>,
1594 ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
1595 let request = protocol::RequestLocationToName { paths };
1596 let response: protocol::ResponseIdNamePair =
1597 (self.map(), self.dag()).process(request).await?;
1598 Ok(response.path_names)
1599 }
1600}
1601
1602#[async_trait::async_trait]
1604impl<IS, M, P, S> RemoteIdConvertProtocol for Arc<AbstractNameDag<IdDag<IS>, M, P, S>>
1605where
1606 IS: IdDagStore,
1607 IdDag<IS>: TryClone,
1608 M: IdConvert + TryClone + Send + Sync + 'static,
1609 P: TryClone + Send + Sync + 'static,
1610 S: TryClone + Send + Sync + 'static,
1611{
1612 async fn resolve_names_to_relative_paths(
1613 &self,
1614 heads: Vec<VertexName>,
1615 names: Vec<VertexName>,
1616 ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
1617 self.deref()
1618 .resolve_names_to_relative_paths(heads, names)
1619 .await
1620 }
1621
1622 async fn resolve_relative_paths_to_names(
1623 &self,
1624 paths: Vec<AncestorPath>,
1625 ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
1626 self.deref().resolve_relative_paths_to_names(paths).await
1627 }
1628}
1629
1630#[async_trait::async_trait]
1635impl<IS, M, P, S> DagAlgorithm for AbstractNameDag<IdDag<IS>, M, P, S>
1636where
1637 IS: IdDagStore,
1638 IdDag<IS>: TryClone + 'static,
1639 M: TryClone + IdConvert + Sync + Send + 'static,
1640 P: TryClone + Sync + Send + 'static,
1641 S: TryClone + Sync + Send + 'static,
1642{
1643 async fn sort(&self, set: &NameSet) -> Result<NameSet> {
1645 let hints = set.hints();
1646 if hints.contains(Flags::TOPO_DESC)
1647 && matches!(hints.dag_version(), Some(v) if v <= self.dag_version())
1648 && matches!(hints.id_map_version(), Some(v) if v <= self.map_version())
1649 {
1650 Ok(set.clone())
1651 } else {
1652 let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
1653 let mut spans = IdSet::empty();
1654 let mut iter = set.iter().await?.chunks(1 << 17);
1655 while let Some(names) = iter.next().await {
1656 let names = names.into_iter().collect::<Result<Vec<_>>>()?;
1657 let ids = self.vertex_id_batch(&names).await?;
1658 for id in ids {
1659 spans.push(id?);
1660 }
1661 }
1662 let result = NameSet::from_spans_dag(spans, self)?;
1663 result.hints().add_flags(flags);
1664 Ok(result)
1665 }
1666 }
1667
1668 async fn parent_names(&self, name: VertexName) -> Result<Vec<VertexName>> {
1670 let id = self.vertex_id(name).await?;
1671 let parent_ids = self.dag().parent_ids(id)?;
1672 let mut result = Vec::with_capacity(parent_ids.len());
1673 for id in parent_ids {
1674 result.push(self.vertex_name(id).await?);
1675 }
1676 Ok(result)
1677 }
1678
1679 async fn all(&self) -> Result<NameSet> {
1681 let spans = self.dag().all()?;
1682 let result = NameSet::from_spans_dag(spans, self)?;
1683 result.hints().add_flags(Flags::FULL);
1684 Ok(result)
1685 }
1686
1687 async fn master_group(&self) -> Result<NameSet> {
1689 let spans = self.dag().master_group()?;
1690 let result = NameSet::from_spans_dag(spans, self)?;
1691 result.hints().add_flags(Flags::ANCESTORS);
1692 Ok(result)
1693 }
1694
1695 async fn ancestors(&self, set: NameSet) -> Result<NameSet> {
1697 if set.hints().contains(Flags::ANCESTORS)
1698 && set.hints().dag_version() <= Some(self.dag_version())
1699 {
1700 return Ok(set);
1701 }
1702 let spans = self.to_id_set(&set).await?;
1703 let spans = self.dag().ancestors(spans)?;
1704 let result = NameSet::from_spans_dag(spans, self)?;
1705 result.hints().add_flags(Flags::ANCESTORS);
1706 Ok(result)
1707 }
1708
1709 async fn first_ancestors(&self, set: NameSet) -> Result<NameSet> {
1711 if set.hints().contains(Flags::ANCESTORS)
1713 && set.hints().dag_version() <= Some(self.dag_version())
1714 {
1715 return Ok(set);
1716 }
1717 let spans = self.to_id_set(&set).await?;
1718 let spans = self.dag().first_ancestors(spans)?;
1719 let result = NameSet::from_spans_dag(spans, self)?;
1720 #[cfg(test)]
1721 {
1722 result.assert_eq(crate::default_impl::first_ancestors(self, set).await?);
1723 }
1724 Ok(result)
1725 }
1726
1727 async fn merges(&self, set: NameSet) -> Result<NameSet> {
1729 let spans = self.to_id_set(&set).await?;
1730 let spans = self.dag().merges(spans)?;
1731 let result = NameSet::from_spans_dag(spans, self)?;
1732 #[cfg(test)]
1733 {
1734 result.assert_eq(crate::default_impl::merges(self, set).await?);
1735 }
1736 Ok(result)
1737 }
1738
1739 async fn parents(&self, set: NameSet) -> Result<NameSet> {
1744 let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
1746 let spans = self.dag().parents(self.to_id_set(&set).await?)?;
1747 let result = NameSet::from_spans_dag(spans, self)?;
1748 result.hints().add_flags(flags);
1749 #[cfg(test)]
1750 {
1751 result.assert_eq(crate::default_impl::parents(self, set).await?);
1752 }
1753 Ok(result)
1754 }
1755
1756 async fn first_ancestor_nth(&self, name: VertexName, n: u64) -> Result<Option<VertexName>> {
1758 #[cfg(test)]
1759 let name2 = name.clone();
1760 let id = self.vertex_id(name).await?;
1761 let id = self.dag().try_first_ancestor_nth(id, n)?;
1762 let result = match id {
1763 None => None,
1764 Some(id) => Some(self.vertex_name(id).await?),
1765 };
1766 #[cfg(test)]
1767 {
1768 let result2 = crate::default_impl::first_ancestor_nth(self, name2, n).await?;
1769 assert_eq!(result, result2);
1770 }
1771 Ok(result)
1772 }
1773
1774 async fn heads(&self, set: NameSet) -> Result<NameSet> {
1776 if set.hints().contains(Flags::ANCESTORS)
1777 && set.hints().dag_version() <= Some(self.dag_version())
1778 {
1779 return self.heads_ancestors(set).await;
1781 }
1782 let spans = self.dag().heads(self.to_id_set(&set).await?)?;
1783 let result = NameSet::from_spans_dag(spans, self)?;
1784 #[cfg(test)]
1785 {
1786 result.assert_eq(crate::default_impl::heads(self, set).await?);
1787 }
1788 Ok(result)
1789 }
1790
1791 async fn children(&self, set: NameSet) -> Result<NameSet> {
1793 let spans = self.dag().children(self.to_id_set(&set).await?)?;
1794 let result = NameSet::from_spans_dag(spans, self)?;
1795 Ok(result)
1796 }
1797
1798 async fn roots(&self, set: NameSet) -> Result<NameSet> {
1800 let flags = extract_ancestor_flag_if_compatible(set.hints(), self.dag_version());
1801 let spans = self.dag().roots(self.to_id_set(&set).await?)?;
1802 let result = NameSet::from_spans_dag(spans, self)?;
1803 result.hints().add_flags(flags);
1804 #[cfg(test)]
1805 {
1806 result.assert_eq(crate::default_impl::roots(self, set).await?);
1807 }
1808 Ok(result)
1809 }
1810
1811 async fn gca_one(&self, set: NameSet) -> Result<Option<VertexName>> {
1817 let result: Option<VertexName> = match self.dag().gca_one(self.to_id_set(&set).await?)? {
1818 None => None,
1819 Some(id) => Some(self.vertex_name(id).await?),
1820 };
1821 #[cfg(test)]
1822 {
1823 assert_eq!(&result, &crate::default_impl::gca_one(self, set).await?);
1824 }
1825 Ok(result)
1826 }
1827
1828 async fn gca_all(&self, set: NameSet) -> Result<NameSet> {
1831 let spans = self.dag().gca_all(self.to_id_set(&set).await?)?;
1832 let result = NameSet::from_spans_dag(spans, self)?;
1833 #[cfg(test)]
1834 {
1835 result.assert_eq(crate::default_impl::gca_all(self, set).await?);
1836 }
1837 Ok(result)
1838 }
1839
1840 async fn common_ancestors(&self, set: NameSet) -> Result<NameSet> {
1842 let spans = self.dag().common_ancestors(self.to_id_set(&set).await?)?;
1843 let result = NameSet::from_spans_dag(spans, self)?;
1844 result.hints().add_flags(Flags::ANCESTORS);
1845 #[cfg(test)]
1846 {
1847 result.assert_eq(crate::default_impl::common_ancestors(self, set).await?);
1848 }
1849 Ok(result)
1850 }
1851
1852 async fn is_ancestor(&self, ancestor: VertexName, descendant: VertexName) -> Result<bool> {
1854 #[cfg(test)]
1855 let result2 =
1856 crate::default_impl::is_ancestor(self, ancestor.clone(), descendant.clone()).await?;
1857 let ancestor_id = self.vertex_id(ancestor).await?;
1858 let descendant_id = self.vertex_id(descendant).await?;
1859 let result = self.dag().is_ancestor(ancestor_id, descendant_id)?;
1860 #[cfg(test)]
1861 {
1862 assert_eq!(&result, &result2);
1863 }
1864 Ok(result)
1865 }
1866
1867 async fn heads_ancestors(&self, set: NameSet) -> Result<NameSet> {
1877 let spans = self.dag().heads_ancestors(self.to_id_set(&set).await?)?;
1878 let result = NameSet::from_spans_dag(spans, self)?;
1879 #[cfg(test)]
1880 {
1881 if !set.hints().contains(Flags::ANCESTORS) {
1884 result.assert_eq(crate::default_impl::heads_ancestors(self, set).await?);
1885 }
1886 }
1887 Ok(result)
1888 }
1889
1890 async fn range(&self, roots: NameSet, heads: NameSet) -> Result<NameSet> {
1892 let roots = self.to_id_set(&roots).await?;
1893 let heads = self.to_id_set(&heads).await?;
1894 let spans = self.dag().range(roots, heads)?;
1895 let result = NameSet::from_spans_dag(spans, self)?;
1896 Ok(result)
1897 }
1898
1899 async fn descendants(&self, set: NameSet) -> Result<NameSet> {
1901 let spans = self.dag().descendants(self.to_id_set(&set).await?)?;
1902 let result = NameSet::from_spans_dag(spans, self)?;
1903 Ok(result)
1904 }
1905
1906 async fn dirty(&self) -> Result<NameSet> {
1908 let all = self.dag().all()?;
1909 let spans = all.difference(&self.persisted_id_set);
1910 let set = NameSet::from_spans_dag(spans, self)?;
1911 Ok(set)
1912 }
1913
1914 fn is_vertex_lazy(&self) -> bool {
1915 !self.remote_protocol.is_local()
1916 }
1917
1918 fn dag_snapshot(&self) -> Result<Arc<dyn DagAlgorithm + Send + Sync>> {
1920 Ok(self.try_snapshot()? as Arc<dyn DagAlgorithm + Send + Sync>)
1921 }
1922
1923 fn id_dag_snapshot(&self) -> Result<Arc<dyn IdDagAlgorithm + Send + Sync>> {
1924 let store = self.dag.try_clone()?.store;
1925 Ok(Arc::new(store))
1926 }
1927
1928 fn dag_id(&self) -> &str {
1929 &self.id
1930 }
1931
1932 fn dag_version(&self) -> &VerLink {
1933 &self.dag.version()
1934 }
1935}
1936
1937fn extract_ancestor_flag_if_compatible(hints: &Hints, dag_version: &VerLink) -> Flags {
1940 if hints.dag_version() <= Some(dag_version) {
1941 hints.flags() & Flags::ANCESTORS
1942 } else {
1943 Flags::empty()
1944 }
1945}
1946
1947#[async_trait::async_trait]
1948impl<I, M, P, S> PrefixLookup for AbstractNameDag<I, M, P, S>
1949where
1950 I: Send + Sync,
1951 M: PrefixLookup + Send + Sync,
1952 P: Send + Sync,
1953 S: Send + Sync,
1954{
1955 async fn vertexes_by_hex_prefix(
1956 &self,
1957 hex_prefix: &[u8],
1958 limit: usize,
1959 ) -> Result<Vec<VertexName>> {
1960 let mut list = self.map.vertexes_by_hex_prefix(hex_prefix, limit).await?;
1961 let overlay_list = self
1962 .overlay_map
1963 .read()
1964 .unwrap()
1965 .lookup_vertexes_by_hex_prefix(hex_prefix, limit)?;
1966 list.extend(overlay_list);
1967 list.sort_unstable();
1968 list.dedup();
1969 list.truncate(limit);
1970 Ok(list)
1971 }
1972}
1973
1974#[async_trait::async_trait]
1975impl<IS, M, P, S> IdConvert for AbstractNameDag<IdDag<IS>, M, P, S>
1976where
1977 IS: IdDagStore,
1978 IdDag<IS>: TryClone,
1979 M: IdConvert + TryClone + Send + Sync + 'static,
1980 P: TryClone + Send + Sync + 'static,
1981 S: TryClone + Send + Sync + 'static,
1982{
1983 async fn vertex_id(&self, name: VertexName) -> Result<Id> {
1984 match self.map.vertex_id(name.clone()).await {
1985 Ok(id) => Ok(id),
1986 Err(crate::Error::VertexNotFound(_)) if self.is_vertex_lazy() => {
1987 if let Some(id) = self.overlay_map.read().unwrap().lookup_vertex_id(&name) {
1988 return Ok(id);
1989 }
1990 if self
1991 .missing_vertexes_confirmed_by_remote
1992 .read()
1993 .unwrap()
1994 .contains(&name)
1995 {
1996 return name.not_found();
1997 }
1998 let ids = self.resolve_vertexes_remotely(&[name.clone()]).await?;
1999 if let Some(Some(id)) = ids.first() {
2000 Ok(*id)
2001 } else {
2002 name.not_found()
2004 }
2005 }
2006 Err(e) => Err(e),
2007 }
2008 }
2009
2010 async fn vertex_id_with_max_group(
2011 &self,
2012 name: &VertexName,
2013 max_group: Group,
2014 ) -> Result<Option<Id>> {
2015 match self.map.vertex_id_with_max_group(name, max_group).await {
2016 Ok(Some(id)) => Ok(Some(id)),
2017 Err(err) => Err(err),
2018 Ok(None) if self.is_vertex_lazy() => {
2019 if let Some(id) = self.overlay_map.read().unwrap().lookup_vertex_id(&name) {
2020 return Ok(Some(id));
2021 }
2022 if self
2023 .missing_vertexes_confirmed_by_remote
2024 .read()
2025 .unwrap()
2026 .contains(&name)
2027 {
2028 return Ok(None);
2029 }
2030 if max_group == Group::MASTER
2031 && self
2032 .map
2033 .vertex_id_with_max_group(name, Group::NON_MASTER)
2034 .await?
2035 .is_some()
2036 {
2037 return Ok(None);
2040 }
2041 match self.resolve_vertexes_remotely(&[name.clone()]).await {
2042 Ok(ids) => match ids.first() {
2043 Some(Some(id)) => Ok(Some(*id)),
2044 Some(None) | None => Ok(None),
2045 },
2046 Err(e) => Err(e),
2047 }
2048 }
2049 Ok(None) => Ok(None),
2050 }
2051 }
2052
2053 async fn vertex_name(&self, id: Id) -> Result<VertexName> {
2054 match self.map.vertex_name(id).await {
2055 Ok(name) => Ok(name),
2056 Err(crate::Error::IdNotFound(_)) if self.is_vertex_lazy() => {
2057 if let Some(name) = self.overlay_map.read().unwrap().lookup_vertex_name(id) {
2058 return Ok(name);
2059 }
2060 let max_master_id = self.dag.master_group()?.max();
2062 if Some(id) > max_master_id {
2063 return id.not_found();
2064 }
2065 let names = self.resolve_ids_remotely(&[id]).await?;
2066 if let Some(name) = names.into_iter().next() {
2067 Ok(name)
2068 } else {
2069 id.not_found()
2070 }
2071 }
2072 Err(e) => Err(e),
2073 }
2074 }
2075
2076 async fn contains_vertex_name(&self, name: &VertexName) -> Result<bool> {
2077 match self.map.contains_vertex_name(name).await {
2078 Ok(true) => Ok(true),
2079 Ok(false) if self.is_vertex_lazy() => {
2080 if self
2081 .overlay_map
2082 .read()
2083 .unwrap()
2084 .lookup_vertex_id(name)
2085 .is_some()
2086 {
2087 return Ok(true);
2088 }
2089 if self
2090 .missing_vertexes_confirmed_by_remote
2091 .read()
2092 .unwrap()
2093 .contains(&name)
2094 {
2095 return Ok(false);
2096 }
2097 match self.resolve_vertexes_remotely(&[name.clone()]).await {
2098 Ok(ids) => match ids.first() {
2099 Some(Some(_)) => Ok(true),
2100 Some(None) | None => Ok(false),
2101 },
2102 Err(e) => Err(e),
2103 }
2104 }
2105 Ok(false) => Ok(false),
2106 Err(e) => Err(e),
2107 }
2108 }
2109
2110 async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
2111 let mut list = self.map.contains_vertex_id_locally(ids).await?;
2112 let map = self.overlay_map.read().unwrap();
2113 for (b, id) in list.iter_mut().zip(ids.iter().copied()) {
2114 if !*b {
2115 *b = *b || map.has_vertex_id(id);
2116 }
2117 }
2118 Ok(list)
2119 }
2120
2121 async fn contains_vertex_name_locally(&self, names: &[VertexName]) -> Result<Vec<bool>> {
2122 tracing::trace!("contains_vertex_name_locally names: {:?}", &names);
2123 let mut list = self.map.contains_vertex_name_locally(names).await?;
2124 tracing::trace!("contains_vertex_name_locally list (local): {:?}", &list);
2125 assert_eq!(list.len(), names.len());
2126 let map = self.overlay_map.read().unwrap();
2127 for (b, name) in list.iter_mut().zip(names.iter()) {
2128 if !*b && map.has_vertex_name(name) {
2129 tracing::trace!("contains_vertex_name_locally overlay has {:?}", &name);
2130 *b = true;
2131 }
2132 }
2133 Ok(list)
2134 }
2135
2136 async fn vertex_name_batch(&self, ids: &[Id]) -> Result<Vec<Result<VertexName>>> {
2137 let mut list = self.map.vertex_name_batch(ids).await?;
2138 if self.is_vertex_lazy() {
2139 {
2141 let map = self.overlay_map.read().unwrap();
2142 for (r, id) in list.iter_mut().zip(ids) {
2143 if let Some(name) = map.lookup_vertex_name(*id) {
2144 *r = Ok(name);
2145 }
2146 }
2147 }
2148 let missing_indexes: Vec<usize> = {
2150 let max_master_id = self.dag.master_group()?.max();
2151 list.iter()
2152 .enumerate()
2153 .filter_map(|(i, r)| match r {
2154 Err(_) if Some(ids[i]) <= max_master_id => Some(i),
2156 Err(_) | Ok(_) => None,
2157 })
2158 .collect()
2159 };
2160 let missing_ids: Vec<Id> = missing_indexes.iter().map(|i| ids[*i]).collect();
2161 let resolved = self.resolve_ids_remotely(&missing_ids).await?;
2162 for (i, name) in missing_indexes.into_iter().zip(resolved.into_iter()) {
2163 list[i] = Ok(name);
2164 }
2165 }
2166 Ok(list)
2167 }
2168
2169 async fn vertex_id_batch(&self, names: &[VertexName]) -> Result<Vec<Result<Id>>> {
2170 let mut list = self.map.vertex_id_batch(names).await?;
2171 if self.is_vertex_lazy() {
2172 {
2174 let map = self.overlay_map.read().unwrap();
2175 for (r, name) in list.iter_mut().zip(names) {
2176 if let Some(id) = map.lookup_vertex_id(name) {
2177 *r = Ok(id);
2178 }
2179 }
2180 }
2181 let missing_indexes: Vec<usize> = {
2183 let known_missing = self.missing_vertexes_confirmed_by_remote.read().unwrap();
2184 list.iter()
2185 .enumerate()
2186 .filter_map(|(i, r)| {
2187 if r.is_err() && !known_missing.contains(&names[i]) {
2188 Some(i)
2189 } else {
2190 None
2191 }
2192 })
2193 .collect()
2194 };
2195 if !missing_indexes.is_empty() {
2196 let missing_names: Vec<VertexName> =
2197 missing_indexes.iter().map(|i| names[*i].clone()).collect();
2198 let resolved = self.resolve_vertexes_remotely(&missing_names).await?;
2199 for (i, id) in missing_indexes.into_iter().zip(resolved.into_iter()) {
2200 if let Some(id) = id {
2201 list[i] = Ok(id);
2202 }
2203 }
2204 }
2205 }
2206 Ok(list)
2207 }
2208
2209 fn map_id(&self) -> &str {
2210 self.map.map_id()
2211 }
2212
2213 fn map_version(&self) -> &VerLink {
2214 self.map.map_version()
2215 }
2216}
2217
2218impl<IS, M, P, S> AbstractNameDag<IdDag<IS>, M, P, S>
2219where
2220 IS: IdDagStore,
2221 IdDag<IS>: TryClone + 'static,
2222 M: TryClone + Persist + IdMapWrite + IdConvert + Sync + Send + 'static,
2223 P: TryClone + Sync + Send + 'static,
2224 S: TryClone + Sync + Send + 'static,
2225{
2226 async fn build_with_lock(
2249 &mut self,
2250 parents: &dyn Parents,
2251 heads: &VertexListWithOptions,
2252 map_lock: &M::Lock,
2253 ) -> Result<()> {
2254 enum Input<'a> {
2256 Borrowed(&'a dyn Parents, &'a VertexListWithOptions),
2257 Owned(Box<dyn Parents>, VertexListWithOptions),
2258 }
2259
2260 let mut stack = vec![Input::Borrowed(parents, heads)];
2262
2263 let mut loop_count = 0;
2265
2266 while let Some(input) = stack.pop() {
2267 loop_count += 1;
2268 if loop_count > 2 {
2269 return bug("should not loop > 2 times (1st insertion+strip, 2nd reinsert)");
2270 }
2271
2272 let (parents, heads) = match &input {
2273 Input::Borrowed(p, h) => (*p, *h),
2274 Input::Owned(p, h) => (p.as_ref(), h),
2275 };
2276
2277 if self.is_vertex_lazy() {
2279 let heads: Vec<VertexName> = heads.vertexes();
2280 self.populate_missing_vertexes_for_add_heads(parents, &heads)
2281 .await?;
2282 }
2283
2284 let to_reassign: NameSet = self.find_vertexes_to_reassign(parents, heads).await?;
2287 if !to_reassign.is_empty().await? {
2288 let reinsert_heads: VertexListWithOptions = {
2289 let heads = self
2290 .heads(
2291 self.descendants(to_reassign.clone())
2292 .await?
2293 .difference(&to_reassign),
2294 )
2295 .await?;
2296 tracing::debug!(target: "dag::reassign", "need to rebuild heads: {:?}", &heads);
2297 let heads: Vec<VertexName> = heads.iter().await?.try_collect().await?;
2298 VertexListWithOptions::from(heads)
2299 };
2300 let reinsert_parents: Box<dyn Parents> = Box::new(self.dag_snapshot()?);
2301 self.strip_with_lock(&to_reassign, map_lock).await?;
2302
2303 stack.push(Input::Owned(reinsert_parents, reinsert_heads));
2305 };
2306
2307 let mut outcome = PreparedFlatSegments::default();
2309 let mut covered = self.dag().all_ids_in_groups(&Group::ALL)?;
2310 let mut reserved = calculate_initial_reserved(self, &covered, heads).await?;
2311 for group in [Group::MASTER, Group::NON_MASTER] {
2312 for (vertex, opts) in heads.vertex_options() {
2313 if opts.highest_group != group {
2314 continue;
2315 }
2316 let prepared_segments = self
2319 .assign_head(vertex.clone(), parents, group, &mut covered, &reserved)
2320 .await?;
2321 outcome.merge(prepared_segments);
2322 if opts.reserve_size > 0 {
2324 let low = self.map.vertex_id(vertex).await? + 1;
2325 update_reserved(&mut reserved, &covered, low, opts.reserve_size);
2326 }
2327 }
2328 }
2329
2330 self.dag
2332 .build_segments_from_prepared_flat_segments(&outcome)?;
2333
2334 self.update_overlay_map_id_set()?;
2337 }
2338
2339 Ok(())
2340 }
2341
2342 async fn find_vertexes_to_reassign(
2350 &self,
2351 parents: &dyn Parents,
2352 heads: &VertexListWithOptions,
2353 ) -> Result<NameSet> {
2354 let master_heads = heads.vertexes_by_group(Group::MASTER);
2356
2357 let mut id_set = IdSet::empty();
2359 let mut to_visit: Vec<VertexName> = master_heads;
2360 let mut visited = HashSet::new();
2361 while let Some(vertex) = to_visit.pop() {
2362 if !visited.insert(vertex.clone()) {
2363 continue;
2364 }
2365 let id = self.vertex_id_optional(&vertex).await?;
2366 if let Some(id) = id {
2368 if id.group() == Group::MASTER {
2369 continue;
2371 } else {
2372 id_set.push(id);
2374 }
2375 }
2376 let parents = parents.parent_names(vertex).await?;
2377 to_visit.extend(parents);
2378 }
2379
2380 let set = NameSet::from_spans_dag(id_set, self)?;
2381 tracing::debug!(target: "dag::reassign", "need to reassign: {:?}", &set);
2382 Ok(set)
2383 }
2384}
2385
2386async fn calculate_initial_reserved(
2392 map: &dyn IdConvert,
2393 covered: &IdSet,
2394 heads: &VertexListWithOptions,
2395) -> Result<IdSet> {
2396 let mut reserved = IdSet::empty();
2397 for (vertex, opts) in heads.vertex_options() {
2398 if opts.reserve_size == 0 {
2399 continue;
2401 }
2402 if let Some(id) = map
2403 .vertex_id_with_max_group(&vertex, opts.highest_group)
2404 .await?
2405 {
2406 update_reserved(&mut reserved, covered, id + 1, opts.reserve_size);
2407 }
2408 }
2409 Ok(reserved)
2410}
2411
2412fn update_reserved(reserved: &mut IdSet, covered: &IdSet, low: Id, reserve_size: u32) {
2413 if reserve_size == 0 {
2414 return;
2415 }
2416 let span = find_free_span(covered, low, reserve_size as _, true);
2417 reserved.push(span);
2418}
2419
2420fn find_free_span(covered: &IdSet, low: Id, reserve_size: u64, shrink_to_fit: bool) -> IdSpan {
2429 assert!(reserve_size > 0);
2430 let mut low = low;
2431 let mut high;
2432 loop {
2433 high = (low + (reserve_size as u64) - 1).min(low.group().max_id());
2434 let reserved = IdSet::from_spans(vec![low..=high]);
2436 let intersected = reserved.intersection(covered);
2437 if let Some(span) = intersected.iter_span_asc().next() {
2438 let last_free = span.low - 1;
2441 if last_free >= low && shrink_to_fit {
2442 high = last_free;
2453 } else {
2454 low = span.high + 1;
2463 continue;
2464 }
2465 }
2466 break;
2467 }
2468 let span = IdSpan::new(low, high);
2469 if !shrink_to_fit {
2470 assert_eq!(span.count(), reserve_size);
2471 }
2472 span
2473}
2474
2475fn is_ok_some<T>(value: Result<Option<T>>) -> bool {
2476 match value {
2477 Ok(Some(_)) => true,
2478 _ => false,
2479 }
2480}
2481
2482impl<IS, M, P, S> IdMapSnapshot for AbstractNameDag<IdDag<IS>, M, P, S>
2483where
2484 IS: IdDagStore,
2485 IdDag<IS>: TryClone + 'static,
2486 M: TryClone + IdConvert + Send + Sync + 'static,
2487 P: TryClone + Send + Sync + 'static,
2488 S: TryClone + Send + Sync + 'static,
2489{
2490 fn id_map_snapshot(&self) -> Result<Arc<dyn IdConvert + Send + Sync>> {
2491 Ok(self.try_snapshot()? as Arc<dyn IdConvert + Send + Sync>)
2492 }
2493}
2494
2495impl<IS, M, P, S> fmt::Debug for AbstractNameDag<IdDag<IS>, M, P, S>
2496where
2497 IS: IdDagStore,
2498 M: IdConvert + Send + Sync,
2499 P: Send + Sync,
2500 S: Send + Sync,
2501{
2502 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2503 debug(&self.dag, &self.map, f)
2504 }
2505}
2506
2507pub(crate) fn debug_segments_by_level_group<S: IdDagStore>(
2508 iddag: &IdDag<S>,
2509 idmap: &dyn IdConvert,
2510 level: Level,
2511 group: Group,
2512) -> Vec<String> {
2513 let mut result = Vec::new();
2514 let show = |id: Id| DebugId {
2516 id,
2517 name: non_blocking_result(idmap.vertex_name(id)).ok(),
2518 };
2519 let show_flags = |flags: SegmentFlags| -> String {
2520 let mut result = Vec::new();
2521 if flags.contains(SegmentFlags::HAS_ROOT) {
2522 result.push("Root");
2523 }
2524 if flags.contains(SegmentFlags::ONLY_HEAD) {
2525 result.push("OnlyHead");
2526 }
2527 result.join(" ")
2528 };
2529
2530 if let Ok(segments) = iddag.next_segments(group.min_id(), level) {
2531 for segment in segments.into_iter().rev() {
2532 if let (Ok(span), Ok(parents), Ok(flags)) =
2533 (segment.span(), segment.parents(), segment.flags())
2534 {
2535 let mut line = format!(
2536 "{:.12?} : {:.12?} {:.12?}",
2537 show(span.low),
2538 show(span.high),
2539 parents.into_iter().map(show).collect::<Vec<_>>(),
2540 );
2541 let flags = show_flags(flags);
2542 if !flags.is_empty() {
2543 line += &format!(" {}", flags);
2544 }
2545 result.push(line);
2546 }
2547 }
2548 }
2549 result
2550}
2551
2552fn debug<S: IdDagStore>(
2553 iddag: &IdDag<S>,
2554 idmap: &dyn IdConvert,
2555 f: &mut fmt::Formatter,
2556) -> fmt::Result {
2557 if let Ok(max_level) = iddag.max_level() {
2558 writeln!(f, "Max Level: {}", max_level)?;
2559 for lv in (0..=max_level).rev() {
2560 writeln!(f, " Level {}", lv)?;
2561 for group in Group::ALL.iter().cloned() {
2562 writeln!(f, " {}:", group)?;
2563 if let Ok(segments) = iddag.next_segments(group.min_id(), lv) {
2564 writeln!(f, " Segments: {}", segments.len())?;
2565 for line in debug_segments_by_level_group(iddag, idmap, lv, group) {
2566 writeln!(f, " {}", line)?;
2567 }
2568 }
2569 }
2570 }
2571 }
2572
2573 Ok(())
2574}
2575
2576struct DebugId {
2577 id: Id,
2578 name: Option<VertexName>,
2579}
2580
2581impl fmt::Debug for DebugId {
2582 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2583 if let Some(name) = &self.name {
2584 fmt::Debug::fmt(&name, f)?;
2585 f.write_str("+")?;
2586 }
2587 write!(f, "{:?}", self.id)?;
2588 Ok(())
2589 }
2590}