1use std::io;
26use std::sync::atomic::AtomicBool;
27use std::sync::Arc;
28use std::{fmt::Debug, fs::DirEntry};
29
30use futures::{
31 future::{self, BoxFuture},
32 stream::BoxStream,
33 StreamExt, TryStreamExt,
34};
35use log::warn;
36use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
37use snafu::location;
38use url::Url;
39
40#[cfg(feature = "dynamodb")]
41pub mod dynamodb;
42pub mod external_manifest;
43
44use lance_core::{Error, Result};
45use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
46
47#[cfg(feature = "dynamodb")]
48use {
49 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
50 aws_credential_types::provider::error::CredentialsError,
51 aws_credential_types::provider::ProvideCredentials,
52 lance_io::object_store::{build_aws_credential, StorageOptions},
53 object_store::aws::AmazonS3ConfigKey,
54 object_store::aws::AwsCredentialProvider,
55 std::borrow::Cow,
56 std::time::{Duration, SystemTime},
57};
58
59use crate::format::{is_detached_version, Index, Manifest};
60
/// Directory (relative to the dataset base) that holds the manifest files.
const VERSIONS_DIR: &str = "_versions";
/// File extension (without the leading `.`) of manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// File-name prefix used for manifests of detached versions.
const DETACHED_VERSION_PREFIX: &str = "d";
64
/// How manifest files inside `_versions` are named.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `{version}.manifest`, e.g. `42.manifest`.
    V1,
    /// `{u64::MAX - version}` zero-padded to 20 digits, then `.manifest`, so that
    /// newer versions sort first in a lexical listing.
    V2,
}
75
76impl ManifestNamingScheme {
77 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
78 let directory = base.child(VERSIONS_DIR);
79 if is_detached_version(version) {
80 let directory = base.child(VERSIONS_DIR);
85 directory.child(format!(
86 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
87 ))
88 } else {
89 match self {
90 Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
91 Self::V2 => {
92 let inverted_version = u64::MAX - version;
93 directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
94 }
95 }
96 }
97 }
98
99 pub fn parse_version(&self, filename: &str) -> Option<u64> {
100 let file_number = filename
101 .split_once('.')
102 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
104 match self {
105 Self::V1 => file_number,
106 Self::V2 => file_number.map(|v| u64::MAX - v),
107 }
108 }
109
110 pub fn detect_scheme(filename: &str) -> Option<Self> {
111 if filename.starts_with(DETACHED_VERSION_PREFIX) {
112 return Some(Self::V2);
114 }
115 if filename.ends_with(MANIFEST_EXTENSION) {
116 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
117 if filename.len() == V2_LEN {
118 Some(Self::V2)
119 } else {
120 Some(Self::V1)
121 }
122 } else {
123 None
124 }
125 }
126
127 pub fn detect_scheme_staging(filename: &str) -> Self {
128 if filename.chars().nth(20) == Some('.') {
131 Self::V2
132 } else {
133 Self::V1
134 }
135 }
136}
137
138pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
148 object_store
149 .inner
150 .list(Some(&dataset_base.child(VERSIONS_DIR)))
151 .try_filter(|res| {
152 let res = if let Some(filename) = res.location.filename() {
153 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
154 } else {
155 false
156 };
157 future::ready(res)
158 })
159 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
160 let filename = meta.location.filename().unwrap();
161 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
162 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
163 object_store.inner.rename(&meta.location, &path).await?;
164 Ok(())
165 })
166 .await?;
167
168 Ok(())
169}
170
/// Callback used by commit handlers to write `manifest` (plus optional
/// `indices`) to `path` in `object_store`.
///
/// NOTE(review): the manifest is passed as `&mut`, so the writer may update it
/// while writing; confirm what it mutates against the implementations.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<Index>>,
    path: &'a Path,
) -> BoxFuture<'a, Result<()>>;
178
/// Where a particular manifest version lives.
#[derive(Debug)]
pub struct ManifestLocation {
    /// Version number of the manifest.
    pub version: u64,
    /// Full object-store path of the manifest file.
    pub path: Path,
    /// Size in bytes, or `None` when it was not looked up (e.g. for detached
    /// versions and the V1 fallback in `default_resolve_version`).
    pub size: Option<u64>,
    /// Naming scheme the manifest file name uses.
    pub naming_scheme: ManifestNamingScheme,
}
190
191async fn current_manifest_path(
193 object_store: &ObjectStore,
194 base: &Path,
195) -> Result<ManifestLocation> {
196 if object_store.is_local() {
197 if let Ok(Some(location)) = current_manifest_local(base) {
198 return Ok(location);
199 }
200 }
201
202 let manifest_files = object_store.inner.list(Some(&base.child(VERSIONS_DIR)));
203
204 let mut valid_manifests = manifest_files.try_filter_map(|res| {
205 if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
206 {
207 future::ready(Ok(Some((scheme, res))))
208 } else {
209 future::ready(Ok(None))
210 }
211 });
212
213 let first = valid_manifests.next().await.transpose()?;
214 match (first, object_store.list_is_lexically_ordered) {
215 (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
218 let version = scheme
219 .parse_version(meta.location.filename().unwrap())
220 .unwrap();
221
222 for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
226 if scheme != ManifestNamingScheme::V2 {
227 warn!(
228 "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
229 to migrate the directory."
230 );
231 break;
232 }
233 let next_version = scheme
234 .parse_version(meta.location.filename().unwrap())
235 .unwrap();
236 if next_version >= version {
237 warn!(
238 "List operation was expected to be lexically ordered, but was not. This \
239 could mean a corrupt read. Please make a bug report on the lancedb/lance \
240 GitHub repository."
241 );
242 break;
243 }
244 }
245
246 Ok(ManifestLocation {
247 version,
248 path: meta.location,
249 size: Some(meta.size as u64),
250 naming_scheme: scheme,
251 })
252 }
253 (Some((scheme, meta)), _) => {
257 let mut current_version = scheme
258 .parse_version(meta.location.filename().unwrap())
259 .unwrap();
260 let mut current_meta = meta;
261
262 while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
263 if matches!(scheme, ManifestNamingScheme::V2) {
264 return Err(Error::Internal {
265 message: "Found V2 manifest in a V1 manifest directory".to_string(),
266 location: location!(),
267 });
268 }
269 let version = scheme
270 .parse_version(meta.location.filename().unwrap())
271 .unwrap();
272 if version > current_version {
273 current_version = version;
274 current_meta = meta;
275 }
276 }
277 Ok(ManifestLocation {
278 version: current_version,
279 path: current_meta.location,
280 size: Some(current_meta.size as u64),
281 naming_scheme: scheme,
282 })
283 }
284 (None, _) => Err(Error::NotFound {
285 uri: base.child(VERSIONS_DIR).to_string(),
286 location: location!(),
287 }),
288 }
289}
290
291fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
295 let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
296 let entries = std::fs::read_dir(path)?;
297
298 let mut latest_entry: Option<(u64, DirEntry)> = None;
299
300 let mut scheme: Option<ManifestNamingScheme> = None;
301
302 for entry in entries {
303 let entry = entry?;
304 let filename_raw = entry.file_name();
305 let filename = filename_raw.to_string_lossy();
306
307 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
308 continue;
311 };
312
313 if let Some(scheme) = scheme {
314 if scheme != entry_scheme {
315 return Err(io::Error::new(
316 io::ErrorKind::InvalidData,
317 format!(
318 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
319 scheme, entry_scheme
320 ),
321 ));
322 }
323 } else {
324 scheme = Some(entry_scheme);
325 }
326
327 let Some(version) = entry_scheme.parse_version(&filename) else {
328 continue;
329 };
330
331 if let Some((latest_version, _)) = &latest_entry {
332 if version > *latest_version {
333 latest_entry = Some((version, entry));
334 }
335 } else {
336 latest_entry = Some((version, entry));
337 }
338 }
339
340 if let Some((version, entry)) = latest_entry {
341 let path = Path::from_filesystem_path(entry.path())
342 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
343 Ok(Some(ManifestLocation {
344 version,
345 path,
346 size: Some(entry.metadata()?.len()),
347 naming_scheme: scheme.unwrap(),
348 }))
349 } else {
350 Ok(None)
351 }
352}
353
354async fn list_manifests<'a>(
355 base_path: &Path,
356 object_store: &'a dyn OSObjectStore,
357) -> Result<BoxStream<'a, Result<Path>>> {
358 Ok(object_store
359 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
360 .await?
361 .try_filter_map(|obj_meta| {
362 if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) {
363 future::ready(Ok(Some(obj_meta.location)))
364 } else {
365 future::ready(Ok(None))
366 }
367 })
368 .boxed())
369}
370
371pub fn parse_version_from_path(path: &Path) -> Result<u64> {
372 path.filename()
373 .and_then(|name| name.split_once('.'))
374 .filter(|(_, extension)| *extension == MANIFEST_EXTENSION)
375 .and_then(|(version, _)| version.parse::<u64>().ok())
376 .ok_or(Error::Internal {
377 message: format!("Expected manifest file, but found {}", path),
378 location: location!(),
379 })
380}
381
382fn make_staging_manifest_path(base: &Path) -> Result<Path> {
383 let id = uuid::Uuid::new_v4().to_string();
384 Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
385 source: Box::new(e),
386 location: location!(),
387 })
388}
389
/// Query-string key carrying the DynamoDB table name in `s3+ddb://` URLs.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
392
393#[async_trait::async_trait]
402pub trait CommitHandler: Debug + Send + Sync {
403 async fn resolve_latest_location(
404 &self,
405 base_path: &Path,
406 object_store: &ObjectStore,
407 ) -> Result<ManifestLocation> {
408 Ok(current_manifest_path(object_store, base_path).await?)
409 }
410
411 async fn resolve_latest_version(
413 &self,
414 base_path: &Path,
415 object_store: &ObjectStore,
416 ) -> std::result::Result<Path, Error> {
417 Ok(current_manifest_path(object_store, base_path).await?.path)
419 }
420
421 async fn resolve_latest_version_id(
423 &self,
424 base_path: &Path,
425 object_store: &ObjectStore,
426 ) -> Result<u64> {
427 Ok(current_manifest_path(object_store, base_path)
428 .await?
429 .version)
430 }
431
432 async fn resolve_version(
436 &self,
437 base_path: &Path,
438 version: u64,
439 object_store: &dyn OSObjectStore,
440 ) -> std::result::Result<Path, Error> {
441 Ok(default_resolve_version(base_path, version, object_store)
442 .await?
443 .path)
444 }
445
446 async fn resolve_version_location(
447 &self,
448 base_path: &Path,
449 version: u64,
450 object_store: &dyn OSObjectStore,
451 ) -> Result<ManifestLocation> {
452 default_resolve_version(base_path, version, object_store).await
453 }
454
455 async fn list_manifests<'a>(
457 &self,
458 base_path: &Path,
459 object_store: &'a dyn OSObjectStore,
460 ) -> Result<BoxStream<'a, Result<Path>>> {
461 list_manifests(base_path, object_store).await
462 }
463
464 async fn commit(
469 &self,
470 manifest: &mut Manifest,
471 indices: Option<Vec<Index>>,
472 base_path: &Path,
473 object_store: &ObjectStore,
474 manifest_writer: ManifestWriter,
475 naming_scheme: ManifestNamingScheme,
476 ) -> std::result::Result<Path, CommitError>;
477
478 async fn delete(&self, _base_path: &Path) -> Result<()> {
480 Ok(())
481 }
482}
483
484async fn default_resolve_version(
485 base_path: &Path,
486 version: u64,
487 object_store: &dyn OSObjectStore,
488) -> Result<ManifestLocation> {
489 if is_detached_version(version) {
490 return Ok(ManifestLocation {
491 version,
492 naming_scheme: ManifestNamingScheme::V2,
495 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
497 size: None,
498 });
499 }
500
501 let scheme = ManifestNamingScheme::V2;
503 let path = scheme.manifest_path(base_path, version);
504 match object_store.head(&path).await {
505 Ok(meta) => Ok(ManifestLocation {
506 version,
507 path,
508 size: Some(meta.size as u64),
509 naming_scheme: scheme,
510 }),
511 Err(ObjectStoreError::NotFound { .. }) => {
512 let scheme = ManifestNamingScheme::V1;
514 Ok(ManifestLocation {
515 version,
516 path: scheme.manifest_path(base_path, version),
517 size: None,
518 naming_scheme: scheme,
519 })
520 }
521 Err(e) => Err(e.into()),
522 }
523}
/// Exposes an `object_store` AWS credential provider through the AWS SDK's
/// `ProvideCredentials` trait.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let creds = match self.0.get_credential().await {
                Ok(creds) => creds,
                Err(err) => return Err(CredentialsError::provider_error(Box::new(err))),
            };
            // The credentials are reported as expiring 10 minutes from now.
            let expires_after = SystemTime::now()
                .checked_add(Duration::from_secs(60 * 10))
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_after),
                "",
            ))
        })
    }
}
559
/// Build a DynamoDB-backed external manifest store for `table_name`.
///
/// The client uses `region` and `creds` with identity caching disabled, and an
/// optional `endpoint` override.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{IdentityCache, Region},
        Client,
    };

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache());
    let builder = match endpoint {
        Some(url) => builder.endpoint_url(url),
        None => builder,
    };
    let client = Client::from_conf(builder.build());

    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
