1use std::io;
26use std::sync::atomic::AtomicBool;
27use std::sync::Arc;
28use std::{fmt::Debug, fs::DirEntry};
29
30use futures::{
31 future::{self, BoxFuture},
32 stream::BoxStream,
33 StreamExt, TryStreamExt,
34};
35use log::warn;
36use object_store::PutOptions;
37use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
38use snafu::location;
39use url::Url;
40
41#[cfg(feature = "dynamodb")]
42pub mod dynamodb;
43pub mod external_manifest;
44
45use lance_core::{Error, Result};
46use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
47
48#[cfg(feature = "dynamodb")]
49use {
50 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
51 aws_credential_types::provider::error::CredentialsError,
52 aws_credential_types::provider::ProvideCredentials,
53 lance_io::object_store::{build_aws_credential, StorageOptions},
54 object_store::aws::AmazonS3ConfigKey,
55 object_store::aws::AwsCredentialProvider,
56 std::borrow::Cow,
57 std::time::{Duration, SystemTime},
58};
59
60use crate::format::{is_detached_version, Index, Manifest};
61
/// Filename prefix used for the manifests of detached versions.
const DETACHED_VERSION_PREFIX: &str = "d";
/// Directory, relative to the dataset base, that holds all manifest files.
const VERSIONS_DIR: &str = "_versions";
/// Extension (without the leading dot) shared by every manifest file.
const MANIFEST_EXTENSION: &str = "manifest";
65
/// How manifest files inside the `_versions/` directory are named.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `{version}.manifest`: the plain decimal version number.
    V1,
    /// `{u64::MAX - version:020}.manifest`: the version inverted and zero padded to
    /// 20 digits, so a lexical listing yields the newest version first.
    V2,
}
76
77impl ManifestNamingScheme {
78 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
79 let directory = base.child(VERSIONS_DIR);
80 if is_detached_version(version) {
81 let directory = base.child(VERSIONS_DIR);
86 directory.child(format!(
87 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
88 ))
89 } else {
90 match self {
91 Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
92 Self::V2 => {
93 let inverted_version = u64::MAX - version;
94 directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
95 }
96 }
97 }
98 }
99
100 pub fn parse_version(&self, filename: &str) -> Option<u64> {
101 let file_number = filename
102 .split_once('.')
103 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
105 match self {
106 Self::V1 => file_number,
107 Self::V2 => file_number.map(|v| u64::MAX - v),
108 }
109 }
110
111 pub fn detect_scheme(filename: &str) -> Option<Self> {
112 if filename.starts_with(DETACHED_VERSION_PREFIX) {
113 return Some(Self::V2);
115 }
116 if filename.ends_with(MANIFEST_EXTENSION) {
117 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
118 if filename.len() == V2_LEN {
119 Some(Self::V2)
120 } else {
121 Some(Self::V1)
122 }
123 } else {
124 None
125 }
126 }
127
128 pub fn detect_scheme_staging(filename: &str) -> Self {
129 if filename.chars().nth(20) == Some('.') {
132 Self::V2
133 } else {
134 Self::V1
135 }
136 }
137}
138
139pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
149 object_store
150 .inner
151 .list(Some(&dataset_base.child(VERSIONS_DIR)))
152 .try_filter(|res| {
153 let res = if let Some(filename) = res.location.filename() {
154 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
155 } else {
156 false
157 };
158 future::ready(res)
159 })
160 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
161 let filename = meta.location.filename().unwrap();
162 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
163 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
164 object_store.inner.rename(&meta.location, &path).await?;
165 Ok(())
166 })
167 .await?;
168
169 Ok(())
170}
171
172pub type ManifestWriter = for<'a> fn(
176 object_store: &'a ObjectStore,
177 manifest: &'a mut Manifest,
178 indices: Option<Vec<Index>>,
179 path: &'a Path,
180) -> BoxFuture<'a, Result<u64>>;
181
182#[derive(Debug)]
183pub struct ManifestLocation {
184 pub version: u64,
186 pub path: Path,
188 pub size: Option<u64>,
190 pub naming_scheme: ManifestNamingScheme,
192}
193
194async fn current_manifest_path(
196 object_store: &ObjectStore,
197 base: &Path,
198) -> Result<ManifestLocation> {
199 if object_store.is_local() {
200 if let Ok(Some(location)) = current_manifest_local(base) {
201 return Ok(location);
202 }
203 }
204
205 let manifest_files = object_store.inner.list(Some(&base.child(VERSIONS_DIR)));
206
207 let mut valid_manifests = manifest_files.try_filter_map(|res| {
208 if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
209 {
210 future::ready(Ok(Some((scheme, res))))
211 } else {
212 future::ready(Ok(None))
213 }
214 });
215
216 let first = valid_manifests.next().await.transpose()?;
217 match (first, object_store.list_is_lexically_ordered) {
218 (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
221 let version = scheme
222 .parse_version(meta.location.filename().unwrap())
223 .unwrap();
224
225 for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
229 if scheme != ManifestNamingScheme::V2 {
230 warn!(
231 "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
232 to migrate the directory."
233 );
234 break;
235 }
236 let next_version = scheme
237 .parse_version(meta.location.filename().unwrap())
238 .unwrap();
239 if next_version >= version {
240 warn!(
241 "List operation was expected to be lexically ordered, but was not. This \
242 could mean a corrupt read. Please make a bug report on the lancedb/lance \
243 GitHub repository."
244 );
245 break;
246 }
247 }
248
249 Ok(ManifestLocation {
250 version,
251 path: meta.location,
252 size: Some(meta.size as u64),
253 naming_scheme: scheme,
254 })
255 }
256 (Some((scheme, meta)), _) => {
260 let mut current_version = scheme
261 .parse_version(meta.location.filename().unwrap())
262 .unwrap();
263 let mut current_meta = meta;
264
265 while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
266 if matches!(scheme, ManifestNamingScheme::V2) {
267 return Err(Error::Internal {
268 message: "Found V2 manifest in a V1 manifest directory".to_string(),
269 location: location!(),
270 });
271 }
272 let version = scheme
273 .parse_version(meta.location.filename().unwrap())
274 .unwrap();
275 if version > current_version {
276 current_version = version;
277 current_meta = meta;
278 }
279 }
280 Ok(ManifestLocation {
281 version: current_version,
282 path: current_meta.location,
283 size: Some(current_meta.size as u64),
284 naming_scheme: scheme,
285 })
286 }
287 (None, _) => Err(Error::NotFound {
288 uri: base.child(VERSIONS_DIR).to_string(),
289 location: location!(),
290 }),
291 }
292}
293
294fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
298 let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
299 let entries = std::fs::read_dir(path)?;
300
301 let mut latest_entry: Option<(u64, DirEntry)> = None;
302
303 let mut scheme: Option<ManifestNamingScheme> = None;
304
305 for entry in entries {
306 let entry = entry?;
307 let filename_raw = entry.file_name();
308 let filename = filename_raw.to_string_lossy();
309
310 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
311 continue;
314 };
315
316 if let Some(scheme) = scheme {
317 if scheme != entry_scheme {
318 return Err(io::Error::new(
319 io::ErrorKind::InvalidData,
320 format!(
321 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
322 scheme, entry_scheme
323 ),
324 ));
325 }
326 } else {
327 scheme = Some(entry_scheme);
328 }
329
330 let Some(version) = entry_scheme.parse_version(&filename) else {
331 continue;
332 };
333
334 if let Some((latest_version, _)) = &latest_entry {
335 if version > *latest_version {
336 latest_entry = Some((version, entry));
337 }
338 } else {
339 latest_entry = Some((version, entry));
340 }
341 }
342
343 if let Some((version, entry)) = latest_entry {
344 let path = Path::from_filesystem_path(entry.path())
345 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
346 Ok(Some(ManifestLocation {
347 version,
348 path,
349 size: Some(entry.metadata()?.len()),
350 naming_scheme: scheme.unwrap(),
351 }))
352 } else {
353 Ok(None)
354 }
355}
356
357async fn list_manifests<'a>(
358 base_path: &Path,
359 object_store: &'a dyn OSObjectStore,
360) -> Result<BoxStream<'a, Result<Path>>> {
361 Ok(object_store
362 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
363 .await?
364 .try_filter_map(|obj_meta| {
365 if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) {
366 future::ready(Ok(Some(obj_meta.location)))
367 } else {
368 future::ready(Ok(None))
369 }
370 })
371 .boxed())
372}
373
374pub fn parse_version_from_path(path: &Path) -> Result<u64> {
375 path.filename()
376 .and_then(|name| name.split_once('.'))
377 .filter(|(_, extension)| *extension == MANIFEST_EXTENSION)
378 .and_then(|(version, _)| version.parse::<u64>().ok())
379 .ok_or(Error::Internal {
380 message: format!("Expected manifest file, but found {}", path),
381 location: location!(),
382 })
383}
384
385fn make_staging_manifest_path(base: &Path) -> Result<Path> {
386 let id = uuid::Uuid::new_v4().to_string();
387 Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
388 source: Box::new(e),
389 location: location!(),
390 })
391}
392
/// Query parameter of an `s3+ddb://` URL that carries the DynamoDB table name.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
395
396#[async_trait::async_trait]
405pub trait CommitHandler: Debug + Send + Sync {
406 async fn resolve_latest_location(
407 &self,
408 base_path: &Path,
409 object_store: &ObjectStore,
410 ) -> Result<ManifestLocation> {
411 Ok(current_manifest_path(object_store, base_path).await?)
412 }
413
414 async fn resolve_latest_version(
416 &self,
417 base_path: &Path,
418 object_store: &ObjectStore,
419 ) -> std::result::Result<Path, Error> {
420 Ok(current_manifest_path(object_store, base_path).await?.path)
422 }
423
424 async fn resolve_latest_version_id(
426 &self,
427 base_path: &Path,
428 object_store: &ObjectStore,
429 ) -> Result<u64> {
430 Ok(current_manifest_path(object_store, base_path)
431 .await?
432 .version)
433 }
434
435 async fn resolve_version(
439 &self,
440 base_path: &Path,
441 version: u64,
442 object_store: &dyn OSObjectStore,
443 ) -> std::result::Result<Path, Error> {
444 Ok(default_resolve_version(base_path, version, object_store)
445 .await?
446 .path)
447 }
448
449 async fn resolve_version_location(
450 &self,
451 base_path: &Path,
452 version: u64,
453 object_store: &dyn OSObjectStore,
454 ) -> Result<ManifestLocation> {
455 default_resolve_version(base_path, version, object_store).await
456 }
457
458 async fn list_manifests<'a>(
460 &self,
461 base_path: &Path,
462 object_store: &'a dyn OSObjectStore,
463 ) -> Result<BoxStream<'a, Result<Path>>> {
464 list_manifests(base_path, object_store).await
465 }
466
467 async fn commit(
472 &self,
473 manifest: &mut Manifest,
474 indices: Option<Vec<Index>>,
475 base_path: &Path,
476 object_store: &ObjectStore,
477 manifest_writer: ManifestWriter,
478 naming_scheme: ManifestNamingScheme,
479 ) -> std::result::Result<Path, CommitError>;
480
481 async fn delete(&self, _base_path: &Path) -> Result<()> {
483 Ok(())
484 }
485}
486
487async fn default_resolve_version(
488 base_path: &Path,
489 version: u64,
490 object_store: &dyn OSObjectStore,
491) -> Result<ManifestLocation> {
492 if is_detached_version(version) {
493 return Ok(ManifestLocation {
494 version,
495 naming_scheme: ManifestNamingScheme::V2,
498 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
500 size: None,
501 });
502 }
503
504 let scheme = ManifestNamingScheme::V2;
506 let path = scheme.manifest_path(base_path, version);
507 match object_store.head(&path).await {
508 Ok(meta) => Ok(ManifestLocation {
509 version,
510 path,
511 size: Some(meta.size as u64),
512 naming_scheme: scheme,
513 }),
514 Err(ObjectStoreError::NotFound { .. }) => {
515 let scheme = ManifestNamingScheme::V1;
517 Ok(ManifestLocation {
518 version,
519 path: scheme.manifest_path(base_path, version),
520 size: None,
521 naming_scheme: scheme,
522 })
523 }
524 Err(e) => Err(e.into()),
525 }
526}
/// Adapts an `object_store` AWS credential provider to the AWS SDK's
/// [`ProvideCredentials`] trait, so the same credentials can be handed to the
/// DynamoDB client.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let provider = &self.0;
            let creds = provider
                .get_credential()
                .await
                .map_err(|e| CredentialsError::provider_error(Box::new(e)))?;
            // Credentials are reported as expiring ten minutes from now.
            let expiry = SystemTime::now()
                .checked_add(Duration::from_secs(60 * 10))
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expiry),
                "",
            ))
        })
    }
}
562
/// Builds a DynamoDB-backed [`ExternalManifestStore`] for `table_name`.
///
/// The client uses `creds` (through `OSObjectStoreToAwsCredAdaptor`) and
/// `region`, and optionally overrides the service `endpoint`. Identity caching is
/// disabled (`IdentityCache::no_cache`). Presumably this means `creds` is
/// consulted for every request; confirm against the AWS SDK behaviour.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{IdentityCache, Region},
        Client,
    };

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache());
    let builder = match endpoint {
        Some(url) => builder.endpoint_url(url),
        None => builder,
    };

    let client = Client::from_conf(builder.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
591
592pub async fn commit_handler_from_url(
593 url_or_path: &str,
594 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
596) -> Result<Arc<dyn CommitHandler>> {
597 let url = match Url::parse(url_or_path) {
598 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
599 return Ok(Arc::new(RenameCommitHandler));
601 }
602 Ok(url) => url,
603 Err(_) => {
604 return Ok(Arc::new(RenameCommitHandler));
605 }
606 };
607
608 match url.scheme() {
609 "s3" | "gs" | "az" | "memory" | "file" | "file-object-store" => {
610 Ok(Arc::new(ConditionalPutCommitHandler))
611 }
612 #[cfg(not(feature = "dynamodb"))]
613 "s3+ddb" => Err(Error::InvalidInput {
614 source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
615 location: location!(),
616 }),
617 #[cfg(feature = "dynamodb")]
618 "s3+ddb" => {
619 if url.query_pairs().count() != 1 {
620 return Err(Error::InvalidInput {
621 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
622 .into(),
623 location: location!(),
624 });
625 }
626 let table_name = match url.query_pairs().next() {
627 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
628 if key == DDB_URL_QUERY_KEY =>
629 {
630 if table_name.is_empty() {
631 return Err(Error::InvalidInput {
632 source: "`s3+ddb://` scheme requires non empty dynamodb table name"
633 .into(),
634 location: location!(),
635 });
636 }
637 table_name
638 }
639 _ => {
640 return Err(Error::InvalidInput {
641 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
642 .into(),
643 location: location!(),
644 });
645 }
646 };
647 let options = options.clone().unwrap_or_default();
648 let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
649 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
650 let storage_options = storage_options.as_s3_options();
651
652 let region = storage_options
653 .get(&AmazonS3ConfigKey::Region)
654 .map(|s| s.to_string());
655
656 let (aws_creds, region) = build_aws_credential(
657 options.s3_credentials_refresh_offset,
658 options.aws_credentials.clone(),
659 Some(&storage_options),
660 region,
661 )
662 .await?;
663
664 Ok(Arc::new(ExternalManifestCommitHandler {
665 external_manifest_store: build_dynamodb_external_store(
666 table_name,
667 aws_creds.clone(),
668 ®ion,
669 dynamo_endpoint,
670 "lancedb",
671 )
672 .await?,
673 }))
674 }
675 _ => Ok(Arc::new(UnsafeCommitHandler)),
676 }
677}
678
/// DynamoDB endpoint override.
///
/// Uses the `dynamodb_endpoint` storage option when present. Otherwise falls back
/// to the `DYNAMODB_ENDPOINT` environment variable, and returns `None` if neither
/// is set.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .map(|endpoint| endpoint.to_string())
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
687
688#[derive(Debug)]
690pub enum CommitError {
691 CommitConflict,
693 OtherError(Error),
695}
696
697impl From<Error> for CommitError {
698 fn from(e: Error) -> Self {
699 Self::OtherError(e)
700 }
701}
702
703impl From<CommitError> for Error {
704 fn from(e: CommitError) -> Self {
705 match e {
706 CommitError::CommitConflict => Self::Internal {
707 message: "Commit conflict".to_string(),
708 location: location!(),
709 },
710 CommitError::OtherError(e) => e,
711 }
712 }
713}
714
715static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
717
718pub struct UnsafeCommitHandler;
722
723#[async_trait::async_trait]
724impl CommitHandler for UnsafeCommitHandler {
725 async fn commit(
726 &self,
727 manifest: &mut Manifest,
728 indices: Option<Vec<Index>>,
729 base_path: &Path,
730 object_store: &ObjectStore,
731 manifest_writer: ManifestWriter,
732 naming_scheme: ManifestNamingScheme,
733 ) -> std::result::Result<Path, CommitError> {
734 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
736 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
737 log::warn!(
738 "Using unsafe commit handler. Concurrent writes may result in data loss. \
739 Consider providing a commit handler that prevents conflicting writes."
740 );
741 }
742
743 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
744 manifest_writer(object_store, manifest, indices, &version_path).await?;
746
747 Ok(version_path)
748 }
749}
750
751impl Debug for UnsafeCommitHandler {
752 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753 f.debug_struct("UnsafeCommitHandler").finish()
754 }
755}
756
757#[async_trait::async_trait]
759pub trait CommitLock: Debug {
760 type Lease: CommitLease;
761
762 async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
775}
776
777#[async_trait::async_trait]
778pub trait CommitLease: Send + Sync {
779 async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
781}
782
783#[async_trait::async_trait]
784impl<T: CommitLock + Send + Sync> CommitHandler for T {
785 async fn commit(
786 &self,
787 manifest: &mut Manifest,
788 indices: Option<Vec<Index>>,
789 base_path: &Path,
790 object_store: &ObjectStore,
791 manifest_writer: ManifestWriter,
792 naming_scheme: ManifestNamingScheme,
793 ) -> std::result::Result<Path, CommitError> {
794 let path = naming_scheme.manifest_path(base_path, manifest.version);
795 let lease = self.lock(manifest.version).await?;
798
799 match object_store.inner.head(&path).await {
801 Ok(_) => {
802 lease.release(false).await?;
805
806 return Err(CommitError::CommitConflict);
807 }
808 Err(ObjectStoreError::NotFound { .. }) => {}
809 Err(e) => {
810 lease.release(false).await?;
813
814 return Err(CommitError::OtherError(e.into()));
815 }
816 }
817 let res = manifest_writer(object_store, manifest, indices, &path).await;
818
819 lease.release(res.is_ok()).await?;
821
822 res.map_err(|err| err.into()).map(|_| path)
823 }
824}
825
826#[async_trait::async_trait]
827impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
828 async fn commit(
829 &self,
830 manifest: &mut Manifest,
831 indices: Option<Vec<Index>>,
832 base_path: &Path,
833 object_store: &ObjectStore,
834 manifest_writer: ManifestWriter,
835 naming_scheme: ManifestNamingScheme,
836 ) -> std::result::Result<Path, CommitError> {
837 self.as_ref()
838 .commit(
839 manifest,
840 indices,
841 base_path,
842 object_store,
843 manifest_writer,
844 naming_scheme,
845 )
846 .await
847 }
848}
849
850pub struct RenameCommitHandler;
854
855#[async_trait::async_trait]
856impl CommitHandler for RenameCommitHandler {
857 async fn commit(
858 &self,
859 manifest: &mut Manifest,
860 indices: Option<Vec<Index>>,
861 base_path: &Path,
862 object_store: &ObjectStore,
863 manifest_writer: ManifestWriter,
864 naming_scheme: ManifestNamingScheme,
865 ) -> std::result::Result<Path, CommitError> {
866 let path = naming_scheme.manifest_path(base_path, manifest.version);
870 let tmp_path = make_staging_manifest_path(&path)?;
871
872 manifest_writer(object_store, manifest, indices, &tmp_path).await?;
874
875 match object_store
876 .inner
877 .rename_if_not_exists(&tmp_path, &path)
878 .await
879 {
880 Ok(_) => Ok(path),
881 Err(ObjectStoreError::AlreadyExists { .. }) => {
882 let _ = object_store.delete(&tmp_path).await;
885
886 return Err(CommitError::CommitConflict);
887 }
888 Err(e) => {
889 return Err(CommitError::OtherError(e.into()));
891 }
892 }
893 }
894}
895
896impl Debug for RenameCommitHandler {
897 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
898 f.debug_struct("RenameCommitHandler").finish()
899 }
900}
901
902pub struct ConditionalPutCommitHandler;
903
904#[async_trait::async_trait]
905impl CommitHandler for ConditionalPutCommitHandler {
906 async fn commit(
907 &self,
908 manifest: &mut Manifest,
909 indices: Option<Vec<Index>>,
910 base_path: &Path,
911 object_store: &ObjectStore,
912 manifest_writer: ManifestWriter,
913 naming_scheme: ManifestNamingScheme,
914 ) -> std::result::Result<Path, CommitError> {
915 let path = naming_scheme.manifest_path(base_path, manifest.version);
916
917 let memory_store = ObjectStore::memory();
918 let dummy_path = "dummy";
919 manifest_writer(&memory_store, manifest, indices, &dummy_path.into()).await?;
920 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
921 object_store
922 .inner
923 .put_opts(
924 &path,
925 dummy_data.into(),
926 PutOptions {
927 mode: object_store::PutMode::Create,
928 ..Default::default()
929 },
930 )
931 .await
932 .map_err(|err| match err {
933 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
934 CommitError::CommitConflict
935 }
936 _ => CommitError::OtherError(err.into()),
937 })?;
938
939 Ok(path)
940 }
941}
942
943impl Debug for ConditionalPutCommitHandler {
944 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
945 f.debug_struct("ConditionalPutCommitHandler").finish()
946 }
947}
948
/// Settings for committing a new dataset version.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of commit retries. Defaults to 20. NOTE(review): presumably
    /// consumed by the caller's retry-on-conflict loop (not visible here).
    pub num_retries: u32,
}

