1use std::io;
26use std::sync::atomic::AtomicBool;
27use std::sync::Arc;
28use std::{fmt::Debug, fs::DirEntry};
29
30use futures::{
31 future::{self, BoxFuture},
32 stream::BoxStream,
33 StreamExt, TryStreamExt,
34};
35use lance_io::object_writer::WriteResult;
36use log::warn;
37use object_store::PutOptions;
38use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
39use snafu::location;
40use url::Url;
41
42#[cfg(feature = "dynamodb")]
43pub mod dynamodb;
44pub mod external_manifest;
45
46use lance_core::{Error, Result};
47use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
48
49#[cfg(feature = "dynamodb")]
50use {
51 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
52 aws_credential_types::provider::error::CredentialsError,
53 aws_credential_types::provider::ProvideCredentials,
54 lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
55 object_store::aws::AmazonS3ConfigKey,
56 object_store::aws::AwsCredentialProvider,
57 std::borrow::Cow,
58 std::time::{Duration, SystemTime},
59};
60
61use crate::format::{is_detached_version, Index, Manifest};
62
/// Name of the directory, under a dataset's base path, that holds the manifest files.
const VERSIONS_DIR: &str = "_versions";
/// File extension shared by all manifest files (`{stem}.manifest`).
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix used for detached-version manifests (`d{version}.manifest`).
const DETACHED_VERSION_PREFIX: &str = "d";
66
/// How manifest files inside `_versions/` are named.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `{version}.manifest`, e.g. `42.manifest`.
    V1,
    /// `{u64::MAX - version:020}.manifest`: the version is inverted and zero-padded to
    /// 20 digits, so a lexically ordered listing returns the newest version first.
    V2,
}
77
78impl ManifestNamingScheme {
79 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
80 let directory = base.child(VERSIONS_DIR);
81 if is_detached_version(version) {
82 let directory = base.child(VERSIONS_DIR);
87 directory.child(format!(
88 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
89 ))
90 } else {
91 match self {
92 Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
93 Self::V2 => {
94 let inverted_version = u64::MAX - version;
95 directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
96 }
97 }
98 }
99 }
100
101 pub fn parse_version(&self, filename: &str) -> Option<u64> {
102 let file_number = filename
103 .split_once('.')
104 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
106 match self {
107 Self::V1 => file_number,
108 Self::V2 => file_number.map(|v| u64::MAX - v),
109 }
110 }
111
112 pub fn detect_scheme(filename: &str) -> Option<Self> {
113 if filename.starts_with(DETACHED_VERSION_PREFIX) {
114 return Some(Self::V2);
116 }
117 if filename.ends_with(MANIFEST_EXTENSION) {
118 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
119 if filename.len() == V2_LEN {
120 Some(Self::V2)
121 } else {
122 Some(Self::V1)
123 }
124 } else {
125 None
126 }
127 }
128
129 pub fn detect_scheme_staging(filename: &str) -> Self {
130 if filename.chars().nth(20) == Some('.') {
133 Self::V2
134 } else {
135 Self::V1
136 }
137 }
138}
139
140pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
150 object_store
151 .inner
152 .list(Some(&dataset_base.child(VERSIONS_DIR)))
153 .try_filter(|res| {
154 let res = if let Some(filename) = res.location.filename() {
155 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
156 } else {
157 false
158 };
159 future::ready(res)
160 })
161 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
162 let filename = meta.location.filename().unwrap();
163 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
164 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
165 object_store.inner.rename(&meta.location, &path).await?;
166 Ok(())
167 })
168 .await?;
169
170 Ok(())
171}
172
/// Callback that writes `manifest` to `path` in `object_store` and reports the write result.
///
/// Commit handlers choose `path`: the final manifest path, a staging path, or a scratch
/// in-memory store. `indices`, when given, are handed to the writer; presumably they are
/// persisted alongside the manifest (TODO confirm against the writer implementation).
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<Index>>,
    path: &'a Path,
) -> BoxFuture<'a, Result<WriteResult>>;
182
/// Where a specific manifest version lives, plus whatever object metadata is known about it.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// Dataset version this manifest describes.
    pub version: u64,
    /// Full object-store path of the manifest file.
    pub path: Path,
    /// Size of the manifest file in bytes, or `None` when it was not looked up (e.g. when
    /// the location was derived from the naming scheme without a `head` request).
    pub size: Option<u64>,
    /// Naming scheme used by `path`.
    pub naming_scheme: ManifestNamingScheme,
    /// Entity tag of the manifest object, if known. Local directory scans synthesize one
    /// from inode, mtime and size; some commit paths leave it `None`.
    pub e_tag: Option<String>,
}
198
199impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
200 type Error = Error;
201
202 fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
203 let filename = meta.location.filename().ok_or_else(|| Error::Internal {
204 message: "ObjectMeta location does not have a filename".to_string(),
205 location: location!(),
206 })?;
207 let scheme =
208 ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
209 message: format!("Invalid manifest filename: '{}'", filename),
210 location: location!(),
211 })?;
212 let version = scheme
213 .parse_version(filename)
214 .ok_or_else(|| Error::Internal {
215 message: format!("Invalid manifest filename: '{}'", filename),
216 location: location!(),
217 })?;
218 Ok(Self {
219 version,
220 path: meta.location,
221 size: Some(meta.size as u64),
222 naming_scheme: scheme,
223 e_tag: meta.e_tag,
224 })
225 }
226}
227
228async fn current_manifest_path(
230 object_store: &ObjectStore,
231 base: &Path,
232) -> Result<ManifestLocation> {
233 if object_store.is_local() {
234 if let Ok(Some(location)) = current_manifest_local(base) {
235 return Ok(location);
236 }
237 }
238
239 let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
240
241 let mut valid_manifests = manifest_files.try_filter_map(|res| {
242 if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
243 {
244 future::ready(Ok(Some((scheme, res))))
245 } else {
246 future::ready(Ok(None))
247 }
248 });
249
250 let first = valid_manifests.next().await.transpose()?;
251 match (first, object_store.list_is_lexically_ordered) {
252 (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
255 let version = scheme
256 .parse_version(meta.location.filename().unwrap())
257 .unwrap();
258
259 for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
263 if scheme != ManifestNamingScheme::V2 {
264 warn!(
265 "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
266 to migrate the directory."
267 );
268 break;
269 }
270 let next_version = scheme
271 .parse_version(meta.location.filename().unwrap())
272 .unwrap();
273 if next_version >= version {
274 warn!(
275 "List operation was expected to be lexically ordered, but was not. This \
276 could mean a corrupt read. Please make a bug report on the lancedb/lance \
277 GitHub repository."
278 );
279 break;
280 }
281 }
282
283 Ok(ManifestLocation {
284 version,
285 path: meta.location,
286 size: Some(meta.size as u64),
287 naming_scheme: scheme,
288 e_tag: meta.e_tag,
289 })
290 }
291 (Some((scheme, meta)), _) => {
295 let mut current_version = scheme
296 .parse_version(meta.location.filename().unwrap())
297 .unwrap();
298 let mut current_meta = meta;
299
300 while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
301 if matches!(scheme, ManifestNamingScheme::V2) {
302 return Err(Error::Internal {
303 message: "Found V2 manifest in a V1 manifest directory".to_string(),
304 location: location!(),
305 });
306 }
307 let version = scheme
308 .parse_version(meta.location.filename().unwrap())
309 .unwrap();
310 if version > current_version {
311 current_version = version;
312 current_meta = meta;
313 }
314 }
315 Ok(ManifestLocation {
316 version: current_version,
317 path: current_meta.location,
318 size: Some(current_meta.size as u64),
319 naming_scheme: scheme,
320 e_tag: current_meta.e_tag,
321 })
322 }
323 (None, _) => Err(Error::NotFound {
324 uri: base.child(VERSIONS_DIR).to_string(),
325 location: location!(),
326 }),
327 }
328}
329
330fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
334 let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
335 let entries = std::fs::read_dir(path)?;
336
337 let mut latest_entry: Option<(u64, DirEntry)> = None;
338
339 let mut scheme: Option<ManifestNamingScheme> = None;
340
341 for entry in entries {
342 let entry = entry?;
343 let filename_raw = entry.file_name();
344 let filename = filename_raw.to_string_lossy();
345
346 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
347 continue;
350 };
351
352 if let Some(scheme) = scheme {
353 if scheme != entry_scheme {
354 return Err(io::Error::new(
355 io::ErrorKind::InvalidData,
356 format!(
357 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
358 scheme, entry_scheme
359 ),
360 ));
361 }
362 } else {
363 scheme = Some(entry_scheme);
364 }
365
366 let Some(version) = entry_scheme.parse_version(&filename) else {
367 continue;
368 };
369
370 if let Some((latest_version, _)) = &latest_entry {
371 if version > *latest_version {
372 latest_entry = Some((version, entry));
373 }
374 } else {
375 latest_entry = Some((version, entry));
376 }
377 }
378
379 if let Some((version, entry)) = latest_entry {
380 let path = Path::from_filesystem_path(entry.path())
381 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
382 let metadata = entry.metadata()?;
383 Ok(Some(ManifestLocation {
384 version,
385 path,
386 size: Some(metadata.len()),
387 naming_scheme: scheme.unwrap(),
388 e_tag: Some(get_etag(&metadata)),
389 }))
390 } else {
391 Ok(None)
392 }
393}
394
/// Builds an etag string for a local file from its inode, modification time and size.
///
/// Format: `{inode:x}-{mtime_micros:x}-{size:x}`, where `mtime_micros` is microseconds
/// since the Unix epoch and falls back to 0 if the mtime is unavailable or pre-epoch.
/// Presumably chosen to match the etags the object store's local backend reports
/// (TODO confirm).
fn get_etag(metadata: &std::fs::Metadata) -> String {
    let mtime_micros = match metadata.modified() {
        Ok(modified) => modified
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .map(|since_epoch| since_epoch.as_micros())
            .unwrap_or(0),
        Err(_) => 0,
    };
    format!(
        "{:x}-{:x}-{:x}",
        get_inode(metadata),
        mtime_micros,
        metadata.len()
    )
}

