1use std::ops::Range;
5
6use chrono::{DateTime, Utc};
7use futures::stream::{StreamExt, TryStreamExt};
8use itertools::Itertools;
9use lance_io::object_store::ObjectStore;
10use lance_table::io::commit::CommitHandler;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
15use crate::dataset::branch_location::BranchLocation;
16use crate::dataset::refs::Ref::{Tag, Version, VersionNumber};
17use crate::utils::temporal::utc_now;
18use crate::{Error, Result};
19use serde::de::DeserializeOwned;
20use std::cmp::Ordering;
21use std::collections::HashMap;
22use std::fmt;
23use std::fmt::Formatter;
24use std::io::ErrorKind;
25use uuid::Uuid;
26
/// Name of the default branch.
///
/// `standardize_branch` maps this name to `None`, and `normalize_branch` maps
/// `None` back to it.
pub const MAIN_BRANCH: &str = "main";
28
/// A reference that can be resolved to a dataset version.
#[derive(Debug, Clone)]
pub enum Ref {
    /// A bare version number. When a tag is built from it, the branch of the
    /// current base location is used.
    VersionNumber(u64),
    /// An optional branch name plus an optional version number. `Display`
    /// renders a missing branch as the main branch and a missing version as
    /// `latest`.
    Version(Option<String>, Option<u64>),
    /// The name of a tag.
    Tag(String),
}
41
42impl From<u64> for Ref {
43 fn from(reference: u64) -> Self {
44 VersionNumber(reference)
45 }
46}
47
48impl From<&str> for Ref {
49 fn from(reference: &str) -> Self {
50 Tag(reference.to_string())
51 }
52}
53
54impl From<(&str, u64)> for Ref {
55 fn from(reference: (&str, u64)) -> Self {
56 Version(standardize_branch(reference.0), Some(reference.1))
57 }
58}
59
60impl From<(Option<&str>, Option<u64>)> for Ref {
61 fn from(reference: (Option<&str>, Option<u64>)) -> Self {
62 Version(reference.0.and_then(standardize_branch), reference.1)
63 }
64}
65
66impl From<(&str, Option<u64>)> for Ref {
67 fn from(reference: (&str, Option<u64>)) -> Self {
68 Version(standardize_branch(reference.0), reference.1)
69 }
70}
71
72impl fmt::Display for Ref {
73 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
74 match self {
75 Version(branch, version_number) => {
76 let version_str = version_number
77 .map(|v| v.to_string())
78 .unwrap_or_else(|| "latest".to_string());
79 write!(f, "{}:{}", normalize_branch(branch.as_deref()), version_str)
80 }
81 VersionNumber(version_number) => write!(f, "{}", version_number),
82 Tag(tag_name) => write!(f, "{}", tag_name),
83 }
84 }
85}
86
/// Entry point for working with the tags and branches of a dataset.
#[derive(Debug, Clone)]
pub struct Refs {
    /// Store used to read and write ref files and to check manifests.
    pub(crate) object_store: Arc<ObjectStore>,
    /// Resolves the manifest location of a version.
    pub(crate) commit_handler: Arc<dyn CommitHandler>,
    /// Location (path and branch) this handle operates from.
    pub(crate) base_location: BranchLocation,
}
93
94impl Refs {
95 pub fn new(
96 object_store: Arc<ObjectStore>,
97 commit_handler: Arc<dyn CommitHandler>,
98 base_location: BranchLocation,
99 ) -> Self {
100 Self {
101 object_store,
102 commit_handler,
103 base_location,
104 }
105 }
106
107 pub fn tags(&self) -> Tags<'_> {
108 Tags { refs: self }
109 }
110
111 pub fn branches(&self) -> Branches<'_> {
112 Branches { refs: self }
113 }
114
115 pub fn base(&self) -> &Path {
116 &self.base_location.path
117 }
118
119 pub fn root(&self) -> Result<BranchLocation> {
120 self.base_location.find_main()
121 }
122}
123
/// Tag operations, borrowed from a [`Refs`].
#[derive(Debug, Clone)]
pub struct Tags<'a> {
    /// The `Refs` that supplies the store, commit handler and base location.
    refs: &'a Refs,
}
129
/// Branch operations, borrowed from a [`Refs`].
#[derive(Debug, Clone)]
pub struct Branches<'a> {
    /// The `Refs` that supplies the store, commit handler and base location.
    refs: &'a Refs,
}
135
136impl Tags<'_> {
137 fn object_store(&self) -> &ObjectStore {
138 &self.refs.object_store
139 }
140}
141
142impl Branches<'_> {
143 fn object_store(&self) -> &ObjectStore {
144 &self.refs.object_store
145 }
146}
147
148impl Tags<'_> {
149 pub async fn fetch_tags(&self) -> Result<Vec<(String, TagContents)>> {
150 let root_location = self.refs.root()?;
151 let base_path = base_tags_path(&root_location.path);
152 let tag_files = self.object_store().read_dir(base_path).await?;
153
154 let tag_names: Vec<String> = tag_files
155 .iter()
156 .filter_map(|name| name.strip_suffix(".json"))
157 .map(|name| name.to_string())
158 .collect_vec();
159
160 let root_path = &root_location.path;
161 futures::stream::iter(tag_names)
162 .map(|tag_name| async move {
163 let contents =
164 TagContents::from_path(&tag_path(root_path, &tag_name), self.object_store())
165 .await?;
166 Ok((tag_name, contents))
167 })
168 .buffer_unordered(10)
169 .try_collect()
170 .await
171 }
172
173 pub async fn list(&self) -> Result<HashMap<String, TagContents>> {
174 self.fetch_tags()
175 .await
176 .map(|tags| tags.into_iter().collect())
177 }
178
179 pub async fn list_tags_ordered(
180 &self,
181 order: Option<Ordering>,
182 ) -> Result<Vec<(String, TagContents)>> {
183 let mut tags = self.fetch_tags().await?;
184 tags.sort_by(|a, b| {
185 let desired_ordering = order.unwrap_or(Ordering::Greater);
186 let version_ordering = a.1.version.cmp(&b.1.version);
187 let version_result = match desired_ordering {
188 Ordering::Less => version_ordering,
189 _ => version_ordering.reverse(),
190 };
191 version_result.then_with(|| a.0.cmp(&b.0))
192 });
193 Ok(tags)
194 }
195
196 pub async fn get_version(&self, tag: &str) -> Result<u64> {
197 self.get(tag).await.map(|tag| tag.version)
198 }
199
200 pub async fn get(&self, tag: &str) -> Result<TagContents> {
201 check_valid_tag(tag)?;
202
203 let root_location = self.refs.root()?;
204 let tag_file = tag_path(&root_location.path, tag);
205
206 if !self.object_store().exists(&tag_file).await? {
207 return Err(Error::RefNotFound {
208 message: format!("tag {} does not exist", tag),
209 });
210 }
211
212 let tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?;
213 Ok(tag_contents)
214 }
215
216 pub async fn create(&self, tag: &str, reference: impl Into<Ref>) -> Result<()> {
217 check_valid_tag(tag)?;
218 let root_location = self.refs.root()?;
219 let tag_file = tag_path(&root_location.path, tag);
220
221 if self.object_store().exists(&tag_file).await? {
222 return Err(Error::RefConflict {
223 message: format!("tag {} already exists", tag),
224 });
225 }
226 let now = utc_now();
227 let tag_contents = self
228 .build_tag_content_by_ref(reference, Some(now), Some(now))
229 .await?;
230
231 self.object_store()
232 .put(
233 &tag_file,
234 serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
235 )
236 .await
237 .map(|_| ())
238 }
239
240 pub async fn delete(&self, tag: &str) -> Result<()> {
241 check_valid_tag(tag)?;
242
243 let root_location = self.refs.root()?;
244 let tag_file = tag_path(&root_location.path, tag);
245
246 if !self.object_store().exists(&tag_file).await? {
247 return Err(Error::RefNotFound {
248 message: format!("tag {} does not exist", tag),
249 });
250 }
251
252 self.object_store().delete(&tag_file).await
253 }
254
255 pub async fn update(&self, tag: &str, reference: impl Into<Ref>) -> Result<()> {
256 check_valid_tag(tag)?;
257
258 let root_location = self.refs.root()?;
259 let tag_file = tag_path(&root_location.path, tag);
260 if !self.object_store().exists(&tag_file).await? {
261 return Err(Error::RefNotFound {
262 message: format!("tag {} does not exist", tag),
263 });
264 }
265 let mut tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?;
266 let updated_reference = self
267 .build_tag_content_by_ref(reference, tag_contents.created_at, Some(utc_now()))
268 .await?;
269 tag_contents.branch = updated_reference.branch;
270 tag_contents.version = updated_reference.version;
271 tag_contents.created_at = updated_reference.created_at;
272 tag_contents.updated_at = updated_reference.updated_at;
273 tag_contents.manifest_size = updated_reference.manifest_size;
274
275 self.object_store()
276 .put(
277 &tag_file,
278 serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
279 )
280 .await
281 .map(|_| ())
282 }
283
284 pub async fn replace_metadata(
285 &self,
286 tag: &str,
287 metadata: HashMap<String, String>,
288 ) -> Result<()> {
289 check_valid_tag(tag)?;
290
291 let root_location = self.refs.root()?;
292 let tag_file = tag_path(&root_location.path, tag);
293 if !self.object_store().exists(&tag_file).await? {
294 return Err(Error::RefNotFound {
295 message: format!("tag {} does not exist", tag),
296 });
297 }
298
299 let mut tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?;
300 tag_contents.metadata = metadata;
301
302 self.object_store()
303 .put(
304 &tag_file,
305 serde_json::to_string_pretty(&tag_contents)?.as_bytes(),
306 )
307 .await
308 .map(|_| ())
309 }
310
311 async fn build_tag_content_by_ref(
312 &self,
313 reference: impl Into<Ref>,
314 created_at: Option<DateTime<Utc>>,
315 updated_at: Option<DateTime<Utc>>,
316 ) -> Result<TagContents> {
317 let reference = reference.into();
318 let (branch, version_number) = match reference {
319 Version(branch, version_number) => (branch, version_number),
320 VersionNumber(version_number) => {
321 (self.refs.base_location.branch.clone(), Some(version_number))
322 }
323 Tag(tag_name) => {
324 let tag_content = self.get(tag_name.as_str()).await?;
325 (tag_content.branch, Some(tag_content.version))
326 }
327 };
328
329 let branch_location = self.refs.base_location.find_branch(branch.as_deref())?;
330 let manifest_file = if let Some(version_number) = version_number {
331 self.refs
332 .commit_handler
333 .resolve_version_location(
334 &branch_location.path,
335 version_number,
336 &self.refs.object_store.inner,
337 )
338 .await?
339 } else {
340 self.refs
341 .commit_handler
342 .resolve_latest_location(&branch_location.path, &self.refs.object_store)
343 .await?
344 };
345
346 if !self.object_store().exists(&manifest_file.path).await? {
347 return Err(Error::VersionNotFound {
348 message: format!("version {} does not exist", Version(branch, version_number)),
349 });
350 }
351
352 let manifest_size = if let Some(size) = manifest_file.size {
353 size as usize
354 } else {
355 self.object_store().size(&manifest_file.path).await? as usize
356 };
357
358 let tag_contents = TagContents {
359 branch,
360 version: manifest_file.version,
361 created_at,
362 updated_at,
363 manifest_size,
364 metadata: HashMap::new(),
365 };
366 Ok(tag_contents)
367 }
368}
369
370impl Branches<'_> {
371 pub(crate) fn is_main_branch(branch: Option<&str>) -> bool {
372 branch == Some(MAIN_BRANCH)
373 }
374
375 pub async fn fetch(&self) -> Result<Vec<(String, BranchContents)>> {
376 let root_location = self.refs.root()?;
377 let base_path = base_branches_contents_path(&root_location.path);
378 let branch_files = self.object_store().read_dir(base_path).await?;
379
380 let branch_names: Vec<String> = branch_files
381 .iter()
382 .filter_map(|name| name.strip_suffix(".json"))
383 .map(|str| {
384 Path::from_url_path(str)
385 .map_err(|e| Error::InvalidRef {
386 message: format!(
387 "Failed to decode branch name: {} due to exception {}",
388 str, e
389 ),
390 })
391 .map(|path| path.to_string())
392 })
393 .collect::<Result<Vec<_>>>()?;
394
395 let branch_path = &root_location.path;
396 futures::stream::iter(branch_names)
397 .map(|name| async move {
398 let contents = BranchContents::from_path(
399 &branch_contents_path(branch_path, &name),
400 self.object_store(),
401 &name,
402 )
403 .await?;
404 Ok((name, contents))
405 })
406 .buffer_unordered(10)
407 .try_collect()
408 .await
409 }
410
411 pub async fn list(&self) -> Result<HashMap<String, BranchContents>> {
412 self.fetch()
413 .await
414 .map(|branches| branches.into_iter().collect())
415 }
416
417 pub async fn get(&self, branch: &str) -> Result<BranchContents> {
418 check_valid_branch(branch)?;
419
420 let root_location = self.refs.root()?;
421 let branch_file = branch_contents_path(&root_location.path, branch);
422
423 if !self.object_store().exists(&branch_file).await? {
424 return Err(Error::RefNotFound {
425 message: format!("branch {} does not exist", branch),
426 });
427 }
428
429 let branch_contents =
430 BranchContents::from_path(&branch_file, self.object_store(), branch).await?;
431
432 Ok(branch_contents)
433 }
434
435 pub async fn get_identifier(&self, branch: Option<&str>) -> Result<BranchIdentifier> {
436 if let Some(branch_name) = branch {
437 let branch_contents = self.get(branch_name).await?;
438 Ok(branch_contents.identifier)
439 } else {
440 Ok(BranchIdentifier::main())
441 }
442 }
443
444 pub(crate) async fn create(
446 &self,
447 branch_name: &str,
448 version_number: u64,
449 source_branch: Option<&str>,
450 ) -> Result<()> {
451 check_valid_branch(branch_name)?;
452
453 let source_branch = source_branch.and_then(standardize_branch);
454 let root_location = self.refs.root()?;
455 let branch_file = branch_contents_path(&root_location.path, branch_name);
456 if self.object_store().exists(&branch_file).await? {
457 return Err(Error::RefConflict {
458 message: format!("branch {} already exists", branch_name),
459 });
460 }
461
462 let branch_location = self
463 .refs
464 .base_location
465 .find_branch(source_branch.as_deref())?;
466 let manifest_file = self
468 .refs
469 .commit_handler
470 .resolve_version_location(
471 &branch_location.path,
472 version_number,
473 &self.refs.object_store.inner,
474 )
475 .await?;
476
477 if !self.object_store().exists(&manifest_file.path).await? {
478 return Err(Error::VersionNotFound {
479 message: format!("Manifest file {} does not exist", &manifest_file.path),
480 });
481 };
482
483 let parent_branch_id = if let Some(ref parent_branch) = source_branch {
484 let parent_file = branch_contents_path(&root_location.path, parent_branch);
485 if self.object_store().exists(&parent_file).await? {
486 BranchContents::from_path(&parent_file, self.object_store(), parent_branch)
487 .await?
488 .identifier
489 } else {
490 return Err(Error::RefNotFound {
491 message: format!("Parent branch {} does not exist", branch_name),
492 });
493 }
494 } else {
495 BranchIdentifier::main()
496 };
497
498 let branch_contents = BranchContents {
499 parent_branch: source_branch,
500 identifier: BranchIdentifier::new(&parent_branch_id, version_number),
501 parent_version: version_number,
502 create_at: chrono::Utc::now().timestamp() as u64,
503 manifest_size: if let Some(size) = manifest_file.size {
504 size as usize
505 } else {
506 self.object_store().size(&manifest_file.path).await? as usize
507 },
508 metadata: HashMap::new(),
509 };
510
511 self.object_store()
512 .put(
513 &branch_file,
514 serde_json::to_string_pretty(&branch_contents)?.as_bytes(),
515 )
516 .await
517 .map(|_| ())
518 }
519
520 pub async fn replace_metadata(
521 &self,
522 branch: &str,
523 metadata: HashMap<String, String>,
524 ) -> Result<()> {
525 check_valid_branch(branch)?;
526
527 let root_location = self.refs.root()?;
528 let branch_file = branch_contents_path(&root_location.path, branch);
529 if !self.object_store().exists(&branch_file).await? {
530 return Err(Error::RefNotFound {
531 message: format!("branch {} does not exist", branch),
532 });
533 }
534
535 let mut branch_contents =
536 BranchContents::from_path(&branch_file, self.object_store(), branch).await?;
537 branch_contents.metadata = metadata;
538
539 self.object_store()
540 .put(
541 &branch_file,
542 serde_json::to_string_pretty(&branch_contents)?.as_bytes(),
543 )
544 .await
545 .map(|_| ())
546 }
547
548 pub async fn delete(&self, branch: &str, force: bool) -> Result<()> {
553 check_valid_branch(branch)?;
554
555 let all_branches = self.list().await?;
556 let branch_id = all_branches
557 .get(branch)
558 .map(|contents| contents.identifier.clone());
559 if let Some(branch_id) = branch_id {
560 let referenced_versions = branch_id.collect_referenced_versions(&all_branches);
561 if !referenced_versions.is_empty() {
562 return Err(Error::RefConflict {
563 message: format!(
564 "Branch {} is referenced by {:?} versions, can not delete",
565 branch, referenced_versions
566 ),
567 });
568 }
569 } else if !force {
570 return Err(Error::RefNotFound {
571 message: format!("Branch {} does not exist", branch),
572 });
573 } else {
574 log::warn!("BranchContents of {} does not exist", branch);
575 }
576
577 let root_location = self.refs.root()?;
578 let branch_file = branch_contents_path(&root_location.path, branch);
579 if self.object_store().exists(&branch_file).await? {
580 self.object_store().delete(&branch_file).await?;
581 }
582
583 self.cleanup_branch_directories(branch).await
585 }
586
587 pub async fn list_ordered(
588 &self,
589 order: Option<Ordering>,
590 ) -> Result<Vec<(String, BranchContents)>> {
591 let mut branches = self.fetch().await?;
592 branches.sort_by(|a, b| {
593 let desired_ordering = order.unwrap_or(Ordering::Greater);
594 let version_ordering = a.1.parent_version.cmp(&b.1.parent_version);
595 let version_result = match desired_ordering {
596 Ordering::Less => version_ordering,
597 _ => version_ordering.reverse(),
598 };
599 version_result.then_with(|| a.0.cmp(&b.0))
600 });
601 Ok(branches)
602 }
603
604 async fn cleanup_branch_directories(&self, branch: &str) -> Result<()> {
606 let branches = self.list().await?;
607 let remaining_branches: Vec<&str> = branches.keys().map(|k| k.as_str()).collect();
608
609 if let Some(delete_path) =
610 Self::get_cleanup_path(branch, &remaining_branches, &self.refs.base_location)?
611 && let Err(e) = self.refs.object_store.remove_dir_all(delete_path).await
612 {
613 match &e {
614 Error::IO { source, .. } => {
615 if let Some(io_err) = source.downcast_ref::<std::io::Error>() {
616 if io_err.kind() == ErrorKind::NotFound {
617 log::debug!("Branch directory already deleted: {}", io_err);
618 } else {
619 return Err(e);
620 }
621 } else {
622 return Err(e);
623 }
624 }
625 _ => return Err(e),
626 }
627 }
628 Ok(())
629 }
630
631 fn get_cleanup_path(
632 branch: &str,
633 remaining_branches: &[&str],
634 base_location: &BranchLocation,
635 ) -> Result<Option<Path>> {
636 let deleted_branch = BranchRelativePath::new(branch);
637 let mut related_branches = Vec::new();
638 let mut relative_dir = branch.to_string();
639 for branch in remaining_branches {
640 let branch = BranchRelativePath::new(branch);
641 if branch.is_parent(&deleted_branch) || branch.is_child(&deleted_branch) {
642 related_branches.push(branch);
643 } else if let Some(common_prefix) = deleted_branch.find_common_prefix(&branch) {
644 related_branches.push(common_prefix);
645 }
646 }
647
648 related_branches.sort_by(|a, b| a.segments.len().cmp(&b.segments.len()).reverse());
649 if let Some(branch) = related_branches.first() {
650 if branch.is_child(&deleted_branch) || branch == &deleted_branch {
651 return Ok(None);
654 } else {
655 relative_dir = format!(
659 "{}/{}",
660 branch.segments.join("/"),
661 deleted_branch.segments[branch.segments.len()]
662 );
663 }
664 } else if !deleted_branch.segments.is_empty() {
665 relative_dir = deleted_branch.segments[0].to_string();
669 }
670
671 let absolute_dir = base_location.find_branch(Some(relative_dir.as_str()))?;
672 Ok(Some(absolute_dir.path))
673 }
674}
675
/// A branch name split into its `/`-separated segments.
#[derive(Debug, PartialEq)]
struct BranchRelativePath<'a> {
    /// Segments in path order, borrowed from the branch name.
    segments: Vec<&'a str>,
}
680
681impl<'a> BranchRelativePath<'a> {
682 fn new(branch_name: &'a str) -> Self {
683 let segments = branch_name.split('/').collect_vec();
684 Self { segments }
685 }
686
687 fn find_common_prefix(&self, other: &Self) -> Option<Self> {
688 let mut common_segments = Vec::new();
689 for (i, segment) in self.segments.iter().enumerate() {
690 if i >= other.segments.len() || other.segments[i] != *segment {
691 break;
692 }
693 common_segments.push(*segment);
694 }
695 if !common_segments.is_empty() {
696 Some(BranchRelativePath {
697 segments: common_segments,
698 })
699 } else {
700 None
701 }
702 }
703
704 fn is_parent(&self, other: &Self) -> bool {
705 if other.segments.len() <= self.segments.len() {
706 false
707 } else {
708 for (i, segment) in self.segments.iter().enumerate() {
709 if other.segments[i] != *segment {
710 return false;
711 }
712 }
713 true
714 }
715 }
716
717 fn is_child(&self, other: &Self) -> bool {
718 if other.segments.len() >= self.segments.len() {
719 false
720 } else {
721 for (i, segment) in other.segments.iter().enumerate() {
722 if self.segments[i] != *segment {
723 return false;
724 }
725 }
726 true
727 }
728 }
729}
730
/// Contents of a tag file, (de)serialized as JSON with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TagContents {
    /// Branch the tagged version lives on; `None` is rendered as the main
    /// branch by `normalize_branch`.
    pub branch: Option<String>,
    /// Version number the tag points at.
    pub version: u64,
    /// Creation time; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at: Option<DateTime<Utc>>,
    /// Time of the last update; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub updated_at: Option<DateTime<Utc>>,
    /// Size of the manifest file of the tagged version.
    pub manifest_size: usize,
    /// Free-form key/value metadata; defaults to empty when the key is absent
    /// from the JSON.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