impl Default for CommitConfig {
    fn default() -> Self {
        const DEFAULT_NUM_RETRIES: u32 = 20;
        Self {
            num_retries: DEFAULT_NUM_RETRIES,
        }
    }
}
960
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;
        let base = Path::from("base");

        // (scheme, version, expected manifest path)
        let expected_paths = [
            (v1, 0, "base/_versions/0.manifest"),
            (v1, 42, "base/_versions/42.manifest"),
            (v2, 0, "base/_versions/18446744073709551615.manifest"),
            (v2, 42, "base/_versions/18446744073709551573.manifest"),
        ];
        for (scheme, version, expected) in expected_paths {
            assert_eq!(scheme.manifest_path(&base, version), Path::from(expected));
        }

        // (scheme, file name, expected version); staging suffixes must not matter.
        let expected_versions = [
            (v1, "0.manifest", 0),
            (v1, "42.manifest", 42),
            (v1, "42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc", 42),
            (v2, "18446744073709551615.manifest", 0),
            (v2, "18446744073709551573.manifest", 42),
            (
                v2,
                "18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc",
                42,
            ),
        ];
        for (scheme, filename, expected) in expected_versions {
            assert_eq!(scheme.parse_version(filename), Some(expected), "{filename}");
        }

        // (file name, expected detected scheme)
        let expected_schemes = [
            ("0.manifest", Some(v1)),
            ("18446744073709551615.manifest", Some(v2)),
            ("something else", None),
        ];
        for (filename, expected) in expected_schemes {
            assert_eq!(
                ManifestNamingScheme::detect_scheme(filename),
                expected,
                "{filename}"
            );
        }
    }

    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        let irrelevant = versions_dir.child("irrelevant");
        let v1_zero = ManifestNamingScheme::V1.manifest_path(&base, 0);
        let v2_zero = ManifestNamingScheme::V2.manifest_path(&base, 0);
        let v2_one = ManifestNamingScheme::V2.manifest_path(&base, 1);

        // One unrelated file, one V1 manifest and one V2 manifest.
        for path in [&irrelevant, &v1_zero, &v2_one] {
            object_store.put(path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|meta| meta.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        // Expected in lexical order: inverted V2 names first, then `irrelevant`.
        assert_eq!(actual_files, vec![v2_one, v2_zero, irrelevant]);
    }
}