1use std::io;
26use std::pin::Pin;
27use std::sync::atomic::AtomicBool;
28use std::sync::Arc;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::future::Either;
33use futures::Stream;
34use futures::{
35 future::{self, BoxFuture},
36 stream::BoxStream,
37 StreamExt, TryStreamExt,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
44use snafu::location;
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::WriteExt;
55
56use crate::format::{is_detached_version, IndexMetadata, Manifest, Transaction};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61 aws_credential_types::provider::error::CredentialsError,
62 aws_credential_types::provider::ProvideCredentials,
63 lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
64 object_store::aws::AmazonS3ConfigKey,
65 object_store::aws::AwsCredentialProvider,
66 std::borrow::Cow,
67 std::time::{Duration, SystemTime},
68};
69
/// Name of the directory, directly under the dataset base, that holds the manifest files.
const VERSIONS_DIR: &str = "_versions";
/// File extension of manifest files (without the leading dot).
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix of detached-version manifests (`d{version}.manifest`).
const DETACHED_VERSION_PREFIX: &str = "d";
73
/// How manifest files inside the `_versions` directory are named.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `{version}.manifest`.
    V1,
    /// `{u64::MAX - version}.manifest`, zero-padded to 20 digits, so that newer versions sort
    /// lexically before older ones.
    V2,
}
84
85impl ManifestNamingScheme {
86 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
87 let directory = base.child(VERSIONS_DIR);
88 if is_detached_version(version) {
89 let directory = base.child(VERSIONS_DIR);
94 directory.child(format!(
95 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
96 ))
97 } else {
98 match self {
99 Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
100 Self::V2 => {
101 let inverted_version = u64::MAX - version;
102 directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
103 }
104 }
105 }
106 }
107
108 pub fn parse_version(&self, filename: &str) -> Option<u64> {
109 let file_number = filename
110 .split_once('.')
111 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
113 match self {
114 Self::V1 => file_number,
115 Self::V2 => file_number.map(|v| u64::MAX - v),
116 }
117 }
118
119 pub fn detect_scheme(filename: &str) -> Option<Self> {
120 if filename.starts_with(DETACHED_VERSION_PREFIX) {
121 return Some(Self::V2);
123 }
124 if filename.ends_with(MANIFEST_EXTENSION) {
125 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
126 if filename.len() == V2_LEN {
127 Some(Self::V2)
128 } else {
129 Some(Self::V1)
130 }
131 } else {
132 None
133 }
134 }
135
136 pub fn detect_scheme_staging(filename: &str) -> Self {
137 if filename.chars().nth(20) == Some('.') {
140 Self::V2
141 } else {
142 Self::V1
143 }
144 }
145}
146
147pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
157 object_store
158 .inner
159 .list(Some(&dataset_base.child(VERSIONS_DIR)))
160 .try_filter(|res| {
161 let res = if let Some(filename) = res.location.filename() {
162 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
163 } else {
164 false
165 };
166 future::ready(res)
167 })
168 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
169 let filename = meta.location.filename().unwrap();
170 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
171 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
172 object_store.inner.rename(&meta.location, &path).await?;
173 Ok(())
174 })
175 .await?;
176
177 Ok(())
178}
179
/// Function that writes `manifest` (with optional `indices` and `transaction`) to `path` in
/// `object_store` and returns the resulting [`WriteResult`].
///
/// Commit handlers receive one of these and decide which path it is pointed at;
/// [`write_manifest_file_to_path`] has exactly this signature.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
190
191pub fn write_manifest_file_to_path<'a>(
195 object_store: &'a ObjectStore,
196 manifest: &'a mut Manifest,
197 indices: Option<Vec<IndexMetadata>>,
198 path: &'a Path,
199 transaction: Option<Transaction>,
200) -> BoxFuture<'a, Result<WriteResult>> {
201 Box::pin(async move {
202 let mut object_writer = ObjectWriter::new(object_store, path).await?;
203 let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
204 object_writer
205 .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
206 .await?;
207 let res = object_writer.shutdown().await?;
208 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
209 Ok(res)
210 })
211}
212
/// Where the manifest of a particular version lives, plus what is known about that file.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// Version number the manifest belongs to.
    pub version: u64,
    /// Path of the manifest file in the object store.
    pub path: Path,
    /// Size of the manifest file in bytes, if known. `None` when the location was computed
    /// without reading it from the store (e.g. by `default_resolve_version`).
    pub size: Option<u64>,
    /// Naming scheme that `path` follows.
    pub naming_scheme: ManifestNamingScheme,
    /// E-tag of the manifest file, if known. For local files this may be a synthesized tag
    /// (see `get_etag`).
    pub e_tag: Option<String>,
}
228
229impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
230 type Error = Error;
231
232 fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
233 let filename = meta.location.filename().ok_or_else(|| Error::Internal {
234 message: "ObjectMeta location does not have a filename".to_string(),
235 location: location!(),
236 })?;
237 let scheme =
238 ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
239 message: format!("Invalid manifest filename: '{}'", filename),
240 location: location!(),
241 })?;
242 let version = scheme
243 .parse_version(filename)
244 .ok_or_else(|| Error::Internal {
245 message: format!("Invalid manifest filename: '{}'", filename),
246 location: location!(),
247 })?;
248 Ok(Self {
249 version,
250 path: meta.location,
251 size: Some(meta.size),
252 naming_scheme: scheme,
253 e_tag: meta.e_tag,
254 })
255 }
256}
257
258async fn current_manifest_path(
260 object_store: &ObjectStore,
261 base: &Path,
262) -> Result<ManifestLocation> {
263 if object_store.is_local() {
264 if let Ok(Some(location)) = current_manifest_local(base) {
265 return Ok(location);
266 }
267 }
268
269 let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
270
271 let mut valid_manifests = manifest_files.try_filter_map(|res| {
272 if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
273 {
274 future::ready(Ok(Some((scheme, res))))
275 } else {
276 future::ready(Ok(None))
277 }
278 });
279
280 let first = valid_manifests.next().await.transpose()?;
281 match (first, object_store.list_is_lexically_ordered) {
282 (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
285 let version = scheme
286 .parse_version(meta.location.filename().unwrap())
287 .unwrap();
288
289 for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
293 if scheme != ManifestNamingScheme::V2 {
294 warn!(
295 "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
296 to migrate the directory."
297 );
298 break;
299 }
300 let next_version = scheme
301 .parse_version(meta.location.filename().unwrap())
302 .unwrap();
303 if next_version >= version {
304 warn!(
305 "List operation was expected to be lexically ordered, but was not. This \
306 could mean a corrupt read. Please make a bug report on the lance-format/lance \
307 GitHub repository."
308 );
309 break;
310 }
311 }
312
313 Ok(ManifestLocation {
314 version,
315 path: meta.location,
316 size: Some(meta.size),
317 naming_scheme: scheme,
318 e_tag: meta.e_tag,
319 })
320 }
321 (Some((first_scheme, meta)), _) => {
324 let mut current_version = first_scheme
325 .parse_version(meta.location.filename().unwrap())
326 .unwrap();
327 let mut current_meta = meta;
328 let scheme = first_scheme;
329
330 while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
331 if entry_scheme != scheme {
332 return Err(Error::Internal {
333 message: format!(
334 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
335 Use `migrate_manifest_paths_v2` to migrate the directory.",
336 scheme, entry_scheme
337 ),
338 location: location!(),
339 });
340 }
341 let version = entry_scheme
342 .parse_version(meta.location.filename().unwrap())
343 .unwrap();
344 if version > current_version {
345 current_version = version;
346 current_meta = meta;
347 }
348 }
349 Ok(ManifestLocation {
350 version: current_version,
351 path: current_meta.location,
352 size: Some(current_meta.size),
353 naming_scheme: scheme,
354 e_tag: current_meta.e_tag,
355 })
356 }
357 (None, _) => Err(Error::NotFound {
358 uri: base.child(VERSIONS_DIR).to_string(),
359 location: location!(),
360 }),
361 }
362}
363
364fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
368 let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
369 let entries = std::fs::read_dir(path)?;
370
371 let mut latest_entry: Option<(u64, DirEntry)> = None;
372
373 let mut scheme: Option<ManifestNamingScheme> = None;
374
375 for entry in entries {
376 let entry = entry?;
377 let filename_raw = entry.file_name();
378 let filename = filename_raw.to_string_lossy();
379
380 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
381 continue;
384 };
385
386 if let Some(scheme) = scheme {
387 if scheme != entry_scheme {
388 return Err(io::Error::new(
389 io::ErrorKind::InvalidData,
390 format!(
391 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
392 scheme, entry_scheme
393 ),
394 ));
395 }
396 } else {
397 scheme = Some(entry_scheme);
398 }
399
400 let Some(version) = entry_scheme.parse_version(&filename) else {
401 continue;
402 };
403
404 if let Some((latest_version, _)) = &latest_entry {
405 if version > *latest_version {
406 latest_entry = Some((version, entry));
407 }
408 } else {
409 latest_entry = Some((version, entry));
410 }
411 }
412
413 if let Some((version, entry)) = latest_entry {
414 let path = Path::from_filesystem_path(entry.path())
415 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
416 let metadata = entry.metadata()?;
417 Ok(Some(ManifestLocation {
418 version,
419 path,
420 size: Some(metadata.len()),
421 naming_scheme: scheme.unwrap(),
422 e_tag: Some(get_etag(&metadata)),
423 }))
424 } else {
425 Ok(None)
426 }
427}
428
429fn get_etag(metadata: &std::fs::Metadata) -> String {
431 let inode = get_inode(metadata);
432 let size = metadata.len();
433 let mtime = metadata
434 .modified()
435 .ok()
436 .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
437 .unwrap_or_default()
438 .as_micros();
439
440 format!("{inode:x}-{mtime:x}-{size:x}")
444}
445
/// Inode number of the file described by `metadata` (unix only), used as part of the
/// synthesized local e-tag.
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;
    metadata.ino()
}