/// Inode number of the file described by `metadata`.
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;
    metadata.ino()
}

/// No portable inode number exists off Unix; a constant keeps the etag format unchanged.
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0
}
424
425async fn list_manifests<'a>(
426 base_path: &Path,
427 object_store: &'a dyn OSObjectStore,
428) -> Result<BoxStream<'a, Result<ManifestLocation>>> {
429 Ok(object_store
430 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
431 .await?
432 .filter_map(|obj_meta| {
433 futures::future::ready(
434 obj_meta
435 .map(|m| ManifestLocation::try_from(m).ok())
436 .transpose(),
437 )
438 })
439 .boxed())
440}
441
442fn make_staging_manifest_path(base: &Path) -> Result<Path> {
443 let id = uuid::Uuid::new_v4().to_string();
444 Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
445 source: Box::new(e),
446 location: location!(),
447 })
448}
449
#[cfg(feature = "dynamodb")]
/// Query parameter of an `s3+ddb://` URL that carries the DynamoDB table name.
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
452
453#[async_trait::async_trait]
462pub trait CommitHandler: Debug + Send + Sync {
463 async fn resolve_latest_location(
464 &self,
465 base_path: &Path,
466 object_store: &ObjectStore,
467 ) -> Result<ManifestLocation> {
468 Ok(current_manifest_path(object_store, base_path).await?)
469 }
470
471 async fn resolve_version_location(
472 &self,
473 base_path: &Path,
474 version: u64,
475 object_store: &dyn OSObjectStore,
476 ) -> Result<ManifestLocation> {
477 default_resolve_version(base_path, version, object_store).await
478 }
479
480 async fn list_manifest_locations<'a>(
481 &self,
482 base_path: &Path,
483 object_store: &'a dyn OSObjectStore,
484 ) -> Result<BoxStream<'a, Result<ManifestLocation>>> {
485 list_manifests(base_path, object_store).await
486 }
487
488 async fn commit(
493 &self,
494 manifest: &mut Manifest,
495 indices: Option<Vec<Index>>,
496 base_path: &Path,
497 object_store: &ObjectStore,
498 manifest_writer: ManifestWriter,
499 naming_scheme: ManifestNamingScheme,
500 ) -> std::result::Result<ManifestLocation, CommitError>;
501
502 async fn delete(&self, _base_path: &Path) -> Result<()> {
504 Ok(())
505 }
506}
507
508async fn default_resolve_version(
509 base_path: &Path,
510 version: u64,
511 object_store: &dyn OSObjectStore,
512) -> Result<ManifestLocation> {
513 if is_detached_version(version) {
514 return Ok(ManifestLocation {
515 version,
516 naming_scheme: ManifestNamingScheme::V2,
519 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
521 size: None,
522 e_tag: None,
523 });
524 }
525
526 let scheme = ManifestNamingScheme::V2;
528 let path = scheme.manifest_path(base_path, version);
529 match object_store.head(&path).await {
530 Ok(meta) => Ok(ManifestLocation {
531 version,
532 path,
533 size: Some(meta.size as u64),
534 naming_scheme: scheme,
535 e_tag: meta.e_tag,
536 }),
537 Err(ObjectStoreError::NotFound { .. }) => {
538 let scheme = ManifestNamingScheme::V1;
540 Ok(ManifestLocation {
541 version,
542 path: scheme.manifest_path(base_path, version),
543 size: None,
544 naming_scheme: scheme,
545 e_tag: None,
546 })
547 }
548 Err(e) => Err(e.into()),
549 }
550}
/// Exposes an object_store `AwsCredentialProvider` through the AWS SDK's
/// `ProvideCredentials` trait, so the DynamoDB client can reuse the same credentials.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let source_creds = match self.0.get_credential().await {
                Ok(creds) => creds,
                Err(e) => return Err(CredentialsError::provider_error(Box::new(e))),
            };
            // Report the credentials as expiring ten minutes from now.
            let expires_at = SystemTime::now()
                .checked_add(Duration::from_secs(60 * 10))
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &source_creds.key_id,
                &source_creds.secret_key,
                source_creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
