1use std::ops::Range;
5
6use futures::stream::{StreamExt, TryStreamExt};
7use itertools::Itertools;
8use lance_io::object_store::ObjectStore;
9use lance_table::io::commit::CommitHandler;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14use crate::dataset::branch_location::BranchLocation;
15use crate::dataset::refs::Ref::{Tag, Version};
16use crate::{Error, Result};
17use serde::de::DeserializeOwned;
18use snafu::location;
19use std::cmp::Ordering;
20use std::collections::HashMap;
21use std::fmt;
22use std::fmt::Formatter;
23use std::io::ErrorKind;
24
/// A reference to a dataset version: either an explicit branch/version pair or a tag name.
#[derive(Debug, Clone)]
pub enum Ref {
    /// A `(branch, version)` pair. The `Display` impl renders a `None` branch as `main`
    /// and a `None` version as `latest`.
    Version(Option<String>, Option<u64>),
    /// A tag name. Tags are stored as `_refs/tags/<name>.json` files (see `Tags`).
    Tag(String),
}
35
36impl From<u64> for Ref {
37 fn from(ref_: u64) -> Self {
38 Version(None, Some(ref_))
39 }
40}
41
42impl From<&str> for Ref {
43 fn from(ref_: &str) -> Self {
44 Tag(ref_.to_string())
45 }
46}
47
48impl From<(&str, u64)> for Ref {
49 fn from(_ref: (&str, u64)) -> Self {
50 Version(Some(_ref.0.to_string()), Some(_ref.1))
51 }
52}
53
54impl From<(Option<String>, Option<u64>)> for Ref {
55 fn from(_ref: (Option<String>, Option<u64>)) -> Self {
56 Version(_ref.0, _ref.1)
57 }
58}
59
60impl From<(&str, Option<u64>)> for Ref {
61 fn from(_ref: (&str, Option<u64>)) -> Self {
62 Version(Some(_ref.0.to_string()), _ref.1)
63 }
64}
65
66impl fmt::Display for Ref {
67 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
68 match self {
69 Version(branch, version_number) => {
70 let branch_name = branch.as_deref().unwrap_or("main");
71 let version_str = version_number
72 .map(|v| v.to_string())
73 .unwrap_or_else(|| "latest".to_string());
74 write!(f, "{}:{}", branch_name, version_str)
75 }
76 Tag(tag_name) => write!(f, "{}", tag_name),
77 }
78 }
79}
80
/// Entry point for reading and writing dataset references (tags and branches).
///
/// Holds the object store and commit handler used to read ref files and resolve
/// manifests, plus the location of the branch this handle was created for.
#[derive(Debug, Clone)]
pub struct Refs {
    /// Store used to read/write ref files and to probe manifest files.
    pub(crate) object_store: Arc<ObjectStore>,
    /// Resolves a version number to its manifest location on a given branch path.
    pub(crate) commit_handler: Arc<dyn CommitHandler>,
    /// Location this handle was created for. `Refs::root` derives the main location from
    /// it (`find_main`), and branch lookups resolve relative to it (`find_branch`).
    pub(crate) base_location: BranchLocation,
}
87
88impl Refs {
89 pub fn new(
90 object_store: Arc<ObjectStore>,
91 commit_handler: Arc<dyn CommitHandler>,
92 base_location: BranchLocation,
93 ) -> Self {
94 Self {
95 object_store,
96 commit_handler,
97 base_location,
98 }
99 }
100
101 pub fn tags(&self) -> Tags<'_> {
102 Tags { refs: self }
103 }
104
105 pub fn branches(&self) -> Branches<'_> {
106 Branches { refs: self }
107 }
108
109 pub fn base(&self) -> &Path {
110 &self.base_location.path
111 }
112
113 pub fn root(&self) -> Result<BranchLocation> {
114 self.base_location.find_main()
115 }
116}
117
/// Tag operations, borrowed from a [`Refs`] through `Refs::tags`.
///
/// Tag files live under the root (main) location's `_refs/tags` directory.
#[derive(Debug, Clone)]
pub struct Tags<'a> {
    refs: &'a Refs,
}
123
/// Branch operations, borrowed from a [`Refs`] through `Refs::branches`.
///
/// Branch metadata files live under the root (main) location's `_refs/branches` directory.
#[derive(Debug, Clone)]
pub struct Branches<'a> {
    refs: &'a Refs,
}
129
130impl Tags<'_> {
131 fn object_store(&self) -> &ObjectStore {
132 &self.refs.object_store
133 }
134}
135
136impl Branches<'_> {
137 fn object_store(&self) -> &ObjectStore {
138 &self.refs.object_store
139 }
140}
141
142impl Tags<'_> {
143 pub async fn fetch_tags(&self) -> Result<Vec<(String, TagContents)>> {
144 let root_location = self.refs.root()?;
145 let base_path = base_tags_path(&root_location.path);
146 let tag_files = self.object_store().read_dir(base_path).await?;
147
148 let tag_names: Vec<String> = tag_files
149 .iter()
150 .filter_map(|name| name.strip_suffix(".json"))
151 .map(|name| name.to_string())
152 .collect_vec();
153
154 let root_path = &root_location.path;
155 futures::stream::iter(tag_names)
156 .map(|tag_name| async move {
157 let contents =
158 TagContents::from_path(&tag_path(root_path, &tag_name), self.object_store())
159 .await?;
160 Ok((tag_name, contents))
161 })
162 .buffer_unordered(10)
163 .try_collect()
164 .await
165 }
166
167 pub async fn list(&self) -> Result<HashMap<String, TagContents>> {
168 self.fetch_tags()
169 .await
170 .map(|tags| tags.into_iter().collect())
171 }
172
173 pub async fn list_tags_ordered(
174 &self,
175 order: Option<Ordering>,
176 ) -> Result<Vec<(String, TagContents)>> {
177 let mut tags = self.fetch_tags().await?;
178 tags.sort_by(|a, b| {
179 let desired_ordering = order.unwrap_or(Ordering::Greater);
180 let version_ordering = a.1.version.cmp(&b.1.version);
181 let version_result = match desired_ordering {
182 Ordering::Less => version_ordering,
183 _ => version_ordering.reverse(),
184 };
185 version_result.then_with(|| a.0.cmp(&b.0))
186 });
187 Ok(tags)
188 }
189
190 pub async fn get_version(&self, tag: &str) -> Result<u64> {
191 self.get(tag).await.map(|tag| tag.version)
192 }
193
194 pub async fn get(&self, tag: &str) -> Result<TagContents> {
195 check_valid_tag(tag)?;
196
197 let root_location = self.refs.root()?;
198 let tag_file = tag_path(&root_location.path, tag);
199
200 if !self.object_store().exists(&tag_file).await? {
201 return Err(Error::RefNotFound {
202 message: format!("tag {} does not exist", tag),
203 });
204 }
205
206 let tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?;
207
208 Ok(tag_contents)
209 }
210
211 pub async fn create(&self, tag: &str, version: u64) -> Result<()> {
212 self.create_on_branch(tag, version, None).await
213 }
214
215 pub async fn create_on_branch(
216 &self,
217 tag: &str,
218 version_number: u64,
219 branch: Option<&str>,
220 ) -> Result<()> {
221 check_valid_tag(tag)?;
222
223 let root_location = self.refs.root()?;
224 let branch = branch.map(String::from);
225 let tag_file = tag_path(&root_location.path, tag);
226
227 if self.object_store().exists(&tag_file).await? {
228 return Err(Error::RefConflict {
229 message: format!("tag {} already exists", tag),
230 });
231 }
232
233 let branch_location = self.refs.base_location.find_branch(branch.clone())?;
234 let manifest_file = self
235 .refs
236 .commit_handler
237 .resolve_version_location(
238 &branch_location.path,
239 version_number,
240 &self.refs.object_store.inner,
241 )
242 .await?;
243
244 if !self.object_store().exists(&manifest_file.path).await? {
245 return Err(Error::VersionNotFound {
246 message: format!(
247 "version {}::{} does not exist",
248 branch.unwrap_or("Main".to_string()),
249 version_number
250 ),
251 });
252 }
253
254 let manifest_size = if let Some(size) = manifest_file.size {
255 size as usize
256 } else {
257 self.object_store().size(&manifest_file.path).await? as usize
258 };
259
260 let tag_contents = TagContents {
261 branch,
262 version: version_number,
263 manifest_size,
264 };
265
266 self.object_store()
267 .put(
268 &tag_file,
269 serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
270 )
271 .await
272 .map(|_| ())
273 }
274
275 pub async fn delete(&self, tag: &str) -> Result<()> {
276 check_valid_tag(tag)?;
277
278 let root_location = self.refs.root()?;
279 let tag_file = tag_path(&root_location.path, tag);
280
281 if !self.object_store().exists(&tag_file).await? {
282 return Err(Error::RefNotFound {
283 message: format!("tag {} does not exist", tag),
284 });
285 }
286
287 self.object_store().delete(&tag_file).await
288 }
289
290 pub async fn update(&self, tag: &str, version: u64) -> Result<()> {
291 self.update_on_branch(tag, version, None).await
292 }
293
294 pub async fn update_on_branch(
296 &self,
297 tag: &str,
298 version_number: u64,
299 branch: Option<&str>,
300 ) -> Result<()> {
301 check_valid_tag(tag)?;
302
303 let branch = branch.map(String::from);
304 let root_location = self.refs.root()?;
305 let tag_file = tag_path(&root_location.path, tag);
306
307 if !self.object_store().exists(&tag_file).await? {
308 return Err(Error::RefNotFound {
309 message: format!("tag {} does not exist", tag),
310 });
311 }
312
313 let target_branch_location = self.refs.base_location.find_branch(branch.clone())?;
314 let manifest_file = self
315 .refs
316 .commit_handler
317 .resolve_version_location(
318 &target_branch_location.path,
319 version_number,
320 &self.refs.object_store.inner,
321 )
322 .await?;
323
324 if !self.object_store().exists(&manifest_file.path).await? {
325 return Err(Error::VersionNotFound {
326 message: format!("version {} does not exist", version_number),
327 });
328 }
329
330 let manifest_size = if let Some(size) = manifest_file.size {
331 size as usize
332 } else {
333 self.object_store().size(&manifest_file.path).await? as usize
334 };
335
336 let tag_contents = TagContents {
337 branch,
338 version: version_number,
339 manifest_size,
340 };
341
342 self.object_store()
343 .put(
344 &tag_file,
345 serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
346 )
347 .await
348 .map(|_| ())
349 }
350}
351
352impl Branches<'_> {
353 pub async fn fetch(&self) -> Result<Vec<(String, BranchContents)>> {
354 let root_location = self.refs.root()?;
355 let base_path = base_branches_contents_path(&root_location.path);
356 let branch_files = self.object_store().read_dir(base_path).await?;
357
358 let branch_names: Vec<String> = branch_files
359 .iter()
360 .filter_map(|name| name.strip_suffix(".json"))
361 .map(|str| {
362 Path::from_url_path(str)
363 .map_err(|e| Error::InvalidRef {
364 message: format!(
365 "Failed to decode branch name: {} due to exception {}",
366 str, e
367 ),
368 })
369 .map(|path| path.to_string())
370 })
371 .collect::<Result<Vec<_>>>()?;
372
373 let branch_path = &root_location.path;
374 futures::stream::iter(branch_names)
375 .map(|name| async move {
376 let contents = BranchContents::from_path(
377 &branch_contents_path(branch_path, &name),
378 self.object_store(),
379 )
380 .await?;
381 Ok((name, contents))
382 })
383 .buffer_unordered(10)
384 .try_collect()
385 .await
386 }
387
388 pub async fn list(&self) -> Result<HashMap<String, BranchContents>> {
389 self.fetch()
390 .await
391 .map(|branches| branches.into_iter().collect())
392 }
393
394 pub async fn get(&self, branch: &str) -> Result<BranchContents> {
395 check_valid_branch(branch)?;
396
397 let root_location = self.refs.root()?;
398 let branch_file = branch_contents_path(&root_location.path, branch);
399
400 if !self.object_store().exists(&branch_file).await? {
401 return Err(Error::RefNotFound {
402 message: format!("branch {} does not exist", branch),
403 });
404 }
405
406 let branch_contents = BranchContents::from_path(&branch_file, self.object_store()).await?;
407
408 Ok(branch_contents)
409 }
410
411 pub async fn create(
412 &self,
413 branch_name: &str,
414 version_number: u64,
415 source_branch: Option<&str>,
416 ) -> Result<()> {
417 check_valid_branch(branch_name)?;
418
419 let source_branch = source_branch.map(String::from);
420 let root_location = self.refs.root()?;
421 let branch_file = branch_contents_path(&root_location.path, branch_name);
422 if self.object_store().exists(&branch_file).await? {
423 return Err(Error::RefConflict {
424 message: format!("branch {} already exists", branch_name),
425 });
426 }
427
428 let branch_location = self.refs.base_location.find_branch(source_branch.clone())?;
429 let manifest_file = self
431 .refs
432 .commit_handler
433 .resolve_version_location(
434 &branch_location.path,
435 version_number,
436 &self.refs.object_store.inner,
437 )
438 .await?;
439
440 if !self.object_store().exists(&manifest_file.path).await? {
441 return Err(Error::VersionNotFound {
442 message: format!("Manifest file {} does not exist", &manifest_file.path),
443 });
444 };
445
446 let branch_contents = BranchContents {
447 parent_branch: source_branch,
448 parent_version: version_number,
449 create_at: chrono::Utc::now().timestamp() as u64,
450 manifest_size: if let Some(size) = manifest_file.size {
451 size as usize
452 } else {
453 self.object_store().size(&manifest_file.path).await? as usize
454 },
455 };
456
457 self.object_store()
458 .put(
459 &branch_file,
460 serde_json::to_string_pretty(&branch_contents)?.as_bytes(),
461 )
462 .await
463 .map(|_| ())
464 }
465
466 pub async fn delete(&self, branch: &str, force: bool) -> Result<()> {
471 check_valid_branch(branch)?;
472
473 let root_location = self.refs.root()?;
474 let branch_file = branch_contents_path(&root_location.path, branch);
475 if self.object_store().exists(&branch_file).await? {
476 self.object_store().delete(&branch_file).await?;
477 } else if force {
478 log::warn!("BranchContents of {} does not exist", branch);
479 } else {
480 return Err(Error::RefNotFound {
481 message: format!("Branch {} does not exist", branch),
482 });
483 }
484
485 self.cleanup_branch_directories(branch).await
487 }
488
489 pub async fn list_ordered(
490 &self,
491 order: Option<Ordering>,
492 ) -> Result<Vec<(String, BranchContents)>> {
493 let mut branches = self.fetch().await?;
494 branches.sort_by(|a, b| {
495 let desired_ordering = order.unwrap_or(Ordering::Greater);
496 let version_ordering = a.1.parent_version.cmp(&b.1.parent_version);
497 let version_result = match desired_ordering {
498 Ordering::Less => version_ordering,
499 _ => version_ordering.reverse(),
500 };
501 version_result.then_with(|| a.0.cmp(&b.0))
502 });
503 Ok(branches)
504 }
505
506 async fn cleanup_branch_directories(&self, branch: &str) -> Result<()> {
508 let branches = self.list().await?;
509 let remaining_branches: Vec<&str> = branches.keys().map(|k| k.as_str()).collect();
510
511 if let Some(delete_path) =
512 Self::get_cleanup_path(branch, &remaining_branches, &self.refs.base_location)?
513 {
514 if let Err(e) = self.refs.object_store.remove_dir_all(delete_path).await {
515 match &e {
516 Error::IO { source, .. } => {
517 if let Some(io_err) = source.downcast_ref::<std::io::Error>() {
518 if io_err.kind() == ErrorKind::NotFound {
519 log::debug!("Branch directory already deleted: {}", io_err);
520 } else {
521 return Err(e);
522 }
523 } else {
524 return Err(e);
525 }
526 }
527 _ => return Err(e),
528 }
529 }
530 }
531 Ok(())
532 }
533
534 fn get_cleanup_path(
535 branch: &str,
536 remaining_branches: &[&str],
537 base_location: &BranchLocation,
538 ) -> Result<Option<Path>> {
539 let mut longest_used_length = 0;
540 for &candidate in remaining_branches {
541 let common_len = branch
542 .chars()
543 .zip(candidate.chars())
544 .take_while(|(a, b)| a == b)
545 .count();
546
547 if common_len > longest_used_length {
548 longest_used_length = common_len;
549 }
550 }
551 if longest_used_length == branch.len() {
553 return Ok(None);
554 }
555
556 let mut used_relative_path = &branch[..longest_used_length];
557 if let Some(last_slash_index) = used_relative_path.rfind('/') {
558 used_relative_path = &used_relative_path[..last_slash_index];
559 }
560 let unused_dir = &branch[used_relative_path.len()..].trim_start_matches('/');
561 if let Some(sub_dir) = unused_dir.split('/').next() {
562 let relative_dir = format!("{}/{}", used_relative_path, sub_dir);
563 let absolute_dir = base_location.find_branch(Some(relative_dir))?;
565 Ok(Some(absolute_dir.path))
566 } else {
567 Ok(None)
568 }
569 }
570}
571
/// JSON contents of a tag file (`_refs/tags/<tag>.json`), serialized with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TagContents {
    /// Branch the tagged version belongs to; `None` when the tag was created without a
    /// branch (presumably main — confirm against `BranchLocation::find_branch(None)`).
    pub branch: Option<String>,
    /// Tagged version number.
    pub version: u64,
    /// Size of the tagged version's manifest file, recorded when the tag was written
    /// (from the commit handler's resolved location or `ObjectStore::size`).
    pub manifest_size: usize,
}
579
/// JSON contents of a branch metadata file (`_refs/branches/<branch>.json`), serialized
/// with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BranchContents {
    /// Branch this branch was created from; `None` when no source branch was given.
    pub parent_branch: Option<String>,
    /// Version of the parent branch this branch was created from.
    pub parent_version: u64,
    /// Creation time; `Branches::create` sets it from `chrono::Utc::now().timestamp()`
    /// (Unix seconds). Serialized as `createAt` — the key name is part of the file format.
    pub create_at: u64,
    /// Size of the parent version's manifest file at branch creation time.
    pub manifest_size: usize,
}
588
589pub fn base_tags_path(base_path: &Path) -> Path {
590 base_path.child("_refs").child("tags")
591}
592
593pub fn base_branches_contents_path(base_path: &Path) -> Path {
594 base_path.child("_refs").child("branches")
595}
596
597pub fn tag_path(base_path: &Path, branch: &str) -> Path {
598 base_tags_path(base_path).child(format!("{}.json", branch))
599}
600
601pub fn branch_contents_path(base_path: &Path, branch: &str) -> Path {
603 base_branches_contents_path(base_path).child(format!("{}.json", branch))
604}
605
606async fn from_path<T>(path: &Path, object_store: &ObjectStore) -> Result<T>
607where
608 T: DeserializeOwned,
609{
610 let tag_reader = object_store.open(path).await?;
611 let tag_bytes = tag_reader
612 .get_range(Range {
613 start: 0,
614 end: tag_reader.size().await?,
615 })
616 .await?;
617 let json_str = String::from_utf8(tag_bytes.to_vec())
618 .map_err(|e| Error::corrupt_file(path.clone(), e.to_string(), location!()))?;
619 Ok(serde_json::from_str(&json_str)?)
620}
621
622impl TagContents {
623 pub async fn from_path(path: &Path, object_store: &ObjectStore) -> Result<Self> {
624 from_path(path, object_store).await
625 }
626}
627
628impl BranchContents {
629 pub async fn from_path(path: &Path, object_store: &ObjectStore) -> Result<Self> {
630 from_path(path, object_store).await
631 }
632}
633
634pub fn check_valid_branch(branch_name: &str) -> Result<()> {
635 if branch_name.is_empty() {
636 return Err(Error::InvalidRef {
637 message: "Branch name cannot be empty".to_string(),
638 });
639 }
640
641 if branch_name.starts_with('/') || branch_name.ends_with('/') {
643 return Err(Error::InvalidRef {
644 message: "Branch name cannot start or end with a '/'".to_string(),
645 });
646 }
647
648 if branch_name.contains("//") {
650 return Err(Error::InvalidRef {
651 message: "Branch name cannot contain consecutive '/'".to_string(),
652 });
653 }
654
655 if branch_name.contains("..") || branch_name.contains('\\') {
657 return Err(Error::InvalidRef {
658 message: "Branch name cannot contain '..' or '\\'".to_string(),
659 });
660 }
661
662 for segment in branch_name.split('/') {
663 if segment.is_empty() {
664 return Err(Error::InvalidRef {
665 message: "Branch name cannot have empty segments between '/'".to_string(),
666 });
667 }
668 if !segment
669 .chars()
670 .all(|c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
671 {
672 return Err(Error::InvalidRef {
673 message: format!("Branch segment '{}' contains invalid characters. Only alphanumeric, '.', '-', '_' are allowed.", segment),
674 });
675 }
676 }
677
678 if branch_name.ends_with(".lock") {
679 return Err(Error::InvalidRef {
680 message: "Branch name cannot end with '.lock'".to_string(),
681 });
682 }
683
684 if branch_name.eq("main") {
685 return Err(Error::InvalidRef {
686 message: "Branch name cannot be 'main'".to_string(),
687 });
688 }
689 Ok(())
690}
691
692pub fn check_valid_tag(s: &str) -> Result<()> {
693 if s.is_empty() {
694 return Err(Error::InvalidRef {
695 message: "Ref cannot be empty".to_string(),
696 });
697 }
698
699 if !s
700 .chars()
701 .all(|c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
702 {
703 return Err(Error::InvalidRef {
704 message: "Ref characters must be either alphanumeric, '.', '-' or '_'".to_string(),
705 });
706 }
707
708 if s.starts_with('.') {
709 return Err(Error::InvalidRef {
710 message: "Ref cannot begin with a dot".to_string(),
711 });
712 }
713
714 if s.ends_with('.') {
715 return Err(Error::InvalidRef {
716 message: "Ref cannot end with a dot".to_string(),
717 });
718 }
719
720 if s.ends_with(".lock") {
721 return Err(Error::InvalidRef {
722 message: "Ref cannot end with .lock".to_string(),
723 });
724 }
725
726 if s.contains("..") {
727 return Err(Error::InvalidRef {
728 message: "Ref cannot have two consecutive dots".to_string(),
729 });
730 }
731
732 Ok(())
733}
734
#[cfg(test)]
mod tests {
    //! Unit tests for ref-name validation, ref path helpers, `Ref` conversions,
    //! JSON (de)serialization of ref contents, and branch-directory cleanup paths.
    use super::*;
    use datafusion::common::assert_contains;

    use rstest::rstest;

    /// Names accepted by `check_valid_tag`.
    #[rstest]
    fn test_ok_ref(
        #[values(
            "ref",
            "ref-with-dashes",
            "ref.extension",
            "ref_with_underscores",
            "v1.2.3-rc4"
        )]
        r: &str,
    ) {
        check_valid_tag(r).unwrap();
    }

    /// Names rejected by `check_valid_tag`; every rejection message starts with "Ref".
    #[rstest]
    fn test_err_ref(
        #[values(
            "",
            "../ref",
            ".ref",
            "/ref",
            "@",
            "deeply/nested/ref",
            "nested//ref",
            "nested/ref",
            "nested\\ref",
            "ref*",
            "ref.lock",
            "ref/",
            "ref?",
            "ref@{ref",
            "ref[",
            "ref^",
            "~/ref",
            "ref.",
            "ref..ref"
        )]
        r: &str,
    ) {
        assert_contains!(
            check_valid_tag(r).err().unwrap().to_string(),
            "Ref is invalid: Ref"
        );
    }

    /// Branch names (including nested `/` paths) accepted by `check_valid_branch`.
    #[rstest]
    fn test_valid_branch_names(
        #[values(
            "feature/login",
            "bugfix/issue-123",
            "release/v1.2.3",
            "user/someone/my-feature",
            "normal",
            "with-dash",
            "with_underscore",
            "with.dot"
        )]
        branch_name: &str,
    ) {
        assert!(
            check_valid_branch(branch_name).is_ok(),
            "Branch name '{}' should be valid",
            branch_name
        );
    }

    /// Branch names rejected by `check_valid_branch`.
    #[rstest]
    fn test_invalid_branch_names(
        #[values(
            "",
            "/start-with-slash",
            "end-with-slash/",
            "have//consecutive-slash",
            "have..dot-dot",
            "have\\backslash",
            "segment/",
            "/segment",
            "segment//empty",
            "name.lock",
            "bad@character",
            "bad segment"
        )]
        branch_name: &str,
    ) {
        assert!(
            check_valid_branch(branch_name).is_err(),
            "Branch name '{}' should be invalid",
            branch_name
        );
    }

    /// Layout of the `_refs` directory and of individual tag/branch files.
    #[test]
    fn test_path_functions() {
        let base_path = Path::from("dataset");

        let tags_path = base_tags_path(&base_path);
        assert_eq!(tags_path, Path::from("dataset/_refs/tags"));

        let branches_path = base_branches_contents_path(&base_path);
        assert_eq!(branches_path, Path::from("dataset/_refs/branches"));

        let tag_file_path = tag_path(&base_path, "v1.0.0");
        assert_eq!(tag_file_path, Path::from("dataset/_refs/tags/v1.0.0.json"));

        let branch_file_path = branch_contents_path(&base_path, "feature");
        assert_eq!(
            branch_file_path,
            Path::from("dataset/_refs/branches/feature.json")
        );
    }

    /// `From` conversions produce the expected `Ref` variants.
    #[tokio::test]
    async fn test_refs_from_traits() {
        let version_ref: Ref = 42u64.into();
        match version_ref {
            Version(branch, v) => {
                assert_eq!(v, Some(42));
                assert_eq!(branch, None)
            }
            _ => panic!("Expected Version variant"),
        }

        let tag_ref: Ref = "test_tag".into();
        match tag_ref {
            Tag(name) => assert_eq!(name, "test_tag"),
            _ => panic!("Expected Tag variant"),
        }

        let branch_ref: Ref = ("test_branch", 10u64).into();
        match branch_ref {
            Version(name, version) => {
                assert_eq!(name.unwrap(), "test_branch");
                assert_eq!(version, Some(10));
            }
            _ => panic!("Expected Branch variant"),
        }
    }

    /// `BranchContents` uses camelCase keys and round-trips through JSON.
    #[tokio::test]
    async fn test_branch_contents_serialization() {
        let branch_contents = BranchContents {
            parent_branch: Some("main".to_string()),
            parent_version: 42,
            create_at: 1234567890,
            manifest_size: 1024,
        };

        let json = serde_json::to_string(&branch_contents).unwrap();
        assert!(json.contains("parentBranch"));
        assert!(json.contains("parentVersion"));
        assert!(json.contains("createAt"));
        assert!(json.contains("manifestSize"));

        let deserialized: BranchContents = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.parent_branch, branch_contents.parent_branch);
        assert_eq!(deserialized.parent_version, branch_contents.parent_version);
        assert_eq!(deserialized.create_at, branch_contents.create_at);
        assert_eq!(deserialized.manifest_size, branch_contents.manifest_size);
    }

    /// `TagContents` uses camelCase keys and round-trips through JSON.
    #[tokio::test]
    async fn test_tag_contents_serialization() {
        let tag_contents = TagContents {
            branch: Some("feature".to_string()),
            version: 10,
            manifest_size: 2048,
        };

        let json = serde_json::to_string(&tag_contents).unwrap();
        assert!(json.contains("branch"));
        assert!(json.contains("version"));
        assert!(json.contains("manifestSize"));

        let deserialized: TagContents = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.branch, tag_contents.branch);
        assert_eq!(deserialized.version, tag_contents.version);
        assert_eq!(deserialized.manifest_size, tag_contents.manifest_size);
    }

    /// Cleanup directory chosen when deleting `branch_to_delete` while
    /// `remaining_branches` still exist; `None` means nothing should be removed.
    #[rstest]
    #[case("feature/auth", &["feature/login", "feature/signup"], Some("feature/auth"))]
    #[case("feature/auth/module", &["feature/other"], Some("feature/auth"))]
    #[case("a/b/c", &["a/b/d", "a/e"], Some("a/b/c"))]
    #[case("feature/auth", &["feature/auth/sub"], None)]
    #[case("feature", &["feature/sub1", "feature/sub2"], None)]
    #[case("a/b", &["a/b/c", "a/b/d"], None)]
    #[case("main", &[], Some("main"))]
    #[case("a", &["a"], None)]
    #[case("single", &["other"], Some("single"))]
    #[case("feature/auth/login/oauth", &["feature/auth/login/basic", "feature/auth/signup"], Some("feature/auth/login/oauth"))]
    #[case("feature/user-auth", &["feature/user-signup"], Some("feature/user-auth"))]
    #[case("release/2024.01", &["release/2024.02"], Some("release/2024.01"))]
    #[case("very/long/common/prefix/branch1", &["very/long/common/prefix/branch2"], Some("very/long/common/prefix/branch1"))]
    #[case("feature", &["bugfix", "hotfix"], Some("feature"))]
    #[case("feature/sub", &["feature", "other"], Some("feature/sub"))]
    fn test_get_cleanup_path(
        #[case] branch_to_delete: &str,
        #[case] remaining_branches: &[&str],
        #[case] expected_relative_cleanup_path: Option<&str>,
    ) {
        let dataset_root_dir = "file:///var/balabala/dataset1".to_string();
        let base_location = BranchLocation {
            path: Path::from(format!("{}/tree/random_branch", dataset_root_dir.as_str())),
            uri: format!("{}/tree/random_branch", dataset_root_dir.as_str()),
            branch: Some("random_branch".to_string()),
        };

        let result =
            Branches::get_cleanup_path(branch_to_delete, remaining_branches, &base_location)
                .unwrap();

        match expected_relative_cleanup_path {
            Some(expected_relative) => {
                assert!(
                    result.is_some(),
                    "Expected cleanup path but got None for branch: {}",
                    branch_to_delete
                );
                // The expected absolute path is resolved the same way the code resolves it.
                let expected_full_path = base_location
                    .find_branch(Some(expected_relative.to_string()))
                    .unwrap()
                    .path;
                assert_eq!(result.unwrap().as_ref(), expected_full_path.as_ref());
            }
            None => {
                assert!(
                    result.is_none(),
                    "Expected no cleanup but got: {:?} for branch: {}",
                    result,
                    branch_to_delete
                );
            }
        }
    }
}