Skip to main content

lance/dataset/
refs.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::Range;
5
6use chrono::{DateTime, Utc};
7use futures::stream::{StreamExt, TryStreamExt};
8use itertools::Itertools;
9use lance_io::object_store::ObjectStore;
10use lance_table::io::commit::CommitHandler;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
15use crate::dataset::branch_location::BranchLocation;
16use crate::dataset::refs::Ref::{Tag, Version, VersionNumber};
17use crate::utils::temporal::utc_now;
18use crate::{Error, Result};
19use serde::de::DeserializeOwned;
20use std::cmp::Ordering;
21use std::collections::HashMap;
22use std::fmt;
23use std::fmt::Formatter;
24use std::io::ErrorKind;
25use uuid::Uuid;
26
/// Name of the main branch.
///
/// `standardize_branch` maps this name to `None`, and `normalize_branch` maps `None`
/// back to this name.
pub const MAIN_BRANCH: &str = "main";
28
/// Lance Ref: the different ways a dataset version can be addressed.
#[derive(Debug, Clone)]
pub enum Ref {
    /// A version number on the current branch.
    VersionNumber(u64),
    /// A global version identifier expressed as `(branch_name, version_number)`.
    ///
    /// * A `branch_name` of `None` points to the main branch.
    /// * A `version_number` of `None` points to the latest version.
    Version(Option<String>, Option<u64>),
    /// A tag name. A tag points to a global version identifier and can be seen as an
    /// alias of one specific global version.
    Tag(String),
}
41
42impl From<u64> for Ref {
43    fn from(reference: u64) -> Self {
44        VersionNumber(reference)
45    }
46}
47
48impl From<&str> for Ref {
49    fn from(reference: &str) -> Self {
50        Tag(reference.to_string())
51    }
52}
53
54impl From<(&str, u64)> for Ref {
55    fn from(reference: (&str, u64)) -> Self {
56        Version(standardize_branch(reference.0), Some(reference.1))
57    }
58}
59
60impl From<(Option<&str>, Option<u64>)> for Ref {
61    fn from(reference: (Option<&str>, Option<u64>)) -> Self {
62        Version(reference.0.and_then(standardize_branch), reference.1)
63    }
64}
65
66impl From<(&str, Option<u64>)> for Ref {
67    fn from(reference: (&str, Option<u64>)) -> Self {
68        Version(standardize_branch(reference.0), reference.1)
69    }
70}
71
72impl fmt::Display for Ref {
73    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
74        match self {
75            Version(branch, version_number) => {
76                let version_str = version_number
77                    .map(|v| v.to_string())
78                    .unwrap_or_else(|| "latest".to_string());
79                write!(f, "{}:{}", normalize_branch(branch.as_deref()), version_str)
80            }
81            VersionNumber(version_number) => write!(f, "{}", version_number),
82            Tag(tag_name) => write!(f, "{}", tag_name),
83        }
84    }
85}
86
87#[derive(Debug, Clone)]
88pub struct Refs {
89    pub(crate) object_store: Arc<ObjectStore>,
90    pub(crate) commit_handler: Arc<dyn CommitHandler>,
91    pub(crate) base_location: BranchLocation,
92}
93
94impl Refs {
95    pub fn new(
96        object_store: Arc<ObjectStore>,
97        commit_handler: Arc<dyn CommitHandler>,
98        base_location: BranchLocation,
99    ) -> Self {
100        Self {
101            object_store,
102            commit_handler,
103            base_location,
104        }
105    }
106
107    pub fn tags(&self) -> Tags<'_> {
108        Tags { refs: self }
109    }
110
111    pub fn branches(&self) -> Branches<'_> {
112        Branches { refs: self }
113    }
114
115    pub fn base(&self) -> &Path {
116        &self.base_location.path
117    }
118
119    pub fn root(&self) -> Result<BranchLocation> {
120        self.base_location.find_main()
121    }
122}
123
124/// Tags operation
125#[derive(Debug, Clone)]
126pub struct Tags<'a> {
127    refs: &'a Refs,
128}
129
130/// Branches operation
131#[derive(Debug, Clone)]
132pub struct Branches<'a> {
133    refs: &'a Refs,
134}
135
136impl Tags<'_> {
137    fn object_store(&self) -> &ObjectStore {
138        &self.refs.object_store
139    }
140}
141
142impl Branches<'_> {
143    fn object_store(&self) -> &ObjectStore {
144        &self.refs.object_store
145    }
146}
147
148impl Tags<'_> {
149    pub async fn fetch_tags(&self) -> Result<Vec<(String, TagContents)>> {
150        let root_location = self.refs.root()?;
151        let base_path = base_tags_path(&root_location.path);
152        let tag_files = self.object_store().read_dir(base_path).await?;
153
154        let tag_names: Vec<String> = tag_files
155            .iter()
156            .filter_map(|name| name.strip_suffix(".json"))
157            .map(|name| name.to_string())
158            .collect_vec();
159
160        let root_path = &root_location.path;
161        futures::stream::iter(tag_names)
162            .map(|tag_name| async move {
163                let contents =
164                    TagContents::from_path(&tag_path(root_path, &tag_name), self.object_store())
165                        .await?;
166                Ok((tag_name, contents))
167            })
168            .buffer_unordered(10)
169            .try_collect()
170            .await
171    }
172
173    pub async fn list(&self) -> Result<HashMap<String, TagContents>> {
174        self.fetch_tags()
175            .await
176            .map(|tags| tags.into_iter().collect())
177    }
178
179    pub async fn list_tags_ordered(
180        &self,
181        order: Option<Ordering>,
182    ) -> Result<Vec<(String, TagContents)>> {
183        let mut tags = self.fetch_tags().await?;
184        tags.sort_by(|a, b| {
185            let desired_ordering = order.unwrap_or(Ordering::Greater);
186            let version_ordering = a.1.version.cmp(&b.1.version);
187            let version_result = match desired_ordering {
188                Ordering::Less => version_ordering,
189                _ => version_ordering.reverse(),
190            };
191            version_result.then_with(|| a.0.cmp(&b.0))
192        });
193        Ok(tags)
194    }
195
196    pub async fn get_version(&self, tag: &str) -> Result<u64> {
197        self.get(tag).await.map(|tag| tag.version)
198    }
199
200    pub async fn get(&self, tag: &str) -> Result<TagContents> {
201        check_valid_tag(tag)?;
202
203        let root_location = self.refs.root()?;
204        let tag_file = tag_path(&root_location.path, tag);
205
206        if !self.object_store().exists(&tag_file).await? {
207            return Err(Error::RefNotFound {
208                message: format!("tag {} does not exist", tag),
209            });
210        }
211
212        let tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?;
213        Ok(tag_contents)
214    }
215
216    pub async fn create(&self, tag: &str, reference: impl Into<Ref>) -> Result<()> {
217        check_valid_tag(tag)?;
218        let root_location = self.refs.root()?;
219        let tag_file = tag_path(&root_location.path, tag);
220
221        if self.object_store().exists(&tag_file).await? {
222            return Err(Error::RefConflict {
223                message: format!("tag {} already exists", tag),
224            });
225        }
226        let now = utc_now();
227        let tag_contents = self
228            .build_tag_content_by_ref(reference, Some(now), Some(now))
229            .await?;
230
231        self.object_store()
232            .put(
233                &tag_file,
234                serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
235            )
236            .await
237            .map(|_| ())
238    }
239
240    pub async fn delete(&self, tag: &str) -> Result<()> {
241        check_valid_tag(tag)?;
242
243        let root_location = self.refs.root()?;
244        let tag_file = tag_path(&root_location.path, tag);
245
246        if !self.object_store().exists(&tag_file).await? {
247            return Err(Error::RefNotFound {
248                message: format!("tag {} does not exist", tag),
249            });
250        }
251
252        self.object_store().delete(&tag_file).await
253    }
254
255    pub async fn update(&self, tag: &str, reference: impl Into<Ref>) -> Result<()> {
256        check_valid_tag(tag)?;
257
258        let root_location = self.refs.root()?;
259        let tag_file = tag_path(&root_location.path, tag);
260        if !self.object_store().exists(&tag_file).await? {
261            return Err(Error::RefNotFound {
262                message: format!("tag {} does not exist", tag),
263            });
264        }
265        let mut tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?;
266        let updated_reference = self
267            .build_tag_content_by_ref(reference, tag_contents.created_at, Some(utc_now()))
268            .await?;
269        tag_contents.branch = updated_reference.branch;
270        tag_contents.version = updated_reference.version;
271        tag_contents.created_at = updated_reference.created_at;
272        tag_contents.updated_at = updated_reference.updated_at;
273        tag_contents.manifest_size = updated_reference.manifest_size;
274
275        self.object_store()
276            .put(
277                &tag_file,
278                serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
279            )
280            .await
281            .map(|_| ())
282    }
283
284    pub async fn replace_metadata(
285        &self,
286        tag: &str,
287        metadata: HashMap<String, String>,
288    ) -> Result<()> {
289        check_valid_tag(tag)?;
290
291        let root_location = self.refs.root()?;
292        let tag_file = tag_path(&root_location.path, tag);
293        if !self.object_store().exists(&tag_file).await? {
294            return Err(Error::RefNotFound {
295                message: format!("tag {} does not exist", tag),
296            });
297        }
298
299        let mut tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?;
300        tag_contents.metadata = metadata;
301
302        self.object_store()
303            .put(
304                &tag_file,
305                serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
306            )
307            .await
308            .map(|_| ())
309    }
310
311    async fn build_tag_content_by_ref(
312        &self,
313        reference: impl Into<Ref>,
314        created_at: Option<DateTime<Utc>>,
315        updated_at: Option<DateTime<Utc>>,
316    ) -> Result<TagContents> {
317        let reference = reference.into();
318        let (branch, version_number) = match reference {
319            Version(branch, version_number) => (branch, version_number),
320            VersionNumber(version_number) => {
321                (self.refs.base_location.branch.clone(), Some(version_number))
322            }
323            Tag(tag_name) => {
324                let tag_content = self.get(tag_name.as_str()).await?;
325                (tag_content.branch, Some(tag_content.version))
326            }
327        };
328
329        let branch_location = self.refs.base_location.find_branch(branch.as_deref())?;
330        let manifest_file = if let Some(version_number) = version_number {
331            self.refs
332                .commit_handler
333                .resolve_version_location(
334                    &branch_location.path,
335                    version_number,
336                    &self.refs.object_store.inner,
337                )
338                .await?
339        } else {
340            self.refs
341                .commit_handler
342                .resolve_latest_location(&branch_location.path, &self.refs.object_store)
343                .await?
344        };
345
346        if !self.object_store().exists(&manifest_file.path).await? {
347            return Err(Error::VersionNotFound {
348                message: format!("version {} does not exist", Version(branch, version_number)),
349            });
350        }
351
352        let manifest_size = if let Some(size) = manifest_file.size {
353            size as usize
354        } else {
355            self.object_store().size(&manifest_file.path).await? as usize
356        };
357
358        let tag_contents = TagContents {
359            branch,
360            version: manifest_file.version,
361            created_at,
362            updated_at,
363            manifest_size,
364            metadata: HashMap::new(),
365        };
366        Ok(tag_contents)
367    }
368}
369
370impl Branches<'_> {
371    pub(crate) fn is_main_branch(branch: Option<&str>) -> bool {
372        branch == Some(MAIN_BRANCH)
373    }
374
375    pub async fn fetch(&self) -> Result<Vec<(String, BranchContents)>> {
376        let root_location = self.refs.root()?;
377        let base_path = base_branches_contents_path(&root_location.path);
378        let branch_files = self.object_store().read_dir(base_path).await?;
379
380        let branch_names: Vec<String> = branch_files
381            .iter()
382            .filter_map(|name| name.strip_suffix(".json"))
383            .map(|str| {
384                Path::from_url_path(str)
385                    .map_err(|e| Error::InvalidRef {
386                        message: format!(
387                            "Failed to decode branch name: {} due to exception {}",
388                            str, e
389                        ),
390                    })
391                    .map(|path| path.to_string())
392            })
393            .collect::<Result<Vec<_>>>()?;
394
395        let branch_path = &root_location.path;
396        futures::stream::iter(branch_names)
397            .map(|name| async move {
398                let contents = BranchContents::from_path(
399                    &branch_contents_path(branch_path, &name),
400                    self.object_store(),
401                    &name,
402                )
403                .await?;
404                Ok((name, contents))
405            })
406            .buffer_unordered(10)
407            .try_collect()
408            .await
409    }
410
411    pub async fn list(&self) -> Result<HashMap<String, BranchContents>> {
412        self.fetch()
413            .await
414            .map(|branches| branches.into_iter().collect())
415    }
416
417    pub async fn get(&self, branch: &str) -> Result<BranchContents> {
418        check_valid_branch(branch)?;
419
420        let root_location = self.refs.root()?;
421        let branch_file = branch_contents_path(&root_location.path, branch);
422
423        if !self.object_store().exists(&branch_file).await? {
424            return Err(Error::RefNotFound {
425                message: format!("branch {} does not exist", branch),
426            });
427        }
428
429        let branch_contents =
430            BranchContents::from_path(&branch_file, self.object_store(), branch).await?;
431
432        Ok(branch_contents)
433    }
434
435    pub async fn get_identifier(&self, branch: Option<&str>) -> Result<BranchIdentifier> {
436        if let Some(branch_name) = branch {
437            let branch_contents = self.get(branch_name).await?;
438            Ok(branch_contents.identifier)
439        } else {
440            Ok(BranchIdentifier::main())
441        }
442    }
443
444    // Only create branch metadata
445    pub(crate) async fn create(
446        &self,
447        branch_name: &str,
448        version_number: u64,
449        source_branch: Option<&str>,
450    ) -> Result<()> {
451        check_valid_branch(branch_name)?;
452
453        let source_branch = source_branch.and_then(standardize_branch);
454        let root_location = self.refs.root()?;
455        let branch_file = branch_contents_path(&root_location.path, branch_name);
456        if self.object_store().exists(&branch_file).await? {
457            return Err(Error::RefConflict {
458                message: format!("branch {} already exists", branch_name),
459            });
460        }
461
462        let branch_location = self
463            .refs
464            .base_location
465            .find_branch(source_branch.as_deref())?;
466        // Verify the source version exists
467        let manifest_file = self
468            .refs
469            .commit_handler
470            .resolve_version_location(
471                &branch_location.path,
472                version_number,
473                &self.refs.object_store.inner,
474            )
475            .await?;
476
477        if !self.object_store().exists(&manifest_file.path).await? {
478            return Err(Error::VersionNotFound {
479                message: format!("Manifest file {} does not exist", &manifest_file.path),
480            });
481        };
482
483        let parent_branch_id = if let Some(ref parent_branch) = source_branch {
484            let parent_file = branch_contents_path(&root_location.path, parent_branch);
485            if self.object_store().exists(&parent_file).await? {
486                BranchContents::from_path(&parent_file, self.object_store(), parent_branch)
487                    .await?
488                    .identifier
489            } else {
490                return Err(Error::RefNotFound {
491                    message: format!("Parent branch {} does not exist", branch_name),
492                });
493            }
494        } else {
495            BranchIdentifier::main()
496        };
497
498        let branch_contents = BranchContents {
499            parent_branch: source_branch,
500            identifier: BranchIdentifier::new(&parent_branch_id, version_number),
501            parent_version: version_number,
502            create_at: chrono::Utc::now().timestamp() as u64,
503            manifest_size: if let Some(size) = manifest_file.size {
504                size as usize
505            } else {
506                self.object_store().size(&manifest_file.path).await? as usize
507            },
508            metadata: HashMap::new(),
509        };
510
511        self.object_store()
512            .put(
513                &branch_file,
514                serde_json::to_string_pretty(&branch_contents)?.as_bytes(),
515            )
516            .await
517            .map(|_| ())
518    }
519
520    pub async fn replace_metadata(
521        &self,
522        branch: &str,
523        metadata: HashMap<String, String>,
524    ) -> Result<()> {
525        check_valid_branch(branch)?;
526
527        let root_location = self.refs.root()?;
528        let branch_file = branch_contents_path(&root_location.path, branch);
529        if !self.object_store().exists(&branch_file).await? {
530            return Err(Error::RefNotFound {
531                message: format!("branch {} does not exist", branch),
532            });
533        }
534
535        let mut branch_contents =
536            BranchContents::from_path(&branch_file, self.object_store(), branch).await?;
537        branch_contents.metadata = metadata;
538
539        self.object_store()
540            .put(
541                &branch_file,
542                serde_json::to_string_pretty(&branch_contents)?.as_bytes(),
543            )
544            .await
545            .map(|_| ())
546    }
547
548    /// Delete a branch
549    ///
550    /// If the `BranchContents` does not exist, it will return an error directly unless `force` is true.
551    /// If `force` is true, it will try to delete the branch directories no matter `BranchContents` exists or not.
552    pub async fn delete(&self, branch: &str, force: bool) -> Result<()> {
553        check_valid_branch(branch)?;
554
555        let all_branches = self.list().await?;
556        let branch_id = all_branches
557            .get(branch)
558            .map(|contents| contents.identifier.clone());
559        if let Some(branch_id) = branch_id {
560            let referenced_versions = branch_id.collect_referenced_versions(&all_branches);
561            if !referenced_versions.is_empty() {
562                return Err(Error::RefConflict {
563                    message: format!(
564                        "Branch {} is referenced by {:?} versions, can not delete",
565                        branch, referenced_versions
566                    ),
567                });
568            }
569        } else if !force {
570            return Err(Error::RefNotFound {
571                message: format!("Branch {} does not exist", branch),
572            });
573        } else {
574            log::warn!("BranchContents of {} does not exist", branch);
575        }
576
577        let root_location = self.refs.root()?;
578        let branch_file = branch_contents_path(&root_location.path, branch);
579        if self.object_store().exists(&branch_file).await? {
580            self.object_store().delete(&branch_file).await?;
581        }
582
583        // Clean up branch directories
584        self.cleanup_branch_directories(branch).await
585    }
586
587    pub async fn list_ordered(
588        &self,
589        order: Option<Ordering>,
590    ) -> Result<Vec<(String, BranchContents)>> {
591        let mut branches = self.fetch().await?;
592        branches.sort_by(|a, b| {
593            let desired_ordering = order.unwrap_or(Ordering::Greater);
594            let version_ordering = a.1.parent_version.cmp(&b.1.parent_version);
595            let version_result = match desired_ordering {
596                Ordering::Less => version_ordering,
597                _ => version_ordering.reverse(),
598            };
599            version_result.then_with(|| a.0.cmp(&b.0))
600        });
601        Ok(branches)
602    }
603
604    /// Clean up empty parent directories
605    async fn cleanup_branch_directories(&self, branch: &str) -> Result<()> {
606        let branches = self.list().await?;
607        let remaining_branches: Vec<&str> = branches.keys().map(|k| k.as_str()).collect();
608
609        if let Some(delete_path) =
610            Self::get_cleanup_path(branch, &remaining_branches, &self.refs.base_location)?
611            && let Err(e) = self.refs.object_store.remove_dir_all(delete_path).await
612        {
613            match &e {
614                Error::IO { source, .. } => {
615                    if let Some(io_err) = source.downcast_ref::<std::io::Error>() {
616                        if io_err.kind() == ErrorKind::NotFound {
617                            log::debug!("Branch directory already deleted: {}", io_err);
618                        } else {
619                            return Err(e);
620                        }
621                    } else {
622                        return Err(e);
623                    }
624                }
625                _ => return Err(e),
626            }
627        }
628        Ok(())
629    }
630
631    fn get_cleanup_path(
632        branch: &str,
633        remaining_branches: &[&str],
634        base_location: &BranchLocation,
635    ) -> Result<Option<Path>> {
636        let deleted_branch = BranchRelativePath::new(branch);
637        let mut related_branches = Vec::new();
638        let mut relative_dir = branch.to_string();
639        for branch in remaining_branches {
640            let branch = BranchRelativePath::new(branch);
641            if branch.is_parent(&deleted_branch) || branch.is_child(&deleted_branch) {
642                related_branches.push(branch);
643            } else if let Some(common_prefix) = deleted_branch.find_common_prefix(&branch) {
644                related_branches.push(common_prefix);
645            }
646        }
647
648        related_branches.sort_by(|a, b| a.segments.len().cmp(&b.segments.len()).reverse());
649        if let Some(branch) = related_branches.first() {
650            if branch.is_child(&deleted_branch) || branch == &deleted_branch {
651                // There are children of the deleted branch, we can't delete any directory for now
652                // Example: deleted_branch = "a/b/c", remaining_branches = ["a/b/c/d"], we need to delete nothing
653                return Ok(None);
654            } else {
655                // We pick the longest common directory between the deleted branch and the remaining branches
656                // Then delete the first child of this common directory
657                // Example: deleted_branch = "a/b/c", remaining_branches = ["a"], we need to delete "a/b"
658                relative_dir = format!(
659                    "{}/{}",
660                    branch.segments.join("/"),
661                    deleted_branch.segments[branch.segments.len()]
662                );
663            }
664        } else if !deleted_branch.segments.is_empty() {
665            // There are no common directories between the deleted branch and the remaining branches
666            // We need to delete the entire directory
667            // Example: deleted_branch = "a/b/c", remaining_branches = [], we need to delete "a"
668            relative_dir = deleted_branch.segments[0].to_string();
669        }
670
671        let absolute_dir = base_location.find_branch(Some(relative_dir.as_str()))?;
672        Ok(Some(absolute_dir.path))
673    }
674}
675
/// A branch name split into its `/`-separated segments.
#[derive(Debug, PartialEq)]
struct BranchRelativePath<'a> {
    segments: Vec<&'a str>,
}

impl<'a> BranchRelativePath<'a> {
    /// Splits `branch_name` on `/`. An empty name yields a single empty segment.
    fn new(branch_name: &'a str) -> Self {
        Self {
            segments: branch_name.split('/').collect(),
        }
    }

    /// Returns the leading segments shared by `self` and `other`, or `None` when the
    /// very first segments already differ.
    fn find_common_prefix(&self, other: &Self) -> Option<Self> {
        let shared: Vec<&'a str> = self
            .segments
            .iter()
            .zip(other.segments.iter())
            .take_while(|(mine, theirs)| mine == theirs)
            .map(|(mine, _)| *mine)
            .collect();

        if shared.is_empty() {
            None
        } else {
            Some(Self { segments: shared })
        }
    }

    /// `true` when `self` is a strict prefix of `other`: `other` has more segments and
    /// starts with all segments of `self`.
    fn is_parent(&self, other: &Self) -> bool {
        other.segments.len() > self.segments.len() && other.segments.starts_with(&self.segments)
    }

    /// `true` when `other` is a strict prefix of `self`; the mirror image of `is_parent`.
    fn is_child(&self, other: &Self) -> bool {
        self.segments.len() > other.segments.len() && self.segments.starts_with(&other.segments)
    }
}
730
731#[derive(Debug, Clone, Serialize, Deserialize)]
732#[serde(rename_all = "camelCase")]
733pub struct TagContents {
734    pub branch: Option<String>,
735    pub version: u64,
736    #[serde(skip_serializing_if = "Option::is_none")]
737    pub created_at: Option<DateTime<Utc>>,
738    #[serde(skip_serializing_if = "Option::is_none")]
739    pub updated_at: Option<DateTime<Utc>>,
740    pub manifest_size: usize,
741    /// Metadata associated with this tag.
742    ///
743    /// Missing metadata is deserialized as an empty map.
744    #[serde(default)]
745    pub metadata: HashMap<String, String>,
746}
747
748#[derive(Debug, Clone, Serialize, Deserialize)]
749#[serde(rename_all = "camelCase")]
750pub struct BranchContents {
751    pub parent_branch: Option<String>,
752    #[serde(default = "BranchIdentifier::missing_identifier_sentinel")]
753    pub identifier: BranchIdentifier,
754    pub parent_version: u64,
755    pub create_at: u64, // unix timestamp
756    pub manifest_size: usize,
757    /// Metadata associated with this branch.
758    ///
759    /// Missing metadata is deserialized as an empty map.
760    #[serde(default)]
761    pub metadata: HashMap<String, String>,
762}
763
764#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
765pub struct BranchIdentifier {
766    pub version_mapping: Vec<(u64, String)>,
767}
768
769impl BranchIdentifier {
770    pub fn new(parent: &Self, parent_version: u64) -> Self {
771        let mut version_mapping = parent.version_mapping.clone();
772        version_mapping.push((parent_version, Uuid::new_v4().simple().to_string()));
773        Self { version_mapping }
774    }
775
776    /// Creates a sentinel identifier for legacy branch metadata that lacks an explicit
777    /// identifier.
778    ///
779    /// `BranchContents::from_path` replaces this value with a deterministic synthetic
780    /// identifier. Keeping this sentinel stable lets us distinguish missing identifiers from
781    /// persisted identifiers without changing this field to `Option<BranchIdentifier>`.
782    pub fn missing_identifier_sentinel() -> Self {
783        Self {
784            version_mapping: vec![(0, Uuid::nil().simple().to_string())],
785        }
786    }
787
788    fn synthetic_identifier(
789        branch_name: &str,
790        parent_branch: Option<&str>,
791        parent_version: u64,
792        create_at: u64,
793    ) -> Self {
794        let identifier_input = format!(
795            "branch_name={branch_name}\nparent_branch={}\nparent_version={parent_version}\ncreate_at={create_at}",
796            parent_branch.unwrap_or("")
797        );
798        Self {
799            version_mapping: vec![(
800                0,
801                Uuid::from_bytes(Self::synthetic_identifier_bytes(
802                    identifier_input.as_bytes(),
803                ))
804                .simple()
805                .to_string(),
806            )],
807        }
808    }
809
810    fn synthetic_identifier_bytes(input: &[u8]) -> [u8; 16] {
811        // Use fixed, local hashing so legacy fallback identifiers stay deterministic without
812        // enabling extra UUID generation features.
813        const FNV_OFFSET: u64 = 0xcbf29ce484222325;
814        const FNV_PRIME: u64 = 0x100000001b3;
815
816        fn hash_with_seed(input: &[u8], seed: u64) -> u64 {
817            input.iter().fold(seed, |hash, byte| {
818                (hash ^ u64::from(*byte)).wrapping_mul(FNV_PRIME)
819            })
820        }
821
822        let first = hash_with_seed(input, FNV_OFFSET);
823        let second = hash_with_seed(input, FNV_OFFSET ^ 0x9e3779b97f4a7c15);
824        let mut bytes = [0; 16];
825        bytes[..8].copy_from_slice(&first.to_be_bytes());
826        bytes[8..].copy_from_slice(&second.to_be_bytes());
827        bytes
828    }
829
830    pub fn main() -> Self {
831        Self {
832            version_mapping: vec![],
833        }
834    }
835
836    pub fn parse(identifier: &str) -> Result<Self> {
837        let parts: Vec<&str> = identifier.split(':').collect();
838        if !parts.len().is_multiple_of(2) {
839            return Err(Error::InvalidRef {
840                message: format!(
841                    "Invalid branch identifier: {}, format should be 'ver1:uuid1:ver2:uuid2:...:final_uuid'",
842                    parts.len()
843                ),
844            });
845        }
846
847        let version_mapping = parts
848            .chunks_exact(2)
849            .map(|chunk| {
850                let version = chunk[0].parse::<u64>().map_err(|e| Error::InvalidRef {
851                    message: format!("Invalid version number '{}': {}", chunk[0], e),
852                })?;
853                let uuid = chunk[1].to_string();
854                Ok((version, uuid))
855            })
856            .collect::<Result<Vec<_>>>()?;
857
858        Ok(Self { version_mapping })
859    }
860
861    pub fn find_referenced_version(&self, referenced_branch: &Self) -> Option<u64> {
862        let ref_mapping = &referenced_branch.version_mapping;
863        let next_idx = ref_mapping.len();
864
865        (self.version_mapping.len() > next_idx && self.version_mapping[..next_idx] == *ref_mapping)
866            .then(|| self.version_mapping[next_idx].0)
867            .filter(|&version| version > 0)
868    }
869
870    /// Collects all branches that reference this branch, returning (branch_name, version) tuples.
871    /// Results are in post-order traversal (deepest branches first).
872    pub fn collect_referenced_versions(
873        &self,
874        branches: &HashMap<String, BranchContents>,
875    ) -> Vec<(String, u64)> {
876        let mut branch_ids = branches
877            .iter()
878            .map(|(name, branch)| (branch.identifier.clone(), name.clone()))
879            .collect::<Vec<_>>();
880        // Sort by BranchIdentifier desc to implement post-order traversal.
881        branch_ids.sort_by(|a, b| b.cmp(a));
882        branch_ids
883            .into_iter()
884            .filter_map(|(branch_id, name)| {
885                branch_id
886                    .find_referenced_version(self)
887                    .map(|version| (name, version))
888            })
889            .collect()
890    }
891}
892
893pub fn base_tags_path(base_path: &Path) -> Path {
894    base_path.clone().join("_refs").join("tags")
895}
896
897pub fn base_branches_contents_path(base_path: &Path) -> Path {
898    base_path.clone().join("_refs").join("branches")
899}
900
901pub fn tag_path(base_path: &Path, branch: &str) -> Path {
902    base_tags_path(base_path).join(format!("{}.json", branch))
903}
904
905// Note: child will encode '/' to '%2F'
906pub fn branch_contents_path(base_path: &Path, branch: &str) -> Path {
907    base_branches_contents_path(base_path).join(format!("{}.json", branch))
908}
909
910pub(crate) fn normalize_branch(branch: Option<&str>) -> String {
911    match branch {
912        None => MAIN_BRANCH.to_string(),
913        Some(name) => name.to_string(),
914    }
915}
916
917pub(crate) fn standardize_branch(branch: &str) -> Option<String> {
918    match branch {
919        MAIN_BRANCH => None,
920        name => Some(name.to_string()),
921    }
922}
923
924async fn from_path<T>(path: &Path, object_store: &ObjectStore) -> Result<T>
925where
926    T: DeserializeOwned,
927{
928    let tag_reader = object_store.open(path).await?;
929    let tag_bytes = tag_reader
930        .get_range(Range {
931            start: 0,
932            end: tag_reader.size().await?,
933        })
934        .await?;
935    let json_str = String::from_utf8(tag_bytes.to_vec())
936        .map_err(|e| Error::corrupt_file(path.clone(), e.to_string()))?;
937    Ok(serde_json::from_str(&json_str)?)
938}
939
940impl TagContents {
941    pub async fn from_path(path: &Path, object_store: &ObjectStore) -> Result<Self> {
942        from_path(path, object_store).await
943    }
944}
945
946impl BranchContents {
947    pub async fn from_path(
948        path: &Path,
949        object_store: &ObjectStore,
950        branch_name: &str,
951    ) -> Result<Self> {
952        let mut contents: Self = from_path(path, object_store).await?;
953        if contents.identifier == BranchIdentifier::missing_identifier_sentinel() {
954            // Legacy branch files do not store an identifier. Derive a deterministic fallback
955            // from stable branch metadata so repeated reads expose the same public
956            // branch_identifier.
957            contents.identifier = BranchIdentifier::synthetic_identifier(
958                branch_name,
959                contents.parent_branch.as_deref(),
960                contents.parent_version,
961                contents.create_at,
962            );
963        }
964        Ok(contents)
965    }
966}
967
968pub fn check_valid_branch(branch_name: &str) -> Result<()> {
969    if branch_name.is_empty() {
970        return Err(Error::InvalidRef {
971            message: "Branch name cannot be empty".to_string(),
972        });
973    }
974
975    // Validate if the branch name starts or ends with a '/'
976    if branch_name.starts_with('/') || branch_name.ends_with('/') {
977        return Err(Error::InvalidRef {
978            message: "Branch name cannot start or end with a '/'".to_string(),
979        });
980    }
981
982    // Validate if there are any consecutive '/' in the branch name
983    if branch_name.contains("//") {
984        return Err(Error::InvalidRef {
985            message: "Branch name cannot contain consecutive '/'".to_string(),
986        });
987    }
988
989    // Validate if there are any dangerous characters in the branch name
990    if branch_name.contains("..") || branch_name.contains('\\') {
991        return Err(Error::InvalidRef {
992            message: "Branch name cannot contain '..' or '\\'".to_string(),
993        });
994    }
995
996    for segment in branch_name.split('/') {
997        if segment.is_empty() {
998            return Err(Error::InvalidRef {
999                message: "Branch name cannot have empty segments between '/'".to_string(),
1000            });
1001        }
1002        if !segment
1003            .chars()
1004            .all(|c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
1005        {
1006            return Err(Error::InvalidRef {
1007                message: format!(
1008                    "Branch segment '{}' contains invalid characters. Only alphanumeric, '.', '-', '_' are allowed.",
1009                    segment
1010                ),
1011            });
1012        }
1013    }
1014
1015    if branch_name.ends_with(".lock") {
1016        return Err(Error::InvalidRef {
1017            message: "Branch name cannot end with '.lock'".to_string(),
1018        });
1019    }
1020
1021    if branch_name.eq("main") {
1022        return Err(Error::InvalidRef {
1023            message: "Branch name cannot be 'main'".to_string(),
1024        });
1025    }
1026    Ok(())
1027}
1028
1029pub fn check_valid_tag(s: &str) -> Result<()> {
1030    if s.is_empty() {
1031        return Err(Error::InvalidRef {
1032            message: "Ref cannot be empty".to_string(),
1033        });
1034    }
1035
1036    if !s
1037        .chars()
1038        .all(|c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
1039    {
1040        return Err(Error::InvalidRef {
1041            message: "Ref characters must be either alphanumeric, '.', '-' or '_'".to_string(),
1042        });
1043    }
1044
1045    if s.starts_with('.') {
1046        return Err(Error::InvalidRef {
1047            message: "Ref cannot begin with a dot".to_string(),
1048        });
1049    }
1050
1051    if s.ends_with('.') {
1052        return Err(Error::InvalidRef {
1053            message: "Ref cannot end with a dot".to_string(),
1054        });
1055    }
1056
1057    if s.ends_with(".lock") {
1058        return Err(Error::InvalidRef {
1059            message: "Ref cannot end with .lock".to_string(),
1060        });
1061    }
1062
1063    if s.contains("..") {
1064        return Err(Error::InvalidRef {
1065            message: "Ref cannot have two consecutive dots".to_string(),
1066        });
1067    }
1068
1069    Ok(())
1070}
1071
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::common::assert_contains;

    use rstest::rstest;

    /// Names built from alphanumerics, '.', '-' and '_' pass tag validation.
    #[rstest]
    fn test_ok_ref(
        #[values(
            "ref",
            "ref-with-dashes",
            "ref.extension",
            "ref_with_underscores",
            "v1.2.3-rc4"
        )]
        r: &str,
    ) {
        let outcome = check_valid_tag(r);
        outcome.unwrap();
    }

    /// Every rejected tag name produces an `InvalidRef` error whose message starts with "Ref".
    #[rstest]
    fn test_err_ref(
        #[values(
            "",
            "../ref",
            ".ref",
            "/ref",
            "@",
            "deeply/nested/ref",
            "nested//ref",
            "nested/ref",
            "nested\\ref",
            "ref*",
            "ref.lock",
            "ref/",
            "ref?",
            "ref@{ref",
            "ref[",
            "ref^",
            "~/ref",
            "ref.",
            "ref..ref"
        )]
        r: &str,
    ) {
        let failure = check_valid_tag(r).err().unwrap();
        assert_contains!(failure.to_string(), "Ref is invalid: Ref");
    }

    /// Slash-separated names with well-formed segments pass branch validation.
    #[rstest]
    fn test_valid_branch_names(
        #[values(
            "feature/login",
            "bugfix/issue-123",
            "release/v1.2.3",
            "user/someone/my-feature",
            "normal",
            "with-dash",
            "with_underscore",
            "with.dot"
        )]
        branch_name: &str,
    ) {
        let outcome = check_valid_branch(branch_name);
        assert!(
            outcome.is_ok(),
            "Branch name '{}' should be valid",
            branch_name
        );
    }

    /// Malformed names are rejected by branch validation.
    #[rstest]
    fn test_invalid_branch_names(
        #[values(
            "",
            "/start-with-slash",
            "end-with-slash/",
            "have//consecutive-slash",
            "have..dot-dot",
            "have\\backslash",
            "segment/",
            "/segment",
            "segment//empty",
            "name.lock",
            "bad@character",
            "bad segment"
        )]
        branch_name: &str,
    ) {
        let outcome = check_valid_branch(branch_name);
        assert!(
            outcome.is_err(),
            "Branch name '{}' should be invalid",
            branch_name
        );
    }

    /// The path helpers place tags and branches under `<base>/_refs`.
    #[test]
    fn test_path_functions() {
        let root = Path::from("dataset");

        // Directory of all tag files.
        assert_eq!(base_tags_path(&root), Path::from("dataset/_refs/tags"));

        // Directory of all branch contents files.
        assert_eq!(
            base_branches_contents_path(&root),
            Path::from("dataset/_refs/branches")
        );

        // A single tag file.
        assert_eq!(
            tag_path(&root, "v1.0.0"),
            Path::from("dataset/_refs/tags/v1.0.0.json")
        );

        // A single branch contents file.
        assert_eq!(
            branch_contents_path(&root, "feature"),
            Path::from("dataset/_refs/branches/feature.json")
        );
    }

    /// Each `From` conversion yields the expected `Ref` variant.
    #[tokio::test]
    async fn test_refs_from_traits() {
        // u64 -> VersionNumber
        let from_number = Ref::from(42u64);
        let VersionNumber(version_number) = from_number else {
            panic!("Expected Version variant");
        };
        assert_eq!(version_number, 42);

        // &str -> Tag
        let from_name = Ref::from("test_tag");
        let Tag(tag_name) = from_name else {
            panic!("Expected Tag variant");
        };
        assert_eq!(tag_name, "test_tag");

        // (&str, u64) -> Version
        let from_pair = Ref::from(("test_branch", 10u64));
        let Version(branch, version) = from_pair else {
            panic!("Expected Branch variant");
        };
        assert_eq!(branch.unwrap(), "test_branch");
        assert_eq!(version, Some(10));
    }

    /// `BranchContents` round-trips through JSON and tolerates a missing `metadata` key.
    #[tokio::test]
    async fn test_branch_contents_serialization() {
        let original = BranchContents {
            identifier: BranchIdentifier::missing_identifier_sentinel(),
            parent_branch: Some("main".to_string()),
            parent_version: 42,
            create_at: 1234567890,
            manifest_size: 1024,
            metadata: HashMap::from([("description".to_string(), "production branch".to_string())]),
        };

        // Serialized form exposes the expected keys.
        let json = serde_json::to_string(&original).unwrap();
        for key in [
            "parentBranch",
            "parentVersion",
            "createAt",
            "manifestSize",
            "metadata",
        ] {
            assert!(json.contains(key));
        }

        // Round trip keeps every compared field.
        let restored: BranchContents = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.parent_branch, original.parent_branch);
        assert_eq!(restored.parent_version, original.parent_version);
        assert_eq!(restored.create_at, original.create_at);
        assert_eq!(restored.manifest_size, original.manifest_size);
        assert_eq!(restored.metadata, original.metadata);

        // Backward compatibility: older serialized content does not include metadata.
        let legacy_json = r#"{"parentBranch":"main","parentVersion":42,"createAt":1234567890,"manifestSize":1024}"#;
        let legacy: BranchContents = serde_json::from_str(legacy_json).unwrap();
        assert!(legacy.metadata.is_empty());
    }

    /// Legacy branch files get a synthetic identifier that is stable across reads and
    /// differs between branch names.
    #[tokio::test]
    async fn test_branch_synthetic_uuid_is_stable() {
        let legacy_json = r#"{"parentBranch":"main","parentVersion":42,"createAt":1234567890,"manifestSize":1024}"#;
        let store = ObjectStore::memory();
        let base_path = Path::from("dataset");
        let first_path = branch_contents_path(&base_path, "legacy_branch");
        let second_path = branch_contents_path(&base_path, "legacy_branch_other");

        // Both branches are written with the very same legacy payload.
        for target in [&first_path, &second_path] {
            store.put(target, legacy_json.as_bytes()).await.unwrap();
        }

        // Reading the same file twice must produce the same identifier.
        let first = BranchContents::from_path(&first_path, &store, "legacy_branch")
            .await
            .unwrap();
        let second = BranchContents::from_path(&first_path, &store, "legacy_branch")
            .await
            .unwrap();
        assert_eq!(first.identifier, second.identifier);
        assert_ne!(
            first.identifier,
            BranchIdentifier::missing_identifier_sentinel()
        );

        // The synthetic uuid is 32 lowercase hexadecimal characters.
        let synthetic_uuid = &first.identifier.version_mapping[0].1;
        assert_eq!(synthetic_uuid.len(), 32);
        assert!(
            synthetic_uuid
                .chars()
                .all(|ch| ch.is_ascii_hexdigit() && !ch.is_ascii_uppercase())
        );

        // A different branch name leads to a different identifier.
        let other = BranchContents::from_path(&second_path, &store, "legacy_branch_other")
            .await
            .unwrap();
        assert_ne!(first.identifier, other.identifier);
    }

    /// `TagContents` round-trips through JSON, omits an absent `createdAt`, and accepts
    /// legacy payloads without timestamps or metadata.
    #[tokio::test]
    async fn test_tag_contents_serialization() {
        let created_at = chrono::DateTime::from_timestamp(1_234_567_000, 456_000_000).unwrap();
        let updated_at = chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap();

        let original = TagContents {
            branch: Some("feature".to_string()),
            version: 10,
            created_at: Some(created_at),
            updated_at: Some(updated_at),
            manifest_size: 2048,
            metadata: HashMap::from([("channel".to_string(), "release".to_string())]),
        };

        // Serialized form exposes the expected keys.
        let json = serde_json::to_string(&original).unwrap();
        for key in [
            "branch",
            "version",
            "createdAt",
            "updatedAt",
            "manifestSize",
            "metadata",
        ] {
            assert!(json.contains(key));
        }

        // Round trip keeps every field.
        let restored: TagContents = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.branch, original.branch);
        assert_eq!(restored.version, original.version);
        assert_eq!(restored.created_at, original.created_at);
        assert_eq!(restored.updated_at, original.updated_at);
        assert_eq!(restored.manifest_size, original.manifest_size);
        assert_eq!(restored.metadata, original.metadata);

        // A missing creation time is left out of the JSON entirely.
        let without_created_at = TagContents {
            branch: Some("feature".to_string()),
            version: 10,
            created_at: None,
            updated_at: Some(updated_at),
            manifest_size: 2048,
            metadata: HashMap::new(),
        };
        let json_without_created_at = serde_json::to_string(&without_created_at).unwrap();
        assert!(!json_without_created_at.contains("createdAt"));
        assert!(json_without_created_at.contains("updatedAt"));

        // Backward compatibility: older serialized content does not include timestamps or metadata.
        let legacy_json = r#"{"branch":"feature","version":10,"manifestSize":2048}"#;
        let legacy: TagContents = serde_json::from_str(legacy_json).unwrap();
        assert_eq!(legacy.created_at, None);
        assert_eq!(legacy.updated_at, None);
        assert!(legacy.metadata.is_empty());

        // Legacy payload that only carries the update time.
        let legacy_updated_only_json = r#"{"branch":"feature","version":10,"updatedAt":"2009-02-13T23:31:30.123Z","manifestSize":2048}"#;
        let legacy_updated_only: TagContents =
            serde_json::from_str(legacy_updated_only_json).unwrap();
        assert_eq!(legacy_updated_only.created_at, None);
        assert_eq!(legacy_updated_only.updated_at, Some(updated_at));
        assert!(legacy_updated_only.metadata.is_empty());
    }

    /// `Branches::get_cleanup_path` returns the expected directory (or none) for a deleted
    /// branch given the branches that remain.
    #[rstest]
    #[case("feature/auth", &["feature/auth/sub"], None)]
    #[case("feature", &["feature/sub1", "feature/sub2"], None)]
    #[case("a/b", &["a/b/c", "b/c/d"], None)]
    #[case("main", &[], Some("main"))]
    #[case("a", &["a"], None)]
    #[case("feature/auth", &["feature/login", "feature/signup"], Some("feature/auth"))]
    #[case("feature/sub", &["feature", "other"], Some("feature/sub"))]
    #[case("very/long/common/prefix/branch1", &["very/long/common/prefix/branch2"], Some("very/long/common/prefix/branch1"))]
    #[case("feature/auth/module", &["feature/other"], Some("feature/auth"))]
    #[case("feature/dev", &["bugfix", "hotfix"], Some("feature"))]
    #[case("branch1", &["dev/branch2", "feature/nathan/branch3", "branch4"], Some("branch1"))]
    fn test_get_cleanup_path(
        #[case] branch_to_delete: &str,
        #[case] remaining_branches: &[&str],
        #[case] expected_relative_cleanup_path: Option<&str>,
    ) {
        let dataset_root_dir = "file:///var/balabala/dataset1".to_string();
        let branch_root = format!("{}/tree/random_branch", dataset_root_dir.as_str());
        let base_location = BranchLocation {
            path: Path::from(branch_root.clone()),
            uri: branch_root,
            branch: Some("random_branch".to_string()),
        };

        let result =
            Branches::get_cleanup_path(branch_to_delete, remaining_branches, &base_location)
                .unwrap();

        // No expectation means nothing may be cleaned up.
        let Some(expected_relative) = expected_relative_cleanup_path else {
            assert!(
                result.is_none(),
                "Expected no cleanup but got: {:?} for branch: {}",
                result,
                branch_to_delete
            );
            return;
        };

        assert!(
            result.is_some(),
            "Expected cleanup path but got None for branch: {}",
            branch_to_delete
        );
        let expected_full_path = base_location
            .find_branch(Some(expected_relative))
            .unwrap()
            .path;
        assert_eq!(result.unwrap().as_ref(), expected_full_path.as_ref());
    }

    /// Build a reusable mocked BranchContents map (intended to mirror
    /// cleanup::lineage_tests::build_lineage_datasets).
    ///
    /// Structure (each arrow is labelled with the parent version the child is built from):
    ///    main ──v1──▶ branch1 ──v2──▶ dev/branch2 ──v3──▶ feature/nathan/branch3
    ///      │
    ///      └──v5──▶ branch4
    ///
    /// Notes:
    /// - The "main" root is virtual (no BranchContents entry).
    /// - Parent versions increase along the chain.
    /// - Tests reuse this builder to ensure consistent lineage and deterministic assertions.
    fn build_mock_branch_contents() -> HashMap<String, BranchContents> {
        /// Creates one branch entry whose identifier extends its parent's identifier
        /// (or the main identifier when there is no parent entry).
        fn build(
            parent_name: Option<&str>,
            parent_branch: Option<&BranchContents>,
            parent_ver: u64,
        ) -> BranchContents {
            let parent_branch_id = parent_branch
                .map_or_else(BranchIdentifier::main, |parent| parent.identifier.clone());
            BranchContents {
                identifier: BranchIdentifier::new(&parent_branch_id, parent_ver),
                parent_branch: parent_name.map(String::from),
                parent_version: parent_ver,
                create_at: 0,
                manifest_size: 1,
                metadata: HashMap::new(),
            }
        }

        let mut contents = HashMap::new();

        let branch1 = build(None, None, 1);
        contents.insert("branch1".to_string(), branch1);

        let branch2 = build(Some("branch1"), contents.get("branch1"), 2);
        contents.insert("dev/branch2".to_string(), branch2);

        let branch3 = build(Some("dev/branch2"), contents.get("dev/branch2"), 3);
        contents.insert("feature/nathan/branch3".to_string(), branch3);

        let branch4 = build(None, None, 5);
        contents.insert("branch4".to_string(), branch4);

        contents
    }

    /// The leaf branch is referenced by nobody.
    #[test]
    fn test_collect_children_for_branch3() {
        let all_branches = build_mock_branch_contents();
        let leaf_id = all_branches["feature/nathan/branch3"].identifier.clone();

        let children = leaf_id.collect_referenced_versions(&all_branches);
        assert!(children.is_empty());
    }

    /// `dev/branch2` is referenced only by `feature/nathan/branch3`, at version 3.
    #[test]
    fn test_collect_children_for_branch2() {
        let all_branches = build_mock_branch_contents();
        let branch2_id = all_branches["dev/branch2"].identifier.clone();

        let children = branch2_id.collect_referenced_versions(&all_branches);
        let observed: Vec<(&str, u64)> = children
            .iter()
            .map(|(name, version)| (name.as_str(), *version))
            .collect();
        assert_eq!(observed, vec![("feature/nathan/branch3", 3)]);
    }

    /// `branch1` is referenced by both descendants, deepest first, each at version 2.
    #[test]
    fn test_collect_children_for_branch1() {
        let all_branches = build_mock_branch_contents();
        let branch1_id = all_branches["branch1"].identifier.clone();

        let children = branch1_id.collect_referenced_versions(&all_branches);
        let observed: Vec<(&str, u64)> = children
            .iter()
            .map(|(name, version)| (name.as_str(), *version))
            .collect();
        assert_eq!(
            observed,
            vec![("feature/nathan/branch3", 2), ("dev/branch2", 2)]
        );
    }

    /// The main identifier is referenced by every branch in the mock.
    #[test]
    fn test_collect_children_for_main() {
        let all_branches = build_mock_branch_contents();
        let main_id = BranchIdentifier::main();

        let children = main_id.collect_referenced_versions(&all_branches);
        let observed: Vec<(&str, u64)> = children
            .iter()
            .map(|(name, version)| (name.as_str(), *version))
            .collect();
        assert_eq!(
            observed,
            vec![
                ("branch4", 5),
                ("feature/nathan/branch3", 1),
                ("dev/branch2", 1),
                ("branch1", 1),
            ]
        );
    }
}