586
/// Builds a DynamoDB-backed external manifest store for `table_name`.
///
/// The client uses `region`, credentials from `creds` (with SDK identity caching
/// disabled), and `endpoint` as an endpoint override when given.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{IdentityCache, Region},
        Client,
    };

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache());
    let builder = match endpoint {
        Some(url) => builder.endpoint_url(url),
        None => builder,
    };

    let client = Client::from_conf(builder.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
615
616pub async fn commit_handler_from_url(
617 url_or_path: &str,
618 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
620) -> Result<Arc<dyn CommitHandler>> {
621 let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
622 Arc::new(RenameCommitHandler)
623 } else {
624 Arc::new(ConditionalPutCommitHandler)
625 };
626
627 let url = match Url::parse(url_or_path) {
628 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
629 return Ok(local_handler);
631 }
632 Ok(url) => url,
633 Err(_) => {
634 return Ok(local_handler);
635 }
636 };
637
638 match url.scheme() {
639 "file" | "file-object-store" => Ok(local_handler),
640 "s3" | "gs" | "az" | "memory" => Ok(Arc::new(ConditionalPutCommitHandler)),
641 #[cfg(not(feature = "dynamodb"))]
642 "s3+ddb" => Err(Error::InvalidInput {
643 source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
644 location: location!(),
645 }),
646 #[cfg(feature = "dynamodb")]
647 "s3+ddb" => {
648 if url.query_pairs().count() != 1 {
649 return Err(Error::InvalidInput {
650 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
651 .into(),
652 location: location!(),
653 });
654 }
655 let table_name = match url.query_pairs().next() {
656 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
657 if key == DDB_URL_QUERY_KEY =>
658 {
659 if table_name.is_empty() {
660 return Err(Error::InvalidInput {
661 source: "`s3+ddb://` scheme requires non empty dynamodb table name"
662 .into(),
663 location: location!(),
664 });
665 }
666 table_name
667 }
668 _ => {
669 return Err(Error::InvalidInput {
670 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
671 .into(),
672 location: location!(),
673 });
674 }
675 };
676 let options = options.clone().unwrap_or_default();
677 let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
678 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
679 let storage_options = storage_options.as_s3_options();
680
681 let region = storage_options
682 .get(&AmazonS3ConfigKey::Region)
683 .map(|s| s.to_string());
684
685 let (aws_creds, region) = build_aws_credential(
686 options.s3_credentials_refresh_offset,
687 options.aws_credentials.clone(),
688 Some(&storage_options),
689 region,
690 )
691 .await?;
692
693 Ok(Arc::new(ExternalManifestCommitHandler {
694 external_manifest_store: build_dynamodb_external_store(
695 table_name,
696 aws_creds.clone(),
697 ®ion,
698 dynamo_endpoint,
699 "lancedb",
700 )
701 .await?,
702 }))
703 }
704 _ => Ok(Arc::new(UnsafeCommitHandler)),
705 }
706}
707
/// DynamoDB endpoint override: the `dynamodb_endpoint` storage option if set, otherwise
/// the `DYNAMODB_ENDPOINT` environment variable, otherwise `None`.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .map(|endpoint| endpoint.to_string())
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
716
717#[derive(Debug)]
719pub enum CommitError {
720 CommitConflict,
722 OtherError(Error),
724}
725
726impl From<Error> for CommitError {
727 fn from(e: Error) -> Self {
728 Self::OtherError(e)
729 }
730}
731
732impl From<CommitError> for Error {
733 fn from(e: CommitError) -> Self {
734 match e {
735 CommitError::CommitConflict => Self::Internal {
736 message: "Commit conflict".to_string(),
737 location: location!(),
738 },
739 CommitError::OtherError(e) => e,
740 }
741 }
742}
743
744static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
746
747pub struct UnsafeCommitHandler;
751
752#[async_trait::async_trait]
753impl CommitHandler for UnsafeCommitHandler {
754 async fn commit(
755 &self,
756 manifest: &mut Manifest,
757 indices: Option<Vec<Index>>,
758 base_path: &Path,
759 object_store: &ObjectStore,
760 manifest_writer: ManifestWriter,
761 naming_scheme: ManifestNamingScheme,
762 ) -> std::result::Result<ManifestLocation, CommitError> {
763 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
765 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
766 log::warn!(
767 "Using unsafe commit handler. Concurrent writes may result in data loss. \
768 Consider providing a commit handler that prevents conflicting writes."
769 );
770 }
771
772 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
773 let res = manifest_writer(object_store, manifest, indices, &version_path).await?;
775
776 Ok(ManifestLocation {
777 version: manifest.version,
778 size: Some(res.size as u64),
779 naming_scheme,
780 path: version_path,
781 e_tag: res.e_tag,
782 })
783 }
784}
785
786impl Debug for UnsafeCommitHandler {
787 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
788 f.debug_struct("UnsafeCommitHandler").finish()
789 }
790}
791
/// A lock held around a commit of a single version.
///
/// Every `CommitLock + Send + Sync` type (and `Arc` of one) automatically implements
/// [`CommitHandler`]. That implementation:
/// 1. takes the lock,
/// 2. checks that no manifest exists yet at the target path,
/// 3. writes the manifest, and
/// 4. releases the lease.
///
/// The lock is what keeps that check-then-write sequence safe against concurrent writers.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Handle returned by a successful [`lock`](CommitLock::lock); released when the
    /// commit attempt finishes.
    type Lease: CommitLease;

    /// Acquires the lock for committing `version`.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