747
/// Contents of a branch file, (de)serialized as JSON with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BranchContents {
    /// Branch this one was forked from; `None` when forked from main.
    pub parent_branch: Option<String>,
    /// Lineage identifier. When the key is absent from the JSON it
    /// deserializes to the sentinel, which `BranchContents::from_path` then
    /// replaces with a synthetic identifier.
    #[serde(default = "BranchIdentifier::missing_identifier_sentinel")]
    pub identifier: BranchIdentifier,
    /// Version of the parent branch this one was forked at.
    pub parent_version: u64,
    /// Creation time; `Branches::create` stores a Unix timestamp in seconds.
    pub create_at: u64,
    /// Size of the manifest file of the version this branch was forked from.
    pub manifest_size: usize,
    /// Free-form key/value metadata; defaults to empty when the key is absent
    /// from the JSON.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
763
/// Lineage of a branch: one `(parent_version, uuid)` entry per fork, ordered
/// from the oldest ancestor to the branch itself. The main branch has an
/// empty lineage.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct BranchIdentifier {
    /// `(version forked from, unique id of the fork)` pairs.
    pub version_mapping: Vec<(u64, String)>,
}
768
769impl BranchIdentifier {
770 pub fn new(parent: &Self, parent_version: u64) -> Self {
771 let mut version_mapping = parent.version_mapping.clone();
772 version_mapping.push((parent_version, Uuid::new_v4().simple().to_string()));
773 Self { version_mapping }
774 }
775
776 pub fn missing_identifier_sentinel() -> Self {
783 Self {
784 version_mapping: vec![(0, Uuid::nil().simple().to_string())],
785 }
786 }
787
788 fn synthetic_identifier(
789 branch_name: &str,
790 parent_branch: Option<&str>,
791 parent_version: u64,
792 create_at: u64,
793 ) -> Self {
794 let identifier_input = format!(
795 "branch_name={branch_name}\nparent_branch={}\nparent_version={parent_version}\ncreate_at={create_at}",
796 parent_branch.unwrap_or("")
797 );
798 Self {
799 version_mapping: vec![(
800 0,
801 Uuid::from_bytes(Self::synthetic_identifier_bytes(
802 identifier_input.as_bytes(),
803 ))
804 .simple()
805 .to_string(),
806 )],
807 }
808 }
809
810 fn synthetic_identifier_bytes(input: &[u8]) -> [u8; 16] {
811 const FNV_OFFSET: u64 = 0xcbf29ce484222325;
814 const FNV_PRIME: u64 = 0x100000001b3;
815
816 fn hash_with_seed(input: &[u8], seed: u64) -> u64 {
817 input.iter().fold(seed, |hash, byte| {
818 (hash ^ u64::from(*byte)).wrapping_mul(FNV_PRIME)
819 })
820 }
821
822 let first = hash_with_seed(input, FNV_OFFSET);
823 let second = hash_with_seed(input, FNV_OFFSET ^ 0x9e3779b97f4a7c15);
824 let mut bytes = [0; 16];
825 bytes[..8].copy_from_slice(&first.to_be_bytes());
826 bytes[8..].copy_from_slice(&second.to_be_bytes());
827 bytes
828 }
829
830 pub fn main() -> Self {
831 Self {
832 version_mapping: vec![],
833 }
834 }
835
836 pub fn parse(identifier: &str) -> Result<Self> {
837 let parts: Vec<&str> = identifier.split(':').collect();
838 if !parts.len().is_multiple_of(2) {
839 return Err(Error::InvalidRef {
840 message: format!(
841 "Invalid branch identifier: {}, format should be 'ver1:uuid1:ver2:uuid2:...:final_uuid'",
842 parts.len()
843 ),
844 });
845 }
846
847 let version_mapping = parts
848 .chunks_exact(2)
849 .map(|chunk| {
850 let version = chunk[0].parse::<u64>().map_err(|e| Error::InvalidRef {
851 message: format!("Invalid version number '{}': {}", chunk[0], e),
852 })?;
853 let uuid = chunk[1].to_string();
854 Ok((version, uuid))
855 })
856 .collect::<Result<Vec<_>>>()?;
857
858 Ok(Self { version_mapping })
859 }
860
861 pub fn find_referenced_version(&self, referenced_branch: &Self) -> Option<u64> {
862 let ref_mapping = &referenced_branch.version_mapping;
863 let next_idx = ref_mapping.len();
864
865 (self.version_mapping.len() > next_idx && self.version_mapping[..next_idx] == *ref_mapping)
866 .then(|| self.version_mapping[next_idx].0)
867 .filter(|&version| version > 0)
868 }
869
870 pub fn collect_referenced_versions(
873 &self,
874 branches: &HashMap<String, BranchContents>,
875 ) -> Vec<(String, u64)> {
876 let mut branch_ids = branches
877 .iter()
878 .map(|(name, branch)| (branch.identifier.clone(), name.clone()))
879 .collect::<Vec<_>>();
880 branch_ids.sort_by(|a, b| b.cmp(a));
882 branch_ids
883 .into_iter()
884 .filter_map(|(branch_id, name)| {
885 branch_id
886 .find_referenced_version(self)
887 .map(|version| (name, version))
888 })
889 .collect()
890 }
891}
892
893pub fn base_tags_path(base_path: &Path) -> Path {
894 base_path.clone().join("_refs").join("tags")
895}
896
897pub fn base_branches_contents_path(base_path: &Path) -> Path {
898 base_path.clone().join("_refs").join("branches")
899}
900
901pub fn tag_path(base_path: &Path, branch: &str) -> Path {
902 base_tags_path(base_path).join(format!("{}.json", branch))
903}
904
905pub fn branch_contents_path(base_path: &Path, branch: &str) -> Path {
907 base_branches_contents_path(base_path).join(format!("{}.json", branch))
908}
909
910pub(crate) fn normalize_branch(branch: Option<&str>) -> String {
911 match branch {
912 None => MAIN_BRANCH.to_string(),
913 Some(name) => name.to_string(),
914 }
915}
916
917pub(crate) fn standardize_branch(branch: &str) -> Option<String> {
918 match branch {
919 MAIN_BRANCH => None,
920 name => Some(name.to_string()),
921 }
922}
923
924async fn from_path<T>(path: &Path, object_store: &ObjectStore) -> Result<T>
925where
926 T: DeserializeOwned,
927{
928 let tag_reader = object_store.open(path).await?;
929 let tag_bytes = tag_reader
930 .get_range(Range {
931 start: 0,
932 end: tag_reader.size().await?,
933 })
934 .await?;
935 let json_str = String::from_utf8(tag_bytes.to_vec())
936 .map_err(|e| Error::corrupt_file(path.clone(), e.to_string()))?;
937 Ok(serde_json::from_str(&json_str)?)
938}
939
940impl TagContents {
941 pub async fn from_path(path: &Path, object_store: &ObjectStore) -> Result<Self> {
942 from_path(path, object_store).await
943 }
944}
945
946impl BranchContents {
947 pub async fn from_path(
948 path: &Path,
949 object_store: &ObjectStore,
950 branch_name: &str,
951 ) -> Result<Self> {
952 let mut contents: Self = from_path(path, object_store).await?;
953 if contents.identifier == BranchIdentifier::missing_identifier_sentinel() {
954 contents.identifier = BranchIdentifier::synthetic_identifier(
958 branch_name,
959 contents.parent_branch.as_deref(),
960 contents.parent_version,
961 contents.create_at,
962 );
963 }
964 Ok(contents)
965 }
966}
967
968pub fn check_valid_branch(branch_name: &str) -> Result<()> {
969 if branch_name.is_empty() {
970 return Err(Error::InvalidRef {
971 message: "Branch name cannot be empty".to_string(),
972 });
973 }
974
975 if branch_name.starts_with('/') || branch_name.ends_with('/') {
977 return Err(Error::InvalidRef {
978 message: "Branch name cannot start or end with a '/'".to_string(),
979 });
980 }
981
982 if branch_name.contains("//") {
984 return Err(Error::InvalidRef {
985 message: "Branch name cannot contain consecutive '/'".to_string(),
986 });
987 }
988
989 if branch_name.contains("..") || branch_name.contains('\\') {
991 return Err(Error::InvalidRef {
992 message: "Branch name cannot contain '..' or '\\'".to_string(),
993 });
994 }
995
996 for segment in branch_name.split('/') {
997 if segment.is_empty() {
998 return Err(Error::InvalidRef {
999 message: "Branch name cannot have empty segments between '/'".to_string(),
1000 });
1001 }
1002 if !segment
1003 .chars()
1004 .all(|c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
1005 {
1006 return Err(Error::InvalidRef {
1007 message: format!(
1008 "Branch segment '{}' contains invalid characters. Only alphanumeric, '.', '-', '_' are allowed.",
1009 segment
1010 ),
1011 });
1012 }
1013 }
1014
1015 if branch_name.ends_with(".lock") {
1016 return Err(Error::InvalidRef {
1017 message: "Branch name cannot end with '.lock'".to_string(),
1018 });
1019 }
1020
1021 if branch_name.eq("main") {
1022 return Err(Error::InvalidRef {
1023 message: "Branch name cannot be 'main'".to_string(),
1024 });
1025 }
1026 Ok(())
1027}
1028
1029pub fn check_valid_tag(s: &str) -> Result<()> {
1030 if s.is_empty() {
1031 return Err(Error::InvalidRef {
1032 message: "Ref cannot be empty".to_string(),
1033 });
1034 }
1035
1036 if !s
1037 .chars()
1038 .all(|c| c.is_alphanumeric() || c == '.' || c == '-' || c == '_')
1039 {
1040 return Err(Error::InvalidRef {
1041 message: "Ref characters must be either alphanumeric, '.', '-' or '_'".to_string(),
1042 });
1043 }
1044
1045 if s.starts_with('.') {
1046 return Err(Error::InvalidRef {
1047 message: "Ref cannot begin with a dot".to_string(),
1048 });
1049 }
1050
1051 if s.ends_with('.') {
1052 return Err(Error::InvalidRef {
1053 message: "Ref cannot end with a dot".to_string(),
1054 });
1055 }
1056
1057 if s.ends_with(".lock") {
1058 return Err(Error::InvalidRef {
1059 message: "Ref cannot end with .lock".to_string(),
1060 });
1061 }
1062
1063 if s.contains("..") {
1064 return Err(Error::InvalidRef {
1065 message: "Ref cannot have two consecutive dots".to_string(),
1066 });
1067 }
1068
1069 Ok(())
1070}
1071
1072#[cfg(test)]
1073mod tests {
1074 use super::*;
1075 use datafusion::common::assert_contains;
1076
1077 use rstest::rstest;
1078
    // Each listed name must be accepted by `check_valid_tag`.
    #[rstest]
    fn test_ok_ref(
        #[values(
            "ref",
            "ref-with-dashes",
            "ref.extension",
            "ref_with_underscores",
            "v1.2.3-rc4"
        )]
        r: &str,
    ) {
        check_valid_tag(r).unwrap();
    }