/// Non-unix platforms have no inode to report, so a constant `0` is used.
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0_u64
}
458
459fn list_manifests<'a>(
460 base_path: &Path,
461 object_store: &'a dyn OSObjectStore,
462) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
463 object_store
464 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
465 .filter_map(|obj_meta| {
466 futures::future::ready(
467 obj_meta
468 .map(|m| ManifestLocation::try_from(m).ok())
469 .transpose(),
470 )
471 })
472 .boxed()
473}
474
475fn make_staging_manifest_path(base: &Path) -> Result<Path> {
476 let id = uuid::Uuid::new_v4().to_string();
477 Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
478 source: Box::new(e),
479 location: location!(),
480 })
481}
482
/// Query parameter of an `s3+ddb://` URL that names the DynamoDB table
/// (e.g. `s3+ddb://bucket/path?ddbTableName=my_table`).
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
485
486#[async_trait::async_trait]
495#[allow(clippy::too_many_arguments)]
496pub trait CommitHandler: Debug + Send + Sync {
497 async fn resolve_latest_location(
498 &self,
499 base_path: &Path,
500 object_store: &ObjectStore,
501 ) -> Result<ManifestLocation> {
502 Ok(current_manifest_path(object_store, base_path).await?)
503 }
504
505 async fn resolve_version_location(
506 &self,
507 base_path: &Path,
508 version: u64,
509 object_store: &dyn OSObjectStore,
510 ) -> Result<ManifestLocation> {
511 default_resolve_version(base_path, version, object_store).await
512 }
513
514 fn list_manifest_locations<'a>(
521 &self,
522 base_path: &Path,
523 object_store: &'a ObjectStore,
524 sorted_descending: bool,
525 ) -> BoxStream<'a, Result<ManifestLocation>> {
526 let underlying_stream = list_manifests(base_path, &object_store.inner);
527
528 if !sorted_descending {
529 return underlying_stream.boxed();
530 }
531
532 async fn sort_stream(
533 input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
534 ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
535 let mut locations = input_stream.try_collect::<Vec<_>>().await?;
536 locations.sort_by_key(|m| std::cmp::Reverse(m.version));
537 Ok(futures::stream::iter(locations.into_iter().map(Ok)))
538 }
539
540 if object_store.list_is_lexically_ordered {
543 let mut peekable = underlying_stream.peekable();
545
546 futures::stream::once(async move {
547 let naming_scheme = match Pin::new(&mut peekable).peek().await {
548 Some(Ok(m)) => m.naming_scheme,
549 Some(Err(_)) => ManifestNamingScheme::V2,
552 None => ManifestNamingScheme::V2,
553 };
554
555 if naming_scheme == ManifestNamingScheme::V2 {
556 Ok(Either::Left(peekable))
558 } else {
559 sort_stream(peekable).await.map(Either::Right)
560 }
561 })
562 .try_flatten()
563 .boxed()
564 } else {
565 futures::stream::once(sort_stream(underlying_stream))
570 .try_flatten()
571 .boxed()
572 }
573 }
574
575 async fn commit(
580 &self,
581 manifest: &mut Manifest,
582 indices: Option<Vec<IndexMetadata>>,
583 base_path: &Path,
584 object_store: &ObjectStore,
585 manifest_writer: ManifestWriter,
586 naming_scheme: ManifestNamingScheme,
587 transaction: Option<Transaction>,
588 ) -> std::result::Result<ManifestLocation, CommitError>;
589
590 async fn delete(&self, _base_path: &Path) -> Result<()> {
592 Ok(())
593 }
594}
595
596async fn default_resolve_version(
597 base_path: &Path,
598 version: u64,
599 object_store: &dyn OSObjectStore,
600) -> Result<ManifestLocation> {
601 if is_detached_version(version) {
602 return Ok(ManifestLocation {
603 version,
604 naming_scheme: ManifestNamingScheme::V2,
607 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
609 size: None,
610 e_tag: None,
611 });
612 }
613
614 let scheme = ManifestNamingScheme::V2;
616 let path = scheme.manifest_path(base_path, version);
617 match object_store.head(&path).await {
618 Ok(meta) => Ok(ManifestLocation {
619 version,
620 path,
621 size: Some(meta.size),
622 naming_scheme: scheme,
623 e_tag: meta.e_tag,
624 }),
625 Err(ObjectStoreError::NotFound { .. }) => {
626 let scheme = ManifestNamingScheme::V1;
628 Ok(ManifestLocation {
629 version,
630 path: scheme.manifest_path(base_path, version),
631 size: None,
632 naming_scheme: scheme,
633 e_tag: None,
634 })
635 }
636 Err(e) => Err(e.into()),
637 }
638}
/// Adapts an `object_store` [`AwsCredentialProvider`] to the AWS SDK's [`ProvideCredentials`]
/// trait, so the object store's credentials can be handed to the DynamoDB client.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
643
#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetches credentials from the wrapped object-store provider.
    ///
    /// The credentials are reported to the SDK as expiring ten minutes from now. Provider
    /// failures are surfaced as `CredentialsError::provider_error`.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let source = self
                .0
                .get_credential()
                .await
                .map_err(|err| CredentialsError::provider_error(Box::new(err)))?;

            let valid_for = Duration::from_secs(10 * 60);
            let expiry = SystemTime::now()
                .checked_add(valid_for)
                .expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &source.key_id,
                &source.secret_key,
                source.token.clone(),
                Some(expiry),
                "",
            ))
        })
    }
}
674
/// Builds a DynamoDB-backed [`ExternalManifestStore`] for `table_name`.
///
/// The client uses the object store's credentials and the given `region`, disables the SDK
/// identity cache, allows up to 5 attempts with standard retries, and honors an optional
/// `endpoint` override.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{retry::RetryConfig, IdentityCache, Region},
        Client,
    };

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache())
        .retry_config(RetryConfig::standard().with_max_attempts(5));
    let builder = match endpoint {
        Some(url) => builder.endpoint_url(url),
        None => builder,
    };

    let client = Client::from_conf(builder.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
706
707pub async fn commit_handler_from_url(
708 url_or_path: &str,
709 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
711) -> Result<Arc<dyn CommitHandler>> {
712 let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
713 Arc::new(RenameCommitHandler)
714 } else {
715 Arc::new(ConditionalPutCommitHandler)
716 };
717
718 let url = match Url::parse(url_or_path) {
719 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
720 return Ok(local_handler);
722 }
723 Ok(url) => url,
724 Err(_) => {
725 return Ok(local_handler);
726 }
727 };
728
729 match url.scheme() {
730 "file" | "file-object-store" => Ok(local_handler),
731 "s3" | "gs" | "az" | "memory" | "oss" => Ok(Arc::new(ConditionalPutCommitHandler)),
732 #[cfg(not(feature = "dynamodb"))]
733 "s3+ddb" => Err(Error::InvalidInput {
734 source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
735 location: location!(),
736 }),
737 #[cfg(feature = "dynamodb")]
738 "s3+ddb" => {
739 if url.query_pairs().count() != 1 {
740 return Err(Error::InvalidInput {
741 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
742 .into(),
743 location: location!(),
744 });
745 }
746 let table_name = match url.query_pairs().next() {
747 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
748 if key == DDB_URL_QUERY_KEY =>
749 {
750 if table_name.is_empty() {
751 return Err(Error::InvalidInput {
752 source: "`s3+ddb://` scheme requires non empty dynamodb table name"
753 .into(),
754 location: location!(),
755 });
756 }
757 table_name
758 }
759 _ => {
760 return Err(Error::InvalidInput {
761 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
762 .into(),
763 location: location!(),
764 });
765 }
766 };
767 let options = options.clone().unwrap_or_default();
768 let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
769 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
770 let expires_at_millis = storage_options.expires_at_millis();
771 let storage_options = storage_options.as_s3_options();
772
773 let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
774
775 let (aws_creds, region) = build_aws_credential(
776 options.s3_credentials_refresh_offset,
777 options.aws_credentials.clone(),
778 Some(&storage_options),
779 region,
780 options.storage_options_provider.clone(),
781 expires_at_millis,
782 )
783 .await?;
784
785 Ok(Arc::new(ExternalManifestCommitHandler {
786 external_manifest_store: build_dynamodb_external_store(
787 table_name,
788 aws_creds.clone(),
789 ®ion,
790 dynamo_endpoint,
791 "lancedb",
792 )
793 .await?,
794 }))
795 }
796 _ => Ok(Arc::new(UnsafeCommitHandler)),
797 }
798}
799
/// Returns the DynamoDB endpoint override.
///
/// Uses the `dynamodb_endpoint` storage option if present, otherwise the `DYNAMODB_ENDPOINT`
/// environment variable, otherwise `None`.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
808
809#[derive(Debug)]
811pub enum CommitError {
812 CommitConflict,
814 OtherError(Error),
816}
817
818impl From<Error> for CommitError {
819 fn from(e: Error) -> Self {
820 Self::OtherError(e)
821 }
822}
823
824impl From<CommitError> for Error {
825 fn from(e: CommitError) -> Self {
826 match e {
827 CommitError::CommitConflict => Self::Internal {
828 message: "Commit conflict".to_string(),
829 location: location!(),
830 },
831 CommitError::OtherError(e) => e,
832 }
833 }
834}
835
836static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
838
839pub struct UnsafeCommitHandler;
843
844#[async_trait::async_trait]
845#[allow(clippy::too_many_arguments)]
846impl CommitHandler for UnsafeCommitHandler {
847 async fn commit(
848 &self,
849 manifest: &mut Manifest,
850 indices: Option<Vec<IndexMetadata>>,
851 base_path: &Path,
852 object_store: &ObjectStore,
853 manifest_writer: ManifestWriter,
854 naming_scheme: ManifestNamingScheme,
855 transaction: Option<Transaction>,
856 ) -> std::result::Result<ManifestLocation, CommitError> {
857 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
859 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
860 log::warn!(
861 "Using unsafe commit handler. Concurrent writes may result in data loss. \
862 Consider providing a commit handler that prevents conflicting writes."
863 );
864 }
865
866 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
867 let res =
868 manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
869
870 Ok(ManifestLocation {
871 version: manifest.version,
872 size: Some(res.size as u64),
873 naming_scheme,
874 path: version_path,
875 e_tag: res.e_tag,
876 })
877 }
878}
879
880impl Debug for UnsafeCommitHandler {
881 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
882 f.debug_struct("UnsafeCommitHandler").finish()
883 }
884}
885
/// A lock that guards writing the manifest of a given version.
///
/// Any `CommitLock + Send + Sync` type (and an `Arc` of one) is a [`CommitHandler`]. Its
/// `commit` works as follows:
/// 1. Calls [`CommitLock::lock`] for `manifest.version`.
/// 2. Checks under the lock that the manifest does not already exist.
/// 3. Writes the manifest.
/// 4. Releases the returned lease.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Lease returned by [`CommitLock::lock`] and released when the commit attempt ends.
    type Lease: CommitLease;

    /// Acquires the lock for `version`, returning a lease that must be released afterwards.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}