811
/// A held commit lock, as returned by [`CommitLock::lock`].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Releases the lock. `success` is `true` only when the manifest was written
    /// successfully; it is `false` on conflict or on any error.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
817
818#[async_trait::async_trait]
819impl<T: CommitLock + Send + Sync> CommitHandler for T {
820 async fn commit(
821 &self,
822 manifest: &mut Manifest,
823 indices: Option<Vec<Index>>,
824 base_path: &Path,
825 object_store: &ObjectStore,
826 manifest_writer: ManifestWriter,
827 naming_scheme: ManifestNamingScheme,
828 ) -> std::result::Result<ManifestLocation, CommitError> {
829 let path = naming_scheme.manifest_path(base_path, manifest.version);
830 let lease = self.lock(manifest.version).await?;
833
834 match object_store.inner.head(&path).await {
836 Ok(_) => {
837 lease.release(false).await?;
840
841 return Err(CommitError::CommitConflict);
842 }
843 Err(ObjectStoreError::NotFound { .. }) => {}
844 Err(e) => {
845 lease.release(false).await?;
848
849 return Err(CommitError::OtherError(e.into()));
850 }
851 }
852 let res = manifest_writer(object_store, manifest, indices, &path).await;
853
854 lease.release(res.is_ok()).await?;
856
857 let res = res?;
858 Ok(ManifestLocation {
859 version: manifest.version,
860 size: Some(res.size as u64),
861 naming_scheme,
862 path,
863 e_tag: res.e_tag,
864 })
865 }
866}
867
868#[async_trait::async_trait]
869impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
870 async fn commit(
871 &self,
872 manifest: &mut Manifest,
873 indices: Option<Vec<Index>>,
874 base_path: &Path,
875 object_store: &ObjectStore,
876 manifest_writer: ManifestWriter,
877 naming_scheme: ManifestNamingScheme,
878 ) -> std::result::Result<ManifestLocation, CommitError> {
879 self.as_ref()
880 .commit(
881 manifest,
882 indices,
883 base_path,
884 object_store,
885 manifest_writer,
886 naming_scheme,
887 )
888 .await
889 }
890}
891
892pub struct RenameCommitHandler;
896
897#[async_trait::async_trait]
898impl CommitHandler for RenameCommitHandler {
899 async fn commit(
900 &self,
901 manifest: &mut Manifest,
902 indices: Option<Vec<Index>>,
903 base_path: &Path,
904 object_store: &ObjectStore,
905 manifest_writer: ManifestWriter,
906 naming_scheme: ManifestNamingScheme,
907 ) -> std::result::Result<ManifestLocation, CommitError> {
908 let path = naming_scheme.manifest_path(base_path, manifest.version);
912 let tmp_path = make_staging_manifest_path(&path)?;
913
914 let res = manifest_writer(object_store, manifest, indices, &tmp_path).await?;
916
917 match object_store
918 .inner
919 .rename_if_not_exists(&tmp_path, &path)
920 .await
921 {
922 Ok(_) => {
923 Ok(ManifestLocation {
925 version: manifest.version,
926 path,
927 size: Some(res.size as u64),
928 naming_scheme,
929 e_tag: None, })
931 }
932 Err(ObjectStoreError::AlreadyExists { .. }) => {
933 let _ = object_store.delete(&tmp_path).await;
936
937 return Err(CommitError::CommitConflict);
938 }
939 Err(e) => {
940 return Err(CommitError::OtherError(e.into()));
942 }
943 }
944 }
945}
946
947impl Debug for RenameCommitHandler {
948 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
949 f.debug_struct("RenameCommitHandler").finish()
950 }
951}
952
953pub struct ConditionalPutCommitHandler;
954
955#[async_trait::async_trait]
956impl CommitHandler for ConditionalPutCommitHandler {
957 async fn commit(
958 &self,
959 manifest: &mut Manifest,
960 indices: Option<Vec<Index>>,
961 base_path: &Path,
962 object_store: &ObjectStore,
963 manifest_writer: ManifestWriter,
964 naming_scheme: ManifestNamingScheme,
965 ) -> std::result::Result<ManifestLocation, CommitError> {
966 let path = naming_scheme.manifest_path(base_path, manifest.version);
967
968 let memory_store = ObjectStore::memory();
969 let dummy_path = "dummy";
970 manifest_writer(&memory_store, manifest, indices, &dummy_path.into()).await?;
971 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
972 let size = dummy_data.len() as u64;
973 let res = object_store
974 .inner
975 .put_opts(
976 &path,
977 dummy_data.into(),
978 PutOptions {
979 mode: object_store::PutMode::Create,
980 ..Default::default()
981 },
982 )
983 .await
984 .map_err(|err| match err {
985 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
986 CommitError::CommitConflict
987 }
988 _ => CommitError::OtherError(err.into()),
989 })?;
990
991 Ok(ManifestLocation {
992 version: manifest.version,
993 path,
994 size: Some(size),
995 naming_scheme,
996 e_tag: res.e_tag,
997 })
998 }
999}
1000
1001impl Debug for ConditionalPutCommitHandler {
1002 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1003 f.debug_struct("ConditionalPutCommitHandler").finish()
1004 }
1005}
1006
/// Tunables for committing a dataset version.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry budget for commits; presumably the number of times a conflicting commit is
    /// retried (TODO confirm at the call site).
    pub num_retries: u32,
}

impl Default for CommitConfig {
    /// Defaults to 20 retries.
    fn default() -> Self {
        const DEFAULT_NUM_RETRIES: u32 = 20;
        CommitConfig {
            num_retries: DEFAULT_NUM_RETRIES,
        }
    }
}
1018
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips version numbers through both naming schemes: path generation, version
    /// parsing (including staging names with a `-<uuid>` suffix) and scheme detection.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // V2 stores u64::MAX - version, zero-padded to 20 digits.
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        // Staging names carry a suffix after the extension; the version still parses.
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Migration renames V1 manifests to V2 names and leaves V2 manifests and unrelated
    /// files alone.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // One unrelated file, one V1 manifest (to be migrated), one V2 manifest (untouched).
        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Compared in listing order; this presumably relies on the in-memory store listing
        // paths sorted lexically (V2 names sort newest-first, digits before letters).
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }
}
1100}