588
589pub async fn commit_handler_from_url(
590 url_or_path: &str,
591 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
593) -> Result<Arc<dyn CommitHandler>> {
594 let url = match Url::parse(url_or_path) {
595 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
596 return Ok(Arc::new(RenameCommitHandler));
598 }
599 Ok(url) => url,
600 Err(_) => {
601 return Ok(Arc::new(RenameCommitHandler));
602 }
603 };
604
605 match url.scheme() {
606 "s3" => Ok(Arc::new(UnsafeCommitHandler)),
609 #[cfg(not(feature = "dynamodb"))]
610 "s3+ddb" => Err(Error::InvalidInput {
611 source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
612 location: location!(),
613 }),
614 #[cfg(feature = "dynamodb")]
615 "s3+ddb" => {
616 if url.query_pairs().count() != 1 {
617 return Err(Error::InvalidInput {
618 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
619 .into(),
620 location: location!(),
621 });
622 }
623 let table_name = match url.query_pairs().next() {
624 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
625 if key == DDB_URL_QUERY_KEY =>
626 {
627 if table_name.is_empty() {
628 return Err(Error::InvalidInput {
629 source: "`s3+ddb://` scheme requires non empty dynamodb table name"
630 .into(),
631 location: location!(),
632 });
633 }
634 table_name
635 }
636 _ => {
637 return Err(Error::InvalidInput {
638 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
639 .into(),
640 location: location!(),
641 });
642 }
643 };
644 let options = options.clone().unwrap_or_default();
645 let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
646 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
647 let storage_options = storage_options.as_s3_options();
648
649 let region = storage_options
650 .get(&AmazonS3ConfigKey::Region)
651 .map(|s| s.to_string());
652
653 let (aws_creds, region) = build_aws_credential(
654 options.s3_credentials_refresh_offset,
655 options.aws_credentials.clone(),
656 Some(&storage_options),
657 region,
658 )
659 .await?;
660
661 Ok(Arc::new(ExternalManifestCommitHandler {
662 external_manifest_store: build_dynamodb_external_store(
663 table_name,
664 aws_creds.clone(),
665 ®ion,
666 dynamo_endpoint,
667 "lancedb",
668 )
669 .await?,
670 }))
671 }
672 "gs" | "az" | "file" | "file-object-store" | "memory" => Ok(Arc::new(RenameCommitHandler)),
673 _ => Ok(Arc::new(UnsafeCommitHandler)),
674 }
675}
676
/// DynamoDB endpoint override: the `dynamodb_endpoint` storage option if set,
/// otherwise the `DYNAMODB_ENDPOINT` environment variable, otherwise `None`.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .map(|endpoint| endpoint.to_string())
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
685
/// Outcome of a failed commit attempt.
#[derive(Debug)]
pub enum CommitError {
    /// Another writer already committed this version.
    CommitConflict,
    /// Any other failure.
    OtherError(Error),
}
694
695impl From<Error> for CommitError {
696 fn from(e: Error) -> Self {
697 Self::OtherError(e)
698 }
699}
700
701impl From<CommitError> for Error {
702 fn from(e: CommitError) -> Self {
703 match e {
704 CommitError::CommitConflict => Self::Internal {
705 message: "Commit conflict".to_string(),
706 location: location!(),
707 },
708 CommitError::OtherError(e) => e,
709 }
710 }
711}
712
/// Set once `UnsafeCommitHandler` has logged its data-loss warning.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
715
/// Commit handler that writes the manifest straight to its final path, with no
/// protection against concurrent writers of the same version.
pub struct UnsafeCommitHandler;
720
721#[async_trait::async_trait]
722impl CommitHandler for UnsafeCommitHandler {
723 async fn commit(
724 &self,
725 manifest: &mut Manifest,
726 indices: Option<Vec<Index>>,
727 base_path: &Path,
728 object_store: &ObjectStore,
729 manifest_writer: ManifestWriter,
730 naming_scheme: ManifestNamingScheme,
731 ) -> std::result::Result<Path, CommitError> {
732 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
734 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
735 log::warn!(
736 "Using unsafe commit handler. Concurrent writes may result in data loss. \
737 Consider providing a commit handler that prevents conflicting writes."
738 );
739 }
740
741 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
742 manifest_writer(object_store, manifest, indices, &version_path).await?;
744
745 Ok(version_path)
746 }
747}
748
749impl Debug for UnsafeCommitHandler {
750 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
751 f.debug_struct("UnsafeCommitHandler").finish()
752 }
753}
754
/// A lock that serializes commits of a given manifest version.
///
/// Every `CommitLock + Send + Sync` type gets a [`CommitHandler`] implementation
/// in this module that:
/// 1. acquires the lock for `manifest.version`;
/// 2. fails with [`CommitError::CommitConflict`] if the manifest already exists;
/// 3. writes the manifest;
/// 4. releases the lease.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Handle returned by [`CommitLock::lock`], released when the attempt ends.
    type Lease: CommitLease;

    /// Acquire the lock for `version`.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