/// Handle to a held [`CommitLock`].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Releases the lock.
    ///
    /// `success` is `true` only when the manifest was written. The blanket `CommitHandler`
    /// impl passes `false` if it aborts (existing manifest or store error) or the write fails.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
911
912#[async_trait::async_trait]
913impl<T: CommitLock + Send + Sync> CommitHandler for T {
914 async fn commit(
915 &self,
916 manifest: &mut Manifest,
917 indices: Option<Vec<IndexMetadata>>,
918 base_path: &Path,
919 object_store: &ObjectStore,
920 manifest_writer: ManifestWriter,
921 naming_scheme: ManifestNamingScheme,
922 transaction: Option<Transaction>,
923 ) -> std::result::Result<ManifestLocation, CommitError> {
924 let path = naming_scheme.manifest_path(base_path, manifest.version);
925 let lease = self.lock(manifest.version).await?;
928
929 match object_store.inner.head(&path).await {
931 Ok(_) => {
932 lease.release(false).await?;
935
936 return Err(CommitError::CommitConflict);
937 }
938 Err(ObjectStoreError::NotFound { .. }) => {}
939 Err(e) => {
940 lease.release(false).await?;
943
944 return Err(CommitError::OtherError(e.into()));
945 }
946 }
947 let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
948
949 lease.release(res.is_ok()).await?;
951
952 let res = res?;
953 Ok(ManifestLocation {
954 version: manifest.version,
955 size: Some(res.size as u64),
956 naming_scheme,
957 path,
958 e_tag: res.e_tag,
959 })
960 }
961}
962
963#[async_trait::async_trait]
964impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
965 async fn commit(
966 &self,
967 manifest: &mut Manifest,
968 indices: Option<Vec<IndexMetadata>>,
969 base_path: &Path,
970 object_store: &ObjectStore,
971 manifest_writer: ManifestWriter,
972 naming_scheme: ManifestNamingScheme,
973 transaction: Option<Transaction>,
974 ) -> std::result::Result<ManifestLocation, CommitError> {
975 self.as_ref()
976 .commit(
977 manifest,
978 indices,
979 base_path,
980 object_store,
981 manifest_writer,
982 naming_scheme,
983 transaction,
984 )
985 .await
986 }
987}
988
989pub struct RenameCommitHandler;
993
994#[async_trait::async_trait]
995impl CommitHandler for RenameCommitHandler {
996 async fn commit(
997 &self,
998 manifest: &mut Manifest,
999 indices: Option<Vec<IndexMetadata>>,
1000 base_path: &Path,
1001 object_store: &ObjectStore,
1002 manifest_writer: ManifestWriter,
1003 naming_scheme: ManifestNamingScheme,
1004 transaction: Option<Transaction>,
1005 ) -> std::result::Result<ManifestLocation, CommitError> {
1006 let path = naming_scheme.manifest_path(base_path, manifest.version);
1010 let tmp_path = make_staging_manifest_path(&path)?;
1011
1012 let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1013
1014 match object_store
1015 .inner
1016 .rename_if_not_exists(&tmp_path, &path)
1017 .await
1018 {
1019 Ok(_) => {
1020 Ok(ManifestLocation {
1022 version: manifest.version,
1023 path,
1024 size: Some(res.size as u64),
1025 naming_scheme,
1026 e_tag: None, })
1028 }
1029 Err(ObjectStoreError::AlreadyExists { .. }) => {
1030 let _ = object_store.delete(&tmp_path).await;
1033
1034 return Err(CommitError::CommitConflict);
1035 }
1036 Err(e) => {
1037 return Err(CommitError::OtherError(e.into()));
1039 }
1040 }
1041 }
1042}
1043
1044impl Debug for RenameCommitHandler {
1045 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1046 f.debug_struct("RenameCommitHandler").finish()
1047 }
1048}
1049
1050pub struct ConditionalPutCommitHandler;
1051
1052#[async_trait::async_trait]
1053impl CommitHandler for ConditionalPutCommitHandler {
1054 async fn commit(
1055 &self,
1056 manifest: &mut Manifest,
1057 indices: Option<Vec<IndexMetadata>>,
1058 base_path: &Path,
1059 object_store: &ObjectStore,
1060 manifest_writer: ManifestWriter,
1061 naming_scheme: ManifestNamingScheme,
1062 transaction: Option<Transaction>,
1063 ) -> std::result::Result<ManifestLocation, CommitError> {
1064 let path = naming_scheme.manifest_path(base_path, manifest.version);
1065
1066 let memory_store = ObjectStore::memory();
1067 let dummy_path = "dummy";
1068 manifest_writer(
1069 &memory_store,
1070 manifest,
1071 indices,
1072 &dummy_path.into(),
1073 transaction,
1074 )
1075 .await?;
1076 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1077 let size = dummy_data.len() as u64;
1078 let res = object_store
1079 .inner
1080 .put_opts(
1081 &path,
1082 dummy_data.into(),
1083 PutOptions {
1084 mode: object_store::PutMode::Create,
1085 ..Default::default()
1086 },
1087 )
1088 .await
1089 .map_err(|err| match err {
1090 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1091 CommitError::CommitConflict
1092 }
1093 _ => CommitError::OtherError(err.into()),
1094 })?;
1095
1096 Ok(ManifestLocation {
1097 version: manifest.version,
1098 path,
1099 size: Some(size),
1100 naming_scheme,
1101 e_tag: res.e_tag,
1102 })
1103 }
1104}
1105
1106impl Debug for ConditionalPutCommitHandler {
1107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1108 f.debug_struct("ConditionalPutCommitHandler").finish()
1109 }
1110}
1111
/// Settings that control how a commit is performed.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of times a commit may be retried (presumably after conflicts; confirm with
    /// callers). Defaults to 20.
    pub num_retries: u32,
    /// When `true`, skips automatic cleanup (presumably of old versions after committing;
    /// confirm with callers). Defaults to `false`.
    pub skip_auto_cleanup: bool,
}