1092
    // Each listed name must be rejected by `check_valid_tag`, and the rendered
    // error must contain "Ref is invalid: Ref".
    #[rstest]
    fn test_err_ref(
        #[values(
            "",
            "../ref",
            ".ref",
            "/ref",
            "@",
            "deeply/nested/ref",
            "nested//ref",
            "nested/ref",
            "nested\\ref",
            "ref*",
            "ref.lock",
            "ref/",
            "ref?",
            "ref@{ref",
            "ref[",
            "ref^",
            "~/ref",
            "ref.",
            "ref..ref"
        )]
        r: &str,
    ) {
        assert_contains!(
            check_valid_tag(r).err().unwrap().to_string(),
            "Ref is invalid: Ref"
        );
    }
1123
    // Each listed name, including multi-segment ones, must be accepted by
    // `check_valid_branch`.
    #[rstest]
    fn test_valid_branch_names(
        #[values(
            "feature/login",
            "bugfix/issue-123",
            "release/v1.2.3",
            "user/someone/my-feature",
            "normal",
            "with-dash",
            "with_underscore",
            "with.dot"
        )]
        branch_name: &str,
    ) {
        assert!(
            check_valid_branch(branch_name).is_ok(),
            "Branch name '{}' should be valid",
            branch_name
        );
    }