774
/// A held [`CommitLock`] lease.
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Release the lease. `success` says whether the manifest was written; the
    /// lock-based handler passes `false` on conflicts and errors.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
780
781#[async_trait::async_trait]
782impl<T: CommitLock + Send + Sync> CommitHandler for T {
783 async fn commit(
784 &self,
785 manifest: &mut Manifest,
786 indices: Option<Vec<Index>>,
787 base_path: &Path,
788 object_store: &ObjectStore,
789 manifest_writer: ManifestWriter,
790 naming_scheme: ManifestNamingScheme,
791 ) -> std::result::Result<Path, CommitError> {
792 let path = naming_scheme.manifest_path(base_path, manifest.version);
793 let lease = self.lock(manifest.version).await?;
796
797 match object_store.inner.head(&path).await {
799 Ok(_) => {
800 lease.release(false).await?;
803
804 return Err(CommitError::CommitConflict);
805 }
806 Err(ObjectStoreError::NotFound { .. }) => {}
807 Err(e) => {
808 lease.release(false).await?;
811
812 return Err(CommitError::OtherError(e.into()));
813 }
814 }
815 let res = manifest_writer(object_store, manifest, indices, &path).await;
816
817 lease.release(res.is_ok()).await?;
819
820 res.map_err(|err| err.into()).map(|_| path)
821 }
822}
823
824#[async_trait::async_trait]
825impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
826 async fn commit(
827 &self,
828 manifest: &mut Manifest,
829 indices: Option<Vec<Index>>,
830 base_path: &Path,
831 object_store: &ObjectStore,
832 manifest_writer: ManifestWriter,
833 naming_scheme: ManifestNamingScheme,
834 ) -> std::result::Result<Path, CommitError> {
835 self.as_ref()
836 .commit(
837 manifest,
838 indices,
839 base_path,
840 object_store,
841 manifest_writer,
842 naming_scheme,
843 )
844 .await
845 }
846}
847
/// Commit handler that writes the manifest to a staging path and then moves it
/// into place with `rename_if_not_exists`. The commit fails with a conflict if
/// the final path already exists.
pub struct RenameCommitHandler;
852
853#[async_trait::async_trait]
854impl CommitHandler for RenameCommitHandler {
855 async fn commit(
856 &self,
857 manifest: &mut Manifest,
858 indices: Option<Vec<Index>>,
859 base_path: &Path,
860 object_store: &ObjectStore,
861 manifest_writer: ManifestWriter,
862 naming_scheme: ManifestNamingScheme,
863 ) -> std::result::Result<Path, CommitError> {
864 let path = naming_scheme.manifest_path(base_path, manifest.version);
868 let tmp_path = make_staging_manifest_path(&path)?;
869
870 manifest_writer(object_store, manifest, indices, &tmp_path).await?;
872
873 match object_store
874 .inner
875 .rename_if_not_exists(&tmp_path, &path)
876 .await
877 {
878 Ok(_) => Ok(path),
879 Err(ObjectStoreError::AlreadyExists { .. }) => {
880 let _ = object_store.delete(&tmp_path).await;
883
884 return Err(CommitError::CommitConflict);
885 }
886 Err(e) => {
887 return Err(CommitError::OtherError(e.into()));
889 }
890 }
891 }
892}
893
894impl Debug for RenameCommitHandler {
895 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
896 f.debug_struct("RenameCommitHandler").finish()
897 }
898}
899
/// Settings controlling commit attempts.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of times a commit may be retried.
    /// NOTE(review): presumably retried after a `CommitConflict`; confirm in the caller.
    pub num_retries: u32,
}

impl Default for CommitConfig {
    /// 20 retries.
    fn default() -> Self {
        CommitConfig { num_retries: 20 }
    }
}
911
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks path construction, version parsing and scheme detection for both
    /// naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        // V1: the plain version number.
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // V2: `u64::MAX - version`, zero-padded to 20 digits.
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        // Parsing ignores everything after the first `.`, including staging suffixes.
        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        // Scheme detection: a 20-digit stem means V2; non-manifests yield None.
        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Migration renames only the V1 manifest. The V2 manifest and the unrelated
    /// file are left in place.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Expected entries are in lexical path order; this relies on the memory
        // store listing in that order.
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }
}