impl Default for CommitConfig {
    fn default() -> Self {
        const DEFAULT_NUM_RETRIES: u32 = 20;
        CommitConfig {
            skip_auto_cleanup: false,
            num_retries: DEFAULT_NUM_RETRIES,
        }
    }
}
1127
#[cfg(test)]
mod tests {
    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    /// Checks path generation, version parsing (including staging names) and scheme detection
    /// for both naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // V2 stores `u64::MAX - version`, zero-padded to 20 digits.
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        // Staging names (`<manifest>-<uuid>`) still parse to the version they stage.
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Migration renames V1 manifests to V2 and leaves V2 manifests and unrelated files alone.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        let original_files = vec![
            // Not a manifest; must be left untouched.
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // The expected order assumes the memory store lists paths lexically, so V2 names come
        // newest first and precede "irrelevant".
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    /// `list_manifest_locations(.., true)` returns manifests newest first, for both schemes, on
    /// both lexically ordered (memory) and unordered (local) stores.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Declared outside the `if` so the temporary directory stays alive for the whole test.
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.child("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Written and expected from newest to oldest, matching `sorted_descending = true`.
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(actual_versions, expected_paths);
    }

    /// `current_manifest_path` finds the latest version through both the lexical fast path and
    /// the full-scan path, for both naming schemes.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_current_manifest_path(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        let mut object_store = ObjectStore::memory();
        // Force the flag so both code paths run against the same memory store.
        object_store.list_is_lexically_ordered = lexical_list_store;
        let object_store = Box::new(object_store);
        let base = Path::from("base");

        // Written in shuffled order so the result cannot depend on write order.
        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
            let path = naming_scheme.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        let location = current_manifest_path(&object_store, &base).await.unwrap();

        assert_eq!(location.version, 11);
        assert_eq!(location.naming_scheme, naming_scheme);
        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
    }
}