1144
    // Each listed name must be rejected by `check_valid_branch`.
    #[rstest]
    fn test_invalid_branch_names(
        #[values(
            "",
            "/start-with-slash",
            "end-with-slash/",
            "have//consecutive-slash",
            "have..dot-dot",
            "have\\backslash",
            "segment/",
            "/segment",
            "segment//empty",
            "name.lock",
            "bad@character",
            "bad segment"
        )]
        branch_name: &str,
    ) {
        assert!(
            check_valid_branch(branch_name).is_err(),
            "Branch name '{}' should be invalid",
            branch_name
        );
    }
1169
1170 #[test]
1171 fn test_path_functions() {
1172 let base_path = Path::from("dataset");
1173
1174 let tags_path = base_tags_path(&base_path);
1176 assert_eq!(tags_path, Path::from("dataset/_refs/tags"));
1177
1178 let branches_path = base_branches_contents_path(&base_path);
1180 assert_eq!(branches_path, Path::from("dataset/_refs/branches"));
1181
1182 let tag_file_path = tag_path(&base_path, "v1.0.0");
1184 assert_eq!(tag_file_path, Path::from("dataset/_refs/tags/v1.0.0.json"));
1185
1186 let branch_file_path = branch_contents_path(&base_path, "feature");
1188 assert_eq!(
1189 branch_file_path,
1190 Path::from("dataset/_refs/branches/feature.json")
1191 );
1192 }
1193
1194 #[tokio::test]
1195 async fn test_refs_from_traits() {
1196 let version_ref: Ref = 42u64.into();
1198 match version_ref {
1199 VersionNumber(version_number) => {
1200 assert_eq!(version_number, 42);
1201 }
1202 _ => panic!("Expected Version variant"),
1203 }
1204
1205 let tag_ref: Ref = "test_tag".into();
1207 match tag_ref {
1208 Tag(name) => assert_eq!(name, "test_tag"),
1209 _ => panic!("Expected Tag variant"),
1210 }
1211
1212 let branch_ref: Ref = ("test_branch", 10u64).into();
1214 match branch_ref {
1215 Version(name, version) => {
1216 assert_eq!(name.unwrap(), "test_branch");
1217 assert_eq!(version, Some(10));
1218 }
1219 _ => panic!("Expected Branch variant"),
1220 }
1221 }
1222
1223 #[tokio::test]
1224 async fn test_branch_contents_serialization() {
1225 let branch_contents = BranchContents {
1226 parent_branch: Some("main".to_string()),
1227 identifier: BranchIdentifier::missing_identifier_sentinel(),
1228 parent_version: 42,
1229 create_at: 1234567890,
1230 manifest_size: 1024,
1231 metadata: HashMap::from([("description".to_string(), "production branch".to_string())]),
1232 };
1233
1234 let json = serde_json::to_string(&branch_contents).unwrap();
1236 assert!(json.contains("parentBranch"));
1237 assert!(json.contains("parentVersion"));
1238 assert!(json.contains("createAt"));
1239 assert!(json.contains("manifestSize"));
1240 assert!(json.contains("metadata"));
1241
1242 let deserialized: BranchContents = serde_json::from_str(&json).unwrap();
1244 assert_eq!(deserialized.parent_branch, branch_contents.parent_branch);
1245 assert_eq!(deserialized.parent_version, branch_contents.parent_version);
1246 assert_eq!(deserialized.create_at, branch_contents.create_at);
1247 assert_eq!(deserialized.manifest_size, branch_contents.manifest_size);
1248 assert_eq!(deserialized.metadata, branch_contents.metadata);
1249
1250 let legacy_json = r#"{"parentBranch":"main","parentVersion":42,"createAt":1234567890,"manifestSize":1024}"#;
1252 let legacy_deserialized: BranchContents = serde_json::from_str(legacy_json).unwrap();
1253 assert!(legacy_deserialized.metadata.is_empty());
1254 }
1255
1256 #[tokio::test]
1257 async fn test_branch_synthetic_uuid_is_stable() {
1258 let legacy_json = r#"{"parentBranch":"main","parentVersion":42,"createAt":1234567890,"manifestSize":1024}"#;
1259 let store = ObjectStore::memory();
1260 let base_path = Path::from("dataset");
1261 let first_path = branch_contents_path(&base_path, "legacy_branch");
1262 store
1263 .put(&first_path, legacy_json.as_bytes())
1264 .await
1265 .unwrap();
1266 let second_path = branch_contents_path(&base_path, "legacy_branch_other");
1267 store
1268 .put(&second_path, legacy_json.as_bytes())
1269 .await
1270 .unwrap();
1271
1272 let first = BranchContents::from_path(&first_path, &store, "legacy_branch")
1273 .await
1274 .unwrap();
1275 let second = BranchContents::from_path(&first_path, &store, "legacy_branch")
1276 .await
1277 .unwrap();
1278 assert_eq!(first.identifier, second.identifier);
1279 assert_ne!(
1280 first.identifier,
1281 BranchIdentifier::missing_identifier_sentinel()
1282 );
1283 assert_eq!(first.identifier.version_mapping[0].1.len(), 32);
1284 assert!(
1285 first.identifier.version_mapping[0]
1286 .1
1287 .chars()
1288 .all(|ch| ch.is_ascii_hexdigit() && !ch.is_ascii_uppercase())
1289 );
1290
1291 let other = BranchContents::from_path(&second_path, &store, "legacy_branch_other")
1292 .await
1293 .unwrap();
1294 assert_ne!(first.identifier, other.identifier);
1295 }
1296
1297 #[tokio::test]
1298 async fn test_tag_contents_serialization() {
1299 let tag_contents = TagContents {
1300 branch: Some("feature".to_string()),
1301 version: 10,
1302 created_at: Some(chrono::DateTime::from_timestamp(1_234_567_000, 456_000_000).unwrap()),
1303 updated_at: Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap()),
1304 manifest_size: 2048,
1305 metadata: HashMap::from([("channel".to_string(), "release".to_string())]),
1306 };
1307
1308 let json = serde_json::to_string(&tag_contents).unwrap();
1310 assert!(json.contains("branch"));
1311 assert!(json.contains("version"));
1312 assert!(json.contains("createdAt"));
1313 assert!(json.contains("updatedAt"));
1314 assert!(json.contains("manifestSize"));
1315 assert!(json.contains("metadata"));
1316
1317 let deserialized: TagContents = serde_json::from_str(&json).unwrap();
1319 assert_eq!(deserialized.branch, tag_contents.branch);
1320 assert_eq!(deserialized.version, tag_contents.version);
1321 assert_eq!(deserialized.created_at, tag_contents.created_at);
1322 assert_eq!(deserialized.updated_at, tag_contents.updated_at);
1323 assert_eq!(deserialized.manifest_size, tag_contents.manifest_size);
1324 assert_eq!(deserialized.metadata, tag_contents.metadata);
1325
1326 let tag_contents_without_created_at = TagContents {
1327 branch: Some("feature".to_string()),
1328 version: 10,
1329 created_at: None,
1330 updated_at: Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap()),
1331 manifest_size: 2048,
1332 metadata: HashMap::new(),
1333 };
1334 let json_without_created_at =
1335 serde_json::to_string(&tag_contents_without_created_at).unwrap();
1336 assert!(!json_without_created_at.contains("createdAt"));
1337 assert!(json_without_created_at.contains("updatedAt"));
1338
1339 let legacy_json = r#"{"branch":"feature","version":10,"manifestSize":2048}"#;
1341 let legacy_deserialized: TagContents = serde_json::from_str(legacy_json).unwrap();
1342 assert_eq!(legacy_deserialized.created_at, None);
1343 assert_eq!(legacy_deserialized.updated_at, None);
1344 assert!(legacy_deserialized.metadata.is_empty());
1345
1346 let legacy_updated_only_json = r#"{"branch":"feature","version":10,"updatedAt":"2009-02-13T23:31:30.123Z","manifestSize":2048}"#;
1347 let legacy_updated_only_deserialized: TagContents =
1348 serde_json::from_str(legacy_updated_only_json).unwrap();
1349 assert_eq!(legacy_updated_only_deserialized.created_at, None);
1350 assert_eq!(
1351 legacy_updated_only_deserialized.updated_at,
1352 Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap())
1353 );
1354 assert!(legacy_updated_only_deserialized.metadata.is_empty());
1355 }
1356
1357 #[rstest]
1358 #[case("feature/auth", &["feature/auth/sub"], None)]
1359 #[case("feature", &["feature/sub1", "feature/sub2"], None)]
1360 #[case("a/b", &["a/b/c", "b/c/d"], None)]
1361 #[case("main", &[], Some("main"))]
1362 #[case("a", &["a"], None)]
1363 #[case("feature/auth", &["feature/login", "feature/signup"], Some("feature/auth"))]
1364 #[case("feature/sub", &["feature", "other"], Some("feature/sub"))]
1365 #[case("very/long/common/prefix/branch1", &["very/long/common/prefix/branch2"], Some("very/long/common/prefix/branch1"))]
1366 #[case("feature/auth/module", &["feature/other"], Some("feature/auth"))]
1367 #[case("feature/dev", &["bugfix", "hotfix"], Some("feature"))]
1368 #[case("branch1", &["dev/branch2", "feature/nathan/branch3", "branch4"], Some("branch1"))]
1369 fn test_get_cleanup_path(
1370 #[case] branch_to_delete: &str,
1371 #[case] remaining_branches: &[&str],
1372 #[case] expected_relative_cleanup_path: Option<&str>,
1373 ) {
1374 let dataset_root_dir = "file:///var/balabala/dataset1".to_string();
1375 let base_location = BranchLocation {
1376 path: Path::from(format!("{}/tree/random_branch", dataset_root_dir.as_str())),
1377 uri: format!("{}/tree/random_branch", dataset_root_dir.as_str()),
1378 branch: Some("random_branch".to_string()),
1379 };
1380
1381 let result =
1382 Branches::get_cleanup_path(branch_to_delete, remaining_branches, &base_location)
1383 .unwrap();
1384
1385 match expected_relative_cleanup_path {
1386 Some(expected_relative) => {
1387 assert!(
1388 result.is_some(),
1389 "Expected cleanup path but got None for branch: {}",
1390 branch_to_delete
1391 );
1392 let expected_full_path = base_location
1393 .find_branch(Some(expected_relative))
1394 .unwrap()
1395 .path;
1396 assert_eq!(result.unwrap().as_ref(), expected_full_path.as_ref());
1397 }
1398 None => {
1399 assert!(
1400 result.is_none(),
1401 "Expected no cleanup but got: {:?} for branch: {}",
1402 result,
1403 branch_to_delete
1404 );
1405 }
1406 }
1407 }
1408
1409 fn build_mock_branch_contents() -> HashMap<String, BranchContents> {
1421 fn build(
1422 parent_name: Option<&str>,
1423 parent_branch: Option<&BranchContents>,
1424 parent_ver: u64,
1425 ) -> BranchContents {
1426 let parent_branch_id = if let Some(parent_branch) = parent_branch {
1427 parent_branch.identifier.clone()
1428 } else {
1429 BranchIdentifier::main()
1430 };
1431 BranchContents {
1432 parent_branch: parent_name.map(String::from),
1433 identifier: BranchIdentifier::new(&parent_branch_id, parent_ver),
1434 parent_version: parent_ver,
1435 create_at: 0,
1436 manifest_size: 1,
1437 metadata: HashMap::new(),
1438 }
1439 }
1440 let mut contents = HashMap::new();
1441 contents.insert("branch1".to_string(), build(None, None, 1));
1442 contents.insert(
1443 "dev/branch2".to_string(),
1444 build(Some("branch1"), contents.get("branch1"), 2),
1445 );
1446 contents.insert(
1447 "feature/nathan/branch3".to_string(),
1448 build(Some("dev/branch2"), contents.get("dev/branch2"), 3),
1449 );
1450 contents.insert("branch4".to_string(), build(None, None, 5));
1451 contents
1452 }
1453
/// `feature/nathan/branch3` has no branches forked from it in the fixture, so
/// collecting referenced versions from it must yield nothing.
#[test]
fn test_collect_children_for_branch3() {
    let all_branches = build_mock_branch_contents();
    let leaf_id = all_branches["feature/nathan/branch3"].identifier.clone();

    let referenced = leaf_id.collect_referenced_versions(&all_branches);
    assert!(referenced.is_empty());
}
1468
/// `dev/branch2` has exactly one fork in the fixture,
/// `feature/nathan/branch3`, created at version 3.
#[test]
fn test_collect_children_for_branch2() {
    let all_branches = build_mock_branch_contents();
    let root_id = all_branches["dev/branch2"].identifier.clone();
    let referenced = root_id.collect_referenced_versions(&all_branches);

    assert_eq!(referenced.len(), 1);
    let only_child = &referenced[0];
    assert_eq!(only_child.0.as_str(), "feature/nathan/branch3");
    assert_eq!(only_child.1, 3);
}
1479
/// Starting from `branch1`, both fixture branches below it are expected, in
/// the order `feature/nathan/branch3` then `dev/branch2`, each with version 2.
#[test]
fn test_collect_children_for_branch1() {
    let all_branches = build_mock_branch_contents();
    let root_id = all_branches["branch1"].identifier.clone();
    let referenced = root_id.collect_referenced_versions(&all_branches);

    let expected = [("feature/nathan/branch3", 2), ("dev/branch2", 2)];
    assert_eq!(referenced.len(), expected.len());
    for (entry, &(name, version)) in referenced.iter().zip(expected.iter()) {
        assert_eq!(entry.0.as_str(), name);
        assert_eq!(entry.1, version);
    }
}
1492
/// Starting from the main identifier, all four fixture branches are expected
/// in a fixed order: `branch4` with version 5, then the `branch1` lineage
/// (`feature/nathan/branch3`, `dev/branch2`, `branch1`), each with version 1.
#[test]
fn test_collect_children_for_main() {
    let all_branches = build_mock_branch_contents();
    let referenced = BranchIdentifier::main().collect_referenced_versions(&all_branches);

    let expected = [
        ("branch4", 5),
        ("feature/nathan/branch3", 1),
        ("dev/branch2", 1),
        ("branch1", 1),
    ];
    assert_eq!(referenced.len(), expected.len());
    for (entry, &(name, version)) in referenced.iter().zip(expected.iter()) {
        assert_eq!(entry.0.as_str(), name);
        assert_eq!(entry.1, version);
    }
}
1509}