lance/dataset/
refs.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::Range;
5
6use futures::stream::{StreamExt, TryStreamExt};
7use itertools::Itertools;
8use lance_io::object_store::ObjectStore;
9use lance_table::io::commit::CommitHandler;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14use crate::dataset::branch_location::BranchLocation;
15use crate::dataset::refs::Ref::{Tag, Version};
16use crate::{Error, Result};
17use serde::de::DeserializeOwned;
18use snafu::location;
19use std::cmp::Ordering;
20use std::collections::HashMap;
21use std::fmt;
22use std::fmt::Formatter;
23use std::io::ErrorKind;
24
/// Lance Ref
///
/// Identifies a dataset version either directly, as a `(branch, version)`
/// pair, or indirectly through a tag name.
#[derive(Debug, Clone)]
pub enum Ref {
    /// A global version identifier, expressed as `(branch_name, version_number)`.
    ///
    /// * If `branch_name` is `None`, it points to the main branch.
    /// * If `version_number` is `None`, it points to the latest version.
    Version(Option<String>, Option<u64>),
    /// A tag name. A tag points to a global version identifier and can be
    /// considered an alias of that specific global version.
    Tag(String),
}

/// A bare version number becomes that version with no branch name set.
impl From<u64> for Ref {
    fn from(version: u64) -> Self {
        Version(None, Some(version))
    }
}

/// A bare string is interpreted as a tag name.
impl From<&str> for Ref {
    fn from(tag: &str) -> Self {
        Tag(tag.to_string())
    }
}

/// `(branch, version)` becomes a fully specified version reference.
impl From<(&str, u64)> for Ref {
    fn from((branch, version): (&str, u64)) -> Self {
        Version(Some(branch.to_string()), Some(version))
    }
}

/// The tuple is taken as-is as the `(branch, version)` payload.
impl From<(Option<String>, Option<u64>)> for Ref {
    fn from((branch, version): (Option<String>, Option<u64>)) -> Self {
        Version(branch, version)
    }
}

/// A named branch with an optional version number.
impl From<(&str, Option<u64>)> for Ref {
    fn from((branch, version): (&str, Option<u64>)) -> Self {
        Version(Some(branch.to_string()), version)
    }
}

/// Renders versions as `<branch>:<version>`, substituting `main` for a
/// missing branch and `latest` for a missing version; tags render as the
/// bare tag name.
impl fmt::Display for Ref {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Version(branch, version_number) => {
                let branch_name = branch.as_deref().unwrap_or("main");
                // Write straight into the formatter; no temporary `String`.
                match version_number {
                    Some(version) => write!(f, "{}:{}", branch_name, version),
                    None => write!(f, "{}:latest", branch_name),
                }
            }
            Tag(tag_name) => f.write_str(tag_name),
        }
    }
}
80
81#[derive(Debug, Clone)]
82pub struct Refs {
83    pub(crate) object_store: Arc<ObjectStore>,
84    pub(crate) commit_handler: Arc<dyn CommitHandler>,
85    pub(crate) base_location: BranchLocation,
86}
87
88impl Refs {
89    pub fn new(
90        object_store: Arc<ObjectStore>,
91        commit_handler: Arc<dyn CommitHandler>,
92        base_location: BranchLocation,
93    ) -> Self {
94        Self {
95            object_store,
96            commit_handler,
97            base_location,
98        }
99    }
100
101    pub fn tags(&self) -> Tags<'_> {
102        Tags { refs: self }
103    }
104
105    pub fn branches(&self) -> Branches<'_> {
106        Branches { refs: self }
107    }
108
109    pub fn base(&self) -> &Path {
110        &self.base_location.path
111    }
112
113    pub fn root(&self) -> Result<BranchLocation> {
114        self.base_location.find_main()
115    }
116}
117
118/// Tags operation
119#[derive(Debug, Clone)]
120pub struct Tags<'a> {
121    refs: &'a Refs,
122}
123
124/// Branches operation
125#[derive(Debug, Clone)]
126pub struct Branches<'a> {
127    refs: &'a Refs,
128}
129
130impl Tags<'_> {
131    fn object_store(&self) -> &ObjectStore {
132        &self.refs.object_store
133    }
134}
135
136impl Branches<'_> {
137    fn object_store(&self) -> &ObjectStore {
138        &self.refs.object_store
139    }
140}
141
142impl Tags<'_> {
143    pub async fn fetch_tags(&self) -> Result<Vec<(String, TagContents)>> {
144        let root_location = self.refs.root()?;
145        let base_path = base_tags_path(&root_location.path);
146        let tag_files = self.object_store().read_dir(base_path).await?;
147
148        let tag_names: Vec<String> = tag_files
149            .iter()
150            .filter_map(|name| name.strip_suffix(".json"))
151            .map(|name| name.to_string())
152            .collect_vec();
153
154        let root_path = &root_location.path;
155        futures::stream::iter(tag_names)
156            .map(|tag_name| async move {
157                let contents =
158                    TagContents::from_path(&tag_path(root_path, &tag_name), self.object_store())
159                        .await?;
160                Ok((tag_name, contents))
161            })
162            .buffer_unordered(10)
163            .try_collect()
164            .await
165    }
166
167    pub async fn list(&self) -> Result<HashMap<String, TagContents>> {
168        self.fetch_tags()
169            .await
170            .map(|tags| tags.into_iter().collect())
171    }
172
173    pub async fn list_tags_ordered(
174        &self,
175        order: Option<Ordering>,
176    ) -> Result<Vec<(String, TagContents)>> {
177        let mut tags = self.fetch_tags().await?;
178        tags.sort_by(|a, b| {
179            let desired_ordering = order.unwrap_or(Ordering::Greater);
180            let version_ordering = a.1.version.cmp(&b.1.version);
181            let version_result = match desired_ordering {
182                Ordering::Less => version_ordering,
183                _ => version_ordering.reverse(),
184            };
185            version_result.then_with(|| a.0.cmp(&b.0))
186        });
187        Ok(tags)
188    }
189
190    pub async fn get_version(&self, tag: &str) -> Result<u64> {
191        self.get(tag).await.map(|tag| tag.version)
192    }
193
194    pub async fn get(&self, tag: &str) -> Result<TagContents> {
195        check_valid_tag(tag)?;
196
197        let root_location = self.refs.root()?;
198        let tag_file = tag_path(&root_location.path, tag);
199
200        if !self.object_store().exists(&tag_file).await? {
201            return Err(Error::RefNotFound {
202                message: format!("tag {} does not exist", tag),
203            });
204        }
205
206        let tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?;
207
208        Ok(tag_contents)
209    }
210
211    pub async fn create(&self, tag: &str, version: u64) -> Result<()> {
212        self.create_on_branch(tag, version, None).await
213    }
214
215    pub async fn create_on_branch(
216        &self,
217        tag: &str,
218        version_number: u64,
219        branch: Option<&str>,
220    ) -> Result<()> {
221        check_valid_tag(tag)?;
222
223        let root_location = self.refs.root()?;
224        let branch = branch.map(String::from);
225        let tag_file = tag_path(&root_location.path, tag);
226
227        if self.object_store().exists(&tag_file).await? {
228            return Err(Error::RefConflict {
229                message: format!("tag {} already exists", tag),
230            });
231        }
232
233        let branch_location = self.refs.base_location.find_branch(branch.clone())?;
234        let manifest_file = self
235            .refs
236            .commit_handler
237            .resolve_version_location(
238                &branch_location.path,
239                version_number,
240                &self.refs.object_store.inner,
241            )
242            .await?;
243
244        if !self.object_store().exists(&manifest_file.path).await? {
245            return Err(Error::VersionNotFound {
246                message: format!(
247                    "version {}::{} does not exist",
248                    branch.unwrap_or("Main".to_string()),
249                    version_number
250                ),
251            });
252        }
253
254        let manifest_size = if let Some(size) = manifest_file.size {
255            size as usize
256        } else {
257            self.object_store().size(&manifest_file.path).await? as usize
258        };
259
260        let tag_contents = TagContents {
261            branch,
262            version: version_number,
263            manifest_size,
264        };
265
266        self.object_store()
267            .put(
268                &tag_file,
269                serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
270            )
271            .await
272            .map(|_| ())
273    }
274
275    pub async fn delete(&self, tag: &str) -> Result<()> {
276        check_valid_tag(tag)?;
277
278        let root_location = self.refs.root()?;
279        let tag_file = tag_path(&root_location.path, tag);
280
281        if !self.object_store().exists(&tag_file).await? {
282            return Err(Error::RefNotFound {
283                message: format!("tag {} does not exist", tag),
284            });
285        }
286
287        self.object_store().delete(&tag_file).await
288    }
289
290    pub async fn update(&self, tag: &str, version: u64) -> Result<()> {
291        self.update_on_branch(tag, version, None).await
292    }
293
294    /// Update a tag to a branch::version
295    pub async fn update_on_branch(
296        &self,
297        tag: &str,
298        version_number: u64,
299        branch: Option<&str>,
300    ) -> Result<()> {
301        check_valid_tag(tag)?;
302
303        let branch = branch.map(String::from);
304        let root_location = self.refs.root()?;
305        let tag_file = tag_path(&root_location.path, tag);
306
307        if !self.object_store().exists(&tag_file).await? {
308            return Err(Error::RefNotFound {
309                message: format!("tag {} does not exist", tag),
310            });
311        }
312
313        let target_branch_location = self.refs.base_location.find_branch(branch.clone())?;
314        let manifest_file = self
315            .refs
316            .commit_handler
317            .resolve_version_location(
318                &target_branch_location.path,
319                version_number,
320                &self.refs.object_store.inner,
321            )
322            .await?;
323
324        if !self.object_store().exists(&manifest_file.path).await? {
325            return Err(Error::VersionNotFound {
326                message: format!("version {} does not exist", version_number),
327            });
328        }
329
330        let manifest_size = if let Some(size) = manifest_file.size {
331            size as usize
332        } else {
333            self.object_store().size(&manifest_file.path).await? as usize
334        };
335
336        let tag_contents = TagContents {
337            branch,
338            version: version_number,
339            manifest_size,
340        };
341
342        self.object_store()
343            .put(
344                &tag_file,
345                serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
346            )
347            .await
348            .map(|_| ())
349    }
350}
351
352impl Branches<'_> {
353    pub async fn fetch(&self) -> Result<Vec<(String, BranchContents)>> {
354        let root_location = self.refs.root()?;
355        let base_path = base_branches_contents_path(&root_location.path);
356        let branch_files = self.object_store().read_dir(base_path).await?;
357
358        let branch_names: Vec<String> = branch_files
359            .iter()
360            .filter_map(|name| name.strip_suffix(".json"))
361            .map(|str| {
362                Path::from_url_path(str)
363                    .map_err(|e| Error::InvalidRef {
364                        message: format!(
365                            "Failed to decode branch name: {} due to exception {}",
366                            str, e
367                        ),
368                    })
369                    .map(|path| path.to_string())
370            })
371            .collect::<Result<Vec<_>>>()?;
372
373        let branch_path = &root_location.path;
374        futures::stream::iter(branch_names)
375            .map(|name| async move {
376                let contents = BranchContents::from_path(
377                    &branch_contents_path(branch_path, &name),
378                    self.object_store(),
379                )
380                .await?;
381                Ok((name, contents))
382            })
383            .buffer_unordered(10)
384            .try_collect()
385            .await
386    }
387
388    pub async fn list(&self) -> Result<HashMap<String, BranchContents>> {
389        self.fetch()
390            .await
391            .map(|branches| branches.into_iter().collect())
392    }
393
394    pub async fn get(&self, branch: &str) -> Result<BranchContents> {
395        check_valid_branch(branch)?;
396
397        let root_location = self.refs.root()?;
398        let branch_file = branch_contents_path(&root_location.path, branch);
399
400        if !self.object_store().exists(&branch_file).await? {
401            return Err(Error::RefNotFound {
402                message: format!("branch {} does not exist", branch),
403            });
404        }
405
406        let branch_contents = BranchContents::from_path(&branch_file, self.object_store()).await?;
407
408        Ok(branch_contents)
409    }
410
411    pub async fn create(
412        &self,
413        branch_name: &str,
414        version_number: u64,
415        source_branch: Option<&str>,
416    ) -> Result<()> {
417        check_valid_branch(branch_name)?;
418
419        let source_branch = source_branch.map(String::from);
420        let root_location = self.refs.root()?;
421        let branch_file = branch_contents_path(&root_location.path, branch_name);
422        if self.object_store().exists(&branch_file).await? {
423            return Err(Error::RefConflict {
424                message: format!("branch {} already exists", branch_name),
425            });
426        }
427
428        let branch_location = self.refs.base_location.find_branch(source_branch.clone())?;
429        // Verify the source version exists
430        let manifest_file = self
431            .refs
432            .commit_handler
433            .resolve_version_location(
434                &branch_location.path,
435                version_number,
436                &self.refs.object_store.inner,
437            )
438            .await?;
439
440        if !self.object_store().exists(&manifest_file.path).await? {
441            return Err(Error::VersionNotFound {
442                message: format!("Manifest file {} does not exist", &manifest_file.path),
443            });
444        };
445
446        let branch_contents = BranchContents {
447            parent_branch: source_branch,
448            parent_version: version_number,
449            create_at: chrono::Utc::now().timestamp() as u64,
450            manifest_size: if let Some(size) = manifest_file.size {
451                size as usize
452            } else {
453                self.object_store().size(&manifest_file.path).await? as usize
454            },
455        };
456
457        self.object_store()
458            .put(
459                &branch_file,
460                serde_json::to_string_pretty(&branch_contents)?.as_bytes(),
461            )
462            .await
463            .map(|_| ())
464    }
465
466    /// Delete a branch
467    ///
468    /// If the `BranchContents` does not exist, it will return an error directly unless `force` is true.
469    /// If `force` is true, it will try to delete the branch directories no matter `BranchContents` exists or not.
470    pub async fn delete(&self, branch: &str, force: bool) -> Result<()> {
471        check_valid_branch(branch)?;
472
473        let root_location = self.refs.root()?;
474        let branch_file = branch_contents_path(&root_location.path, branch);
475        if self.object_store().exists(&branch_file).await? {
476            self.object_store().delete(&branch_file).await?;
477        } else if force {
478            log::warn!("BranchContents of {} does not exist", branch);
479        } else {
480            return Err(Error::RefNotFound {
481                message: format!("Branch {} does not exist", branch),
482            });
483        }
484
485        // Clean up branch directories
486        self.cleanup_branch_directories(branch).await
487    }
488
489    pub async fn list_ordered(
490        &self,
491        order: Option<Ordering>,
492    ) -> Result<Vec<(String, BranchContents)>> {
493        let mut branches = self.fetch().await?;
494        branches.sort_by(|a, b| {
495            let desired_ordering = order.unwrap_or(Ordering::Greater);
496            let version_ordering = a.1.parent_version.cmp(&b.1.parent_version);
497            let version_result = match desired_ordering {
498                Ordering::Less => version_ordering,
499                _ => version_ordering.reverse(),
500            };
501            version_result.then_with(|| a.0.cmp(&b.0))
502        });
503        Ok(branches)
504    }
505
506    /// Clean up empty parent directories
507    async fn cleanup_branch_directories(&self, branch: &str) -> Result<()> {
508        let branches = self.list().await?;
509        let remaining_branches: Vec<&str> = branches.keys().map(|k| k.as_str()).collect();
510
511        if let Some(delete_path) =
512            Self::get_cleanup_path(branch, &remaining_branches, &self.refs.base_location)?
513        {
514            if let Err(e) = self.refs.object_store.remove_dir_all(delete_path).await {
515                match &e {
516                    Error::IO { source, .. } => {
517                        if let Some(io_err) = source.downcast_ref::<std::io::Error>() {
518                            if io_err.kind() == ErrorKind::NotFound {
519                                log::debug!("Branch directory already deleted: {}", io_err);
520                            } else {
521                                return Err(e);
522                            }
523                        } else {
524                            return Err(e);
525                        }
526                    }
527                    _ => return Err(e),
528                }
529            }
530        }
531        Ok(())
532    }
533
534    fn get_cleanup_path(
535        branch: &str,
536        remaining_branches: &[&str],
537        base_location: &BranchLocation,
538    ) -> Result<Option<Path>> {
539        let mut longest_used_length = 0;
540        for &candidate in remaining_branches {
541            let common_len = branch
542                .chars()
543                .zip(candidate.chars())
544                .take_while(|(a, b)| a == b)
545                .count();
546
547            if common_len > longest_used_length {
548                longest_used_length = common_len;
549            }
550        }
551        // Means this branch path is used as a prefix of other branches
552        if longest_used_length == branch.len() {
553            return Ok(None);
554        }
555
556        let mut used_relative_path = &branch[..longest_used_length];
557        if let Some(last_slash_index) = used_relative_path.rfind('/') {
558            used_relative_path = &used_relative_path[..last_slash_index];
559        }
560        let unused_dir = &branch[used_relative_path.len()..].trim_start_matches('/');
561        if let Some(sub_dir) = unused_dir.split('/').next() {
562            let relative_dir = format!("{}/{}", used_relative_path, sub_dir);
563            // Use base_location to generate the cleanup path
564            let absolute_dir = base_location.find_branch(Some(relative_dir))?;
565            Ok(Some(absolute_dir.path))
566        } else {
567            Ok(None)
568        }
569    }
570}
571
572#[derive(Debug, Clone, Serialize, Deserialize)]
573#[serde(rename_all = "camelCase")]
574pub struct TagContents {
575    pub branch: Option<String>,
576    pub version: u64,
577    pub manifest_size: usize,
578}
579
580#[derive(Debug, Clone, Serialize, Deserialize)]
581#[serde(rename_all = "camelCase")]
582pub struct BranchContents {
583    pub parent_branch: Option<String>,
584    pub parent_version: u64,
585    pub create_at: u64, // unix timestamp
586    pub manifest_size: usize,
587}
588
589pub fn base_tags_path(base_path: &Path) -> Path {
590    base_path.child("_refs").child("tags")
591}
592
593pub fn base_branches_contents_path(base_path: &Path) -> Path {
594    base_path.child("_refs").child("branches")
595}
596
597pub fn tag_path(base_path: &Path, branch: &str) -> Path {
598    base_tags_path(base_path).child(format!("{}.json", branch))
599}
600
601// Note: child will encode '/' to '%2F'
602pub fn branch_contents_path(base_path: &Path, branch: &str) -> Path {
603    base_branches_contents_path(base_path).child(format!("{}.json", branch))
604}
605
606async fn from_path<T>(path: &Path, object_store: &ObjectStore) -> Result<T>
607where
608    T: DeserializeOwned,
609{
610    let tag_reader = object_store.open(path).await?;
611    let tag_bytes = tag_reader
612        .get_range(Range {
613            start: 0,
614            end: tag_reader.size().await?,
615        })
616        .await?;
617    let json_str = String::from_utf8(tag_bytes.to_vec())
618        .map_err(|e| Error::corrupt_file(path.clone(), e.to_string(), location!()))?;
619    Ok(serde_json::from_str(&json_str)?)
620}
621
622impl TagContents {
623    pub async fn from_path(path: &Path, object_store: &ObjectStore) -> Result<Self> {
624        from_path(path, object_store).await
625    }
626}
627
628impl BranchContents {
629    pub async fn from_path(path: &Path, object_store: &ObjectStore) -> Result<Self> {
630        from_path(path, object_store).await
631    }
632}
633
634pub fn check_valid_branch(branch_name: &str) -> Result<()> {
635    if branch_name.is_empty() {
636        return Err(Error::InvalidRef {
637            message: "Branch name cannot be empty".to_string(),
638        });
639    }
640
641    // Validate if the branch name starts or ends with a '/'
642    if branch_name.starts_with('/') || branch_name.ends_with('/') {
643        return Err(Error::InvalidRef {
644            message: "Branch name cannot start or end with a '/'".to_string(),
645        });
646    }
647
648    // Validate if there are any consecutive '/' in the branch name
649    if branch_name.contains("//") {
650        return Err(Error::InvalidRef {
651            message: "Branch name cannot contain consecutive '/'".to_string(),
652        });
653    }
654
655    // Validate if there are any dangerous characters in the branch name
656    if branch_name.contains("..") || branch_name.contains('\\') {
657        return Err(Error::InvalidRef {
658            message: "Branch name cannot contain '..' or '\\'".to_string(),
659        });
660    }
661
662    for segment in branch_name.split('/') {
663        if segment.is_empty() {
664            return Err(Error::InvalidRef {
665                message: "Branch name cannot have empty segments between '/'".to_string(),
666            });
667        }
668        if !segment
669            .chars()
670            .all(|c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
671        {
672            return Err(Error::InvalidRef {
673                message: format!("Branch segment '{}' contains invalid characters. Only alphanumeric, '.', '-', '_' are allowed.", segment),
674            });
675        }
676    }
677
678    if branch_name.ends_with(".lock") {
679        return Err(Error::InvalidRef {
680            message: "Branch name cannot end with '.lock'".to_string(),
681        });
682    }
683
684    if branch_name.eq("main") {
685        return Err(Error::InvalidRef {
686            message: "Branch name cannot be 'main'".to_string(),
687        });
688    }
689    Ok(())
690}
691
692pub fn check_valid_tag(s: &str) -> Result<()> {
693    if s.is_empty() {
694        return Err(Error::InvalidRef {
695            message: "Ref cannot be empty".to_string(),
696        });
697    }
698
699    if !s
700        .chars()
701        .all(|c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
702    {
703        return Err(Error::InvalidRef {
704            message: "Ref characters must be either alphanumeric, '.', '-' or '_'".to_string(),
705        });
706    }
707
708    if s.starts_with('.') {
709        return Err(Error::InvalidRef {
710            message: "Ref cannot begin with a dot".to_string(),
711        });
712    }
713
714    if s.ends_with('.') {
715        return Err(Error::InvalidRef {
716            message: "Ref cannot end with a dot".to_string(),
717        });
718    }
719
720    if s.ends_with(".lock") {
721        return Err(Error::InvalidRef {
722            message: "Ref cannot end with .lock".to_string(),
723        });
724    }
725
726    if s.contains("..") {
727        return Err(Error::InvalidRef {
728            message: "Ref cannot have two consecutive dots".to_string(),
729        });
730    }
731
732    Ok(())
733}
734
735#[cfg(test)]
736mod tests {
737    use super::*;
738    use datafusion::common::assert_contains;
739
740    use rstest::rstest;
741
742    #[rstest]
743    fn test_ok_ref(
744        #[values(
745            "ref",
746            "ref-with-dashes",
747            "ref.extension",
748            "ref_with_underscores",
749            "v1.2.3-rc4"
750        )]
751        r: &str,
752    ) {
753        check_valid_tag(r).unwrap();
754    }
755
756    #[rstest]
757    fn test_err_ref(
758        #[values(
759            "",
760            "../ref",
761            ".ref",
762            "/ref",
763            "@",
764            "deeply/nested/ref",
765            "nested//ref",
766            "nested/ref",
767            "nested\\ref",
768            "ref*",
769            "ref.lock",
770            "ref/",
771            "ref?",
772            "ref@{ref",
773            "ref[",
774            "ref^",
775            "~/ref",
776            "ref.",
777            "ref..ref"
778        )]
779        r: &str,
780    ) {
781        assert_contains!(
782            check_valid_tag(r).err().unwrap().to_string(),
783            "Ref is invalid: Ref"
784        );
785    }
786
787    #[rstest]
788    fn test_valid_branch_names(
789        #[values(
790            "feature/login",
791            "bugfix/issue-123",
792            "release/v1.2.3",
793            "user/someone/my-feature",
794            "normal",
795            "with-dash",
796            "with_underscore",
797            "with.dot"
798        )]
799        branch_name: &str,
800    ) {
801        assert!(
802            check_valid_branch(branch_name).is_ok(),
803            "Branch name '{}' should be valid",
804            branch_name
805        );
806    }
807
808    #[rstest]
809    fn test_invalid_branch_names(
810        #[values(
811            "",
812            "/start-with-slash",
813            "end-with-slash/",
814            "have//consecutive-slash",
815            "have..dot-dot",
816            "have\\backslash",
817            "segment/",
818            "/segment",
819            "segment//empty",
820            "name.lock",
821            "bad@character",
822            "bad segment"
823        )]
824        branch_name: &str,
825    ) {
826        assert!(
827            check_valid_branch(branch_name).is_err(),
828            "Branch name '{}' should be invalid",
829            branch_name
830        );
831    }
832
833    #[test]
834    fn test_path_functions() {
835        let base_path = Path::from("dataset");
836
837        // Test base_tags_path
838        let tags_path = base_tags_path(&base_path);
839        assert_eq!(tags_path, Path::from("dataset/_refs/tags"));
840
841        // Test base_branches_path
842        let branches_path = base_branches_contents_path(&base_path);
843        assert_eq!(branches_path, Path::from("dataset/_refs/branches"));
844
845        // Test tag_path
846        let tag_file_path = tag_path(&base_path, "v1.0.0");
847        assert_eq!(tag_file_path, Path::from("dataset/_refs/tags/v1.0.0.json"));
848
849        // Test branch_path
850        let branch_file_path = branch_contents_path(&base_path, "feature");
851        assert_eq!(
852            branch_file_path,
853            Path::from("dataset/_refs/branches/feature.json")
854        );
855    }
856
857    #[tokio::test]
858    async fn test_refs_from_traits() {
859        // Test From<u64> for Ref
860        let version_ref: Ref = 42u64.into();
861        match version_ref {
862            Version(branch, v) => {
863                assert_eq!(v, Some(42));
864                assert_eq!(branch, None)
865            }
866            _ => panic!("Expected Version variant"),
867        }
868
869        // Test From<&str> for Ref
870        let tag_ref: Ref = "test_tag".into();
871        match tag_ref {
872            Tag(name) => assert_eq!(name, "test_tag"),
873            _ => panic!("Expected Tag variant"),
874        }
875
876        // Test From<(&str, u64)> for Ref
877        let branch_ref: Ref = ("test_branch", 10u64).into();
878        match branch_ref {
879            Version(name, version) => {
880                assert_eq!(name.unwrap(), "test_branch");
881                assert_eq!(version, Some(10));
882            }
883            _ => panic!("Expected Branch variant"),
884        }
885    }
886
887    #[tokio::test]
888    async fn test_branch_contents_serialization() {
889        let branch_contents = BranchContents {
890            parent_branch: Some("main".to_string()),
891            parent_version: 42,
892            create_at: 1234567890,
893            manifest_size: 1024,
894        };
895
896        // Test serialization
897        let json = serde_json::to_string(&branch_contents).unwrap();
898        assert!(json.contains("parentBranch"));
899        assert!(json.contains("parentVersion"));
900        assert!(json.contains("createAt"));
901        assert!(json.contains("manifestSize"));
902
903        // Test deserialization
904        let deserialized: BranchContents = serde_json::from_str(&json).unwrap();
905        assert_eq!(deserialized.parent_branch, branch_contents.parent_branch);
906        assert_eq!(deserialized.parent_version, branch_contents.parent_version);
907        assert_eq!(deserialized.create_at, branch_contents.create_at);
908        assert_eq!(deserialized.manifest_size, branch_contents.manifest_size);
909    }
910
911    #[tokio::test]
912    async fn test_tag_contents_serialization() {
913        let tag_contents = TagContents {
914            branch: Some("feature".to_string()),
915            version: 10,
916            manifest_size: 2048,
917        };
918
919        // Test serialization
920        let json = serde_json::to_string(&tag_contents).unwrap();
921        assert!(json.contains("branch"));
922        assert!(json.contains("version"));
923        assert!(json.contains("manifestSize"));
924
925        // Test deserialization
926        let deserialized: TagContents = serde_json::from_str(&json).unwrap();
927        assert_eq!(deserialized.branch, tag_contents.branch);
928        assert_eq!(deserialized.version, tag_contents.version);
929        assert_eq!(deserialized.manifest_size, tag_contents.manifest_size);
930    }
931
932    #[rstest]
933    #[case("feature/auth", &["feature/login", "feature/signup"], Some("feature/auth"))]
934    #[case("feature/auth/module", &["feature/other"], Some("feature/auth"))]
935    #[case("a/b/c", &["a/b/d", "a/e"], Some("a/b/c"))]
936    #[case("feature/auth", &["feature/auth/sub"], None)]
937    #[case("feature", &["feature/sub1", "feature/sub2"], None)]
938    #[case("a/b", &["a/b/c", "a/b/d"], None)]
939    #[case("main", &[], Some("main"))]
940    #[case("a", &["a"], None)]
941    #[case("single", &["other"], Some("single"))]
942    #[case("feature/auth/login/oauth", &["feature/auth/login/basic", "feature/auth/signup"], Some("feature/auth/login/oauth"))]
943    #[case("feature/user-auth", &["feature/user-signup"], Some("feature/user-auth"))]
944    #[case("release/2024.01", &["release/2024.02"], Some("release/2024.01"))]
945    #[case("very/long/common/prefix/branch1", &["very/long/common/prefix/branch2"], Some("very/long/common/prefix/branch1"))]
946    #[case("feature", &["bugfix", "hotfix"], Some("feature"))]
947    #[case("feature/sub", &["feature", "other"], Some("feature/sub"))]
948    fn test_get_cleanup_path(
949        #[case] branch_to_delete: &str,
950        #[case] remaining_branches: &[&str],
951        #[case] expected_relative_cleanup_path: Option<&str>,
952    ) {
953        let dataset_root_dir = "file:///var/balabala/dataset1".to_string();
954        let base_location = BranchLocation {
955            path: Path::from(format!("{}/tree/random_branch", dataset_root_dir.as_str())),
956            uri: format!("{}/tree/random_branch", dataset_root_dir.as_str()),
957            branch: Some("random_branch".to_string()),
958        };
959
960        let result =
961            Branches::get_cleanup_path(branch_to_delete, remaining_branches, &base_location)
962                .unwrap();
963
964        match expected_relative_cleanup_path {
965            Some(expected_relative) => {
966                assert!(
967                    result.is_some(),
968                    "Expected cleanup path but got None for branch: {}",
969                    branch_to_delete
970                );
971                let expected_full_path = base_location
972                    .find_branch(Some(expected_relative.to_string()))
973                    .unwrap()
974                    .path;
975                assert_eq!(result.unwrap().as_ref(), expected_full_path.as_ref());
976            }
977            None => {
978                assert!(
979                    result.is_none(),
980                    "Expected no cleanup but got: {:?} for branch: {}",
981                    result,
982                    branch_to_delete
983                );
984            }
985        }
986    }
987}