1use std::io;
26use std::pin::Pin;
27use std::sync::atomic::AtomicBool;
28use std::sync::Arc;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::future::Either;
33use futures::Stream;
34use futures::{
35 future::{self, BoxFuture},
36 stream::BoxStream,
37 StreamExt, TryStreamExt,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
44use snafu::location;
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::WriteExt;
55
56use crate::format::{is_detached_version, IndexMetadata, Manifest, Transaction};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61 aws_credential_types::provider::error::CredentialsError,
62 aws_credential_types::provider::ProvideCredentials,
63 lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
64 object_store::aws::AmazonS3ConfigKey,
65 object_store::aws::AwsCredentialProvider,
66 std::borrow::Cow,
67 std::time::{Duration, SystemTime},
68};
69
/// Name of the directory, relative to the dataset base, that holds manifest files.
const VERSIONS_DIR: &str = "_versions";
/// File extension used by manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix used for detached-version manifests (`d{version}.manifest`).
const DETACHED_VERSION_PREFIX: &str = "d";
73
/// How manifest files inside the `_versions` directory are named.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `{version}.manifest`, e.g. `42.manifest`.
    V1,
    /// `{u64::MAX - version:020}.manifest`: the version is inverted and zero-padded to
    /// 20 digits, so a lexically ordered listing yields the newest manifest first.
    V2,
}
84
85impl ManifestNamingScheme {
86 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
87 let directory = base.child(VERSIONS_DIR);
88 if is_detached_version(version) {
89 let directory = base.child(VERSIONS_DIR);
94 directory.child(format!(
95 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
96 ))
97 } else {
98 match self {
99 Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
100 Self::V2 => {
101 let inverted_version = u64::MAX - version;
102 directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
103 }
104 }
105 }
106 }
107
108 pub fn parse_version(&self, filename: &str) -> Option<u64> {
109 let file_number = filename
110 .split_once('.')
111 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
113 match self {
114 Self::V1 => file_number,
115 Self::V2 => file_number.map(|v| u64::MAX - v),
116 }
117 }
118
119 pub fn detect_scheme(filename: &str) -> Option<Self> {
120 if filename.starts_with(DETACHED_VERSION_PREFIX) {
121 return Some(Self::V2);
123 }
124 if filename.ends_with(MANIFEST_EXTENSION) {
125 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
126 if filename.len() == V2_LEN {
127 Some(Self::V2)
128 } else {
129 Some(Self::V1)
130 }
131 } else {
132 None
133 }
134 }
135
136 pub fn detect_scheme_staging(filename: &str) -> Self {
137 if filename.chars().nth(20) == Some('.') {
140 Self::V2
141 } else {
142 Self::V1
143 }
144 }
145}
146
147pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
157 object_store
158 .inner
159 .list(Some(&dataset_base.child(VERSIONS_DIR)))
160 .try_filter(|res| {
161 let res = if let Some(filename) = res.location.filename() {
162 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
163 } else {
164 false
165 };
166 future::ready(res)
167 })
168 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
169 let filename = meta.location.filename().unwrap();
170 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
171 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
172 object_store.inner.rename(&meta.location, &path).await?;
173 Ok(())
174 })
175 .await?;
176
177 Ok(())
178}
179
/// Function that serializes `manifest` (plus optional `indices` and `transaction`) to
/// `path` in `object_store` and reports the resulting [`WriteResult`].
///
/// Commit handlers receive one of these so they can control *where* the manifest is
/// written (final path, staging path, or an in-memory scratch store).
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
190
191pub fn write_manifest_file_to_path<'a>(
195 object_store: &'a ObjectStore,
196 manifest: &'a mut Manifest,
197 indices: Option<Vec<IndexMetadata>>,
198 path: &'a Path,
199 transaction: Option<Transaction>,
200) -> BoxFuture<'a, Result<WriteResult>> {
201 Box::pin(async move {
202 let mut object_writer = ObjectWriter::new(object_store, path).await?;
203 let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
204 object_writer
205 .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
206 .await?;
207 let res = object_writer.shutdown().await?;
208 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
209 Ok(res)
210 })
211}
212
/// Where a specific manifest version lives and what is known about the file.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// Dataset version the manifest describes.
    pub version: u64,
    /// Full object-store path of the manifest file.
    pub path: Path,
    /// File size in bytes, or `None` when the file was not inspected.
    pub size: Option<u64>,
    /// Naming scheme the file name follows.
    pub naming_scheme: ManifestNamingScheme,
    /// E-tag of the file when known (synthetic for local files, see `get_etag`).
    pub e_tag: Option<String>,
}
228
229impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
230 type Error = Error;
231
232 fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
233 let filename = meta.location.filename().ok_or_else(|| Error::Internal {
234 message: "ObjectMeta location does not have a filename".to_string(),
235 location: location!(),
236 })?;
237 let scheme =
238 ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
239 message: format!("Invalid manifest filename: '{}'", filename),
240 location: location!(),
241 })?;
242 let version = scheme
243 .parse_version(filename)
244 .ok_or_else(|| Error::Internal {
245 message: format!("Invalid manifest filename: '{}'", filename),
246 location: location!(),
247 })?;
248 Ok(Self {
249 version,
250 path: meta.location,
251 size: Some(meta.size),
252 naming_scheme: scheme,
253 e_tag: meta.e_tag,
254 })
255 }
256}
257
258async fn current_manifest_path(
260 object_store: &ObjectStore,
261 base: &Path,
262) -> Result<ManifestLocation> {
263 if object_store.is_local() {
264 if let Ok(Some(location)) = current_manifest_local(base) {
265 return Ok(location);
266 }
267 }
268
269 let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
270
271 let mut valid_manifests = manifest_files.try_filter_map(|res| {
272 if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
273 {
274 future::ready(Ok(Some((scheme, res))))
275 } else {
276 future::ready(Ok(None))
277 }
278 });
279
280 let first = valid_manifests.next().await.transpose()?;
281 match (first, object_store.list_is_lexically_ordered) {
282 (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
285 let version = scheme
286 .parse_version(meta.location.filename().unwrap())
287 .unwrap();
288
289 for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
293 if scheme != ManifestNamingScheme::V2 {
294 warn!(
295 "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
296 to migrate the directory."
297 );
298 break;
299 }
300 let next_version = scheme
301 .parse_version(meta.location.filename().unwrap())
302 .unwrap();
303 if next_version >= version {
304 warn!(
305 "List operation was expected to be lexically ordered, but was not. This \
306 could mean a corrupt read. Please make a bug report on the lance-format/lance \
307 GitHub repository."
308 );
309 break;
310 }
311 }
312
313 Ok(ManifestLocation {
314 version,
315 path: meta.location,
316 size: Some(meta.size),
317 naming_scheme: scheme,
318 e_tag: meta.e_tag,
319 })
320 }
321 (Some((scheme, meta)), _) => {
325 let mut current_version = scheme
326 .parse_version(meta.location.filename().unwrap())
327 .unwrap();
328 let mut current_meta = meta;
329
330 while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
331 if matches!(scheme, ManifestNamingScheme::V2) {
332 return Err(Error::Internal {
333 message: "Found V2 manifest in a V1 manifest directory".to_string(),
334 location: location!(),
335 });
336 }
337 let version = scheme
338 .parse_version(meta.location.filename().unwrap())
339 .unwrap();
340 if version > current_version {
341 current_version = version;
342 current_meta = meta;
343 }
344 }
345 Ok(ManifestLocation {
346 version: current_version,
347 path: current_meta.location,
348 size: Some(current_meta.size),
349 naming_scheme: scheme,
350 e_tag: current_meta.e_tag,
351 })
352 }
353 (None, _) => Err(Error::NotFound {
354 uri: base.child(VERSIONS_DIR).to_string(),
355 location: location!(),
356 }),
357 }
358}
359
360fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
364 let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
365 let entries = std::fs::read_dir(path)?;
366
367 let mut latest_entry: Option<(u64, DirEntry)> = None;
368
369 let mut scheme: Option<ManifestNamingScheme> = None;
370
371 for entry in entries {
372 let entry = entry?;
373 let filename_raw = entry.file_name();
374 let filename = filename_raw.to_string_lossy();
375
376 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
377 continue;
380 };
381
382 if let Some(scheme) = scheme {
383 if scheme != entry_scheme {
384 return Err(io::Error::new(
385 io::ErrorKind::InvalidData,
386 format!(
387 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
388 scheme, entry_scheme
389 ),
390 ));
391 }
392 } else {
393 scheme = Some(entry_scheme);
394 }
395
396 let Some(version) = entry_scheme.parse_version(&filename) else {
397 continue;
398 };
399
400 if let Some((latest_version, _)) = &latest_entry {
401 if version > *latest_version {
402 latest_entry = Some((version, entry));
403 }
404 } else {
405 latest_entry = Some((version, entry));
406 }
407 }
408
409 if let Some((version, entry)) = latest_entry {
410 let path = Path::from_filesystem_path(entry.path())
411 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
412 let metadata = entry.metadata()?;
413 Ok(Some(ManifestLocation {
414 version,
415 path,
416 size: Some(metadata.len()),
417 naming_scheme: scheme.unwrap(),
418 e_tag: Some(get_etag(&metadata)),
419 }))
420 } else {
421 Ok(None)
422 }
423}
424
425fn get_etag(metadata: &std::fs::Metadata) -> String {
427 let inode = get_inode(metadata);
428 let size = metadata.len();
429 let mtime = metadata
430 .modified()
431 .ok()
432 .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
433 .unwrap_or_default()
434 .as_micros();
435
436 format!("{inode:x}-{mtime:x}-{size:x}")
440}
441
/// Returns the inode number recorded in `metadata`.
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;
    metadata.ino()
}

/// Inode numbers are not available on this platform, so this always returns `0`.
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0
}
454
455fn list_manifests<'a>(
456 base_path: &Path,
457 object_store: &'a dyn OSObjectStore,
458) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
459 object_store
460 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
461 .filter_map(|obj_meta| {
462 futures::future::ready(
463 obj_meta
464 .map(|m| ManifestLocation::try_from(m).ok())
465 .transpose(),
466 )
467 })
468 .boxed()
469}
470
471fn make_staging_manifest_path(base: &Path) -> Result<Path> {
472 let id = uuid::Uuid::new_v4().to_string();
473 Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
474 source: Box::new(e),
475 location: location!(),
476 })
477}
478
/// Query-parameter name that carries the DynamoDB table in `s3+ddb://` URLs.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
481
482#[async_trait::async_trait]
491#[allow(clippy::too_many_arguments)]
492pub trait CommitHandler: Debug + Send + Sync {
493 async fn resolve_latest_location(
494 &self,
495 base_path: &Path,
496 object_store: &ObjectStore,
497 ) -> Result<ManifestLocation> {
498 Ok(current_manifest_path(object_store, base_path).await?)
499 }
500
501 async fn resolve_version_location(
502 &self,
503 base_path: &Path,
504 version: u64,
505 object_store: &dyn OSObjectStore,
506 ) -> Result<ManifestLocation> {
507 default_resolve_version(base_path, version, object_store).await
508 }
509
510 fn list_manifest_locations<'a>(
517 &self,
518 base_path: &Path,
519 object_store: &'a ObjectStore,
520 sorted_descending: bool,
521 ) -> BoxStream<'a, Result<ManifestLocation>> {
522 let underlying_stream = list_manifests(base_path, &object_store.inner);
523
524 if !sorted_descending {
525 return underlying_stream.boxed();
526 }
527
528 async fn sort_stream(
529 input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
530 ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
531 let mut locations = input_stream.try_collect::<Vec<_>>().await?;
532 locations.sort_by_key(|m| std::cmp::Reverse(m.version));
533 Ok(futures::stream::iter(locations.into_iter().map(Ok)))
534 }
535
536 if object_store.list_is_lexically_ordered {
539 let mut peekable = underlying_stream.peekable();
541
542 futures::stream::once(async move {
543 let naming_scheme = match Pin::new(&mut peekable).peek().await {
544 Some(Ok(m)) => m.naming_scheme,
545 Some(Err(_)) => ManifestNamingScheme::V2,
548 None => ManifestNamingScheme::V2,
549 };
550
551 if naming_scheme == ManifestNamingScheme::V2 {
552 Ok(Either::Left(peekable))
554 } else {
555 sort_stream(peekable).await.map(Either::Right)
556 }
557 })
558 .try_flatten()
559 .boxed()
560 } else {
561 futures::stream::once(sort_stream(underlying_stream))
566 .try_flatten()
567 .boxed()
568 }
569 }
570
571 async fn commit(
576 &self,
577 manifest: &mut Manifest,
578 indices: Option<Vec<IndexMetadata>>,
579 base_path: &Path,
580 object_store: &ObjectStore,
581 manifest_writer: ManifestWriter,
582 naming_scheme: ManifestNamingScheme,
583 transaction: Option<Transaction>,
584 ) -> std::result::Result<ManifestLocation, CommitError>;
585
586 async fn delete(&self, _base_path: &Path) -> Result<()> {
588 Ok(())
589 }
590}
591
592async fn default_resolve_version(
593 base_path: &Path,
594 version: u64,
595 object_store: &dyn OSObjectStore,
596) -> Result<ManifestLocation> {
597 if is_detached_version(version) {
598 return Ok(ManifestLocation {
599 version,
600 naming_scheme: ManifestNamingScheme::V2,
603 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
605 size: None,
606 e_tag: None,
607 });
608 }
609
610 let scheme = ManifestNamingScheme::V2;
612 let path = scheme.manifest_path(base_path, version);
613 match object_store.head(&path).await {
614 Ok(meta) => Ok(ManifestLocation {
615 version,
616 path,
617 size: Some(meta.size),
618 naming_scheme: scheme,
619 e_tag: meta.e_tag,
620 }),
621 Err(ObjectStoreError::NotFound { .. }) => {
622 let scheme = ManifestNamingScheme::V1;
624 Ok(ManifestLocation {
625 version,
626 path: scheme.manifest_path(base_path, version),
627 size: None,
628 naming_scheme: scheme,
629 e_tag: None,
630 })
631 }
632 Err(e) => Err(e.into()),
633 }
634}
/// Adapts an `object_store` AWS credential provider to the AWS SDK's
/// [`ProvideCredentials`] trait so the same credentials can be used for DynamoDB.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetches credentials from the wrapped provider each time it is invoked. They are
    /// reported as valid for ten minutes from the moment they were fetched.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let fetched = match self.0.get_credential().await {
                Ok(creds) => creds,
                Err(e) => return Err(CredentialsError::provider_error(Box::new(e))),
            };
            let validity = Duration::from_secs(60 * 10);
            let expires_at = SystemTime::now()
                .checked_add(validity)
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &fetched.key_id,
                &fetched.secret_key,
                fetched.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
670
/// Builds a DynamoDB-backed [`ExternalManifestStore`] for `table_name`.
///
/// The client uses `region` and the given credentials (re-fetched on every request,
/// since identity caching is disabled), retries up to 5 attempts, and talks to
/// `endpoint` when one is provided.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{retry::RetryConfig, IdentityCache, Region},
        Client,
    };

    let base_builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache())
        .retry_config(RetryConfig::standard().with_max_attempts(5));

    let builder = match endpoint {
        Some(url) => base_builder.endpoint_url(url),
        None => base_builder,
    };
    let client = Client::from_conf(builder.build());

    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
702
703pub async fn commit_handler_from_url(
704 url_or_path: &str,
705 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
707) -> Result<Arc<dyn CommitHandler>> {
708 let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
709 Arc::new(RenameCommitHandler)
710 } else {
711 Arc::new(ConditionalPutCommitHandler)
712 };
713
714 let url = match Url::parse(url_or_path) {
715 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
716 return Ok(local_handler);
718 }
719 Ok(url) => url,
720 Err(_) => {
721 return Ok(local_handler);
722 }
723 };
724
725 match url.scheme() {
726 "file" | "file-object-store" => Ok(local_handler),
727 "s3" | "gs" | "az" | "memory" | "oss" => Ok(Arc::new(ConditionalPutCommitHandler)),
728 #[cfg(not(feature = "dynamodb"))]
729 "s3+ddb" => Err(Error::InvalidInput {
730 source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
731 location: location!(),
732 }),
733 #[cfg(feature = "dynamodb")]
734 "s3+ddb" => {
735 if url.query_pairs().count() != 1 {
736 return Err(Error::InvalidInput {
737 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
738 .into(),
739 location: location!(),
740 });
741 }
742 let table_name = match url.query_pairs().next() {
743 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
744 if key == DDB_URL_QUERY_KEY =>
745 {
746 if table_name.is_empty() {
747 return Err(Error::InvalidInput {
748 source: "`s3+ddb://` scheme requires non empty dynamodb table name"
749 .into(),
750 location: location!(),
751 });
752 }
753 table_name
754 }
755 _ => {
756 return Err(Error::InvalidInput {
757 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
758 .into(),
759 location: location!(),
760 });
761 }
762 };
763 let options = options.clone().unwrap_or_default();
764 let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
765 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
766 let expires_at_millis = storage_options.expires_at_millis();
767 let storage_options = storage_options.as_s3_options();
768
769 let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
770
771 let (aws_creds, region) = build_aws_credential(
772 options.s3_credentials_refresh_offset,
773 options.aws_credentials.clone(),
774 Some(&storage_options),
775 region,
776 options.storage_options_provider.clone(),
777 expires_at_millis,
778 )
779 .await?;
780
781 Ok(Arc::new(ExternalManifestCommitHandler {
782 external_manifest_store: build_dynamodb_external_store(
783 table_name,
784 aws_creds.clone(),
785 ®ion,
786 dynamo_endpoint,
787 "lancedb",
788 )
789 .await?,
790 }))
791 }
792 _ => Ok(Arc::new(UnsafeCommitHandler)),
793 }
794}
795
/// Returns the DynamoDB endpoint override, if any.
///
/// The `dynamodb_endpoint` storage option wins. Otherwise the `DYNAMODB_ENDPOINT`
/// environment variable is consulted (only when the option is absent).
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
804
805#[derive(Debug)]
807pub enum CommitError {
808 CommitConflict,
810 OtherError(Error),
812}
813
814impl From<Error> for CommitError {
815 fn from(e: Error) -> Self {
816 Self::OtherError(e)
817 }
818}
819
820impl From<CommitError> for Error {
821 fn from(e: CommitError) -> Self {
822 match e {
823 CommitError::CommitConflict => Self::Internal {
824 message: "Commit conflict".to_string(),
825 location: location!(),
826 },
827 CommitError::OtherError(e) => e,
828 }
829 }
830}
831
832static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
834
835pub struct UnsafeCommitHandler;
839
840#[async_trait::async_trait]
841#[allow(clippy::too_many_arguments)]
842impl CommitHandler for UnsafeCommitHandler {
843 async fn commit(
844 &self,
845 manifest: &mut Manifest,
846 indices: Option<Vec<IndexMetadata>>,
847 base_path: &Path,
848 object_store: &ObjectStore,
849 manifest_writer: ManifestWriter,
850 naming_scheme: ManifestNamingScheme,
851 transaction: Option<Transaction>,
852 ) -> std::result::Result<ManifestLocation, CommitError> {
853 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
855 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
856 log::warn!(
857 "Using unsafe commit handler. Concurrent writes may result in data loss. \
858 Consider providing a commit handler that prevents conflicting writes."
859 );
860 }
861
862 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
863 let res =
864 manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
865
866 Ok(ManifestLocation {
867 version: manifest.version,
868 size: Some(res.size as u64),
869 naming_scheme,
870 path: version_path,
871 e_tag: res.e_tag,
872 })
873 }
874}
875
876impl Debug for UnsafeCommitHandler {
877 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
878 f.debug_struct("UnsafeCommitHandler").finish()
879 }
880}
881
/// A lock used to serialize commits of a given version.
///
/// Any `CommitLock + Send + Sync` type is a [`CommitHandler`] through a blanket
/// implementation in this module. That implementation acquires the lock, checks that the
/// target manifest does not exist yet, writes it, and then releases the lease.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Lease held while a commit attempt is in progress.
    type Lease: CommitLease;

    /// Acquires the lock for `version`; the returned lease must be released after the
    /// commit attempt.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}

/// A held [`CommitLock`] lease.
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Releases the lease. `success` is `true` only when the manifest was written.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
907
908#[async_trait::async_trait]
909impl<T: CommitLock + Send + Sync> CommitHandler for T {
910 async fn commit(
911 &self,
912 manifest: &mut Manifest,
913 indices: Option<Vec<IndexMetadata>>,
914 base_path: &Path,
915 object_store: &ObjectStore,
916 manifest_writer: ManifestWriter,
917 naming_scheme: ManifestNamingScheme,
918 transaction: Option<Transaction>,
919 ) -> std::result::Result<ManifestLocation, CommitError> {
920 let path = naming_scheme.manifest_path(base_path, manifest.version);
921 let lease = self.lock(manifest.version).await?;
924
925 match object_store.inner.head(&path).await {
927 Ok(_) => {
928 lease.release(false).await?;
931
932 return Err(CommitError::CommitConflict);
933 }
934 Err(ObjectStoreError::NotFound { .. }) => {}
935 Err(e) => {
936 lease.release(false).await?;
939
940 return Err(CommitError::OtherError(e.into()));
941 }
942 }
943 let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
944
945 lease.release(res.is_ok()).await?;
947
948 let res = res?;
949 Ok(ManifestLocation {
950 version: manifest.version,
951 size: Some(res.size as u64),
952 naming_scheme,
953 path,
954 e_tag: res.e_tag,
955 })
956 }
957}
958
959#[async_trait::async_trait]
960impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
961 async fn commit(
962 &self,
963 manifest: &mut Manifest,
964 indices: Option<Vec<IndexMetadata>>,
965 base_path: &Path,
966 object_store: &ObjectStore,
967 manifest_writer: ManifestWriter,
968 naming_scheme: ManifestNamingScheme,
969 transaction: Option<Transaction>,
970 ) -> std::result::Result<ManifestLocation, CommitError> {
971 self.as_ref()
972 .commit(
973 manifest,
974 indices,
975 base_path,
976 object_store,
977 manifest_writer,
978 naming_scheme,
979 transaction,
980 )
981 .await
982 }
983}
984
985pub struct RenameCommitHandler;
989
990#[async_trait::async_trait]
991impl CommitHandler for RenameCommitHandler {
992 async fn commit(
993 &self,
994 manifest: &mut Manifest,
995 indices: Option<Vec<IndexMetadata>>,
996 base_path: &Path,
997 object_store: &ObjectStore,
998 manifest_writer: ManifestWriter,
999 naming_scheme: ManifestNamingScheme,
1000 transaction: Option<Transaction>,
1001 ) -> std::result::Result<ManifestLocation, CommitError> {
1002 let path = naming_scheme.manifest_path(base_path, manifest.version);
1006 let tmp_path = make_staging_manifest_path(&path)?;
1007
1008 let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1009
1010 match object_store
1011 .inner
1012 .rename_if_not_exists(&tmp_path, &path)
1013 .await
1014 {
1015 Ok(_) => {
1016 Ok(ManifestLocation {
1018 version: manifest.version,
1019 path,
1020 size: Some(res.size as u64),
1021 naming_scheme,
1022 e_tag: None, })
1024 }
1025 Err(ObjectStoreError::AlreadyExists { .. }) => {
1026 let _ = object_store.delete(&tmp_path).await;
1029
1030 return Err(CommitError::CommitConflict);
1031 }
1032 Err(e) => {
1033 return Err(CommitError::OtherError(e.into()));
1035 }
1036 }
1037 }
1038}
1039
1040impl Debug for RenameCommitHandler {
1041 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1042 f.debug_struct("RenameCommitHandler").finish()
1043 }
1044}
1045
1046pub struct ConditionalPutCommitHandler;
1047
1048#[async_trait::async_trait]
1049impl CommitHandler for ConditionalPutCommitHandler {
1050 async fn commit(
1051 &self,
1052 manifest: &mut Manifest,
1053 indices: Option<Vec<IndexMetadata>>,
1054 base_path: &Path,
1055 object_store: &ObjectStore,
1056 manifest_writer: ManifestWriter,
1057 naming_scheme: ManifestNamingScheme,
1058 transaction: Option<Transaction>,
1059 ) -> std::result::Result<ManifestLocation, CommitError> {
1060 let path = naming_scheme.manifest_path(base_path, manifest.version);
1061
1062 let memory_store = ObjectStore::memory();
1063 let dummy_path = "dummy";
1064 manifest_writer(
1065 &memory_store,
1066 manifest,
1067 indices,
1068 &dummy_path.into(),
1069 transaction,
1070 )
1071 .await?;
1072 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1073 let size = dummy_data.len() as u64;
1074 let res = object_store
1075 .inner
1076 .put_opts(
1077 &path,
1078 dummy_data.into(),
1079 PutOptions {
1080 mode: object_store::PutMode::Create,
1081 ..Default::default()
1082 },
1083 )
1084 .await
1085 .map_err(|err| match err {
1086 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1087 CommitError::CommitConflict
1088 }
1089 _ => CommitError::OtherError(err.into()),
1090 })?;
1091
1092 Ok(ManifestLocation {
1093 version: manifest.version,
1094 path,
1095 size: Some(size),
1096 naming_scheme,
1097 e_tag: res.e_tag,
1098 })
1099 }
1100}
1101
1102impl Debug for ConditionalPutCommitHandler {
1103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1104 f.debug_struct("ConditionalPutCommitHandler").finish()
1105 }
1106}
1107
/// Settings for committing new dataset versions.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry budget for commits (default: 20). Presumably consumed by commit callers
    /// outside this file — confirm there.
    pub num_retries: u32,
    /// Whether automatic cleanup should be skipped (default: `false`).
    pub skip_auto_cleanup: bool,
}

impl Default for CommitConfig {
    fn default() -> Self {
        let num_retries = 20;
        let skip_auto_cleanup = false;
        Self {
            num_retries,
            skip_auto_cleanup,
        }
    }
}
1123
#[cfg(test)]
mod tests {
    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    /// Checks path construction, version parsing (including staging suffixes) and
    /// scheme detection for both naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // V2 stores `u64::MAX - version`, zero-padded to 20 digits.
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        // Staging names carry a `-<uuid>` suffix after the extension.
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// `migrate_scheme_to_v2` renames V1 manifests, keeps V2 manifests, and ignores
    /// non-manifest files.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        let original_files = vec![
            // Not a manifest; must survive the migration untouched.
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Expected in the memory store's lexical listing order: V2 names (newest
        // first), then `irrelevant`.
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    /// `list_manifest_locations(.., true)` yields newest-first for both naming schemes,
    /// on a lexically ordered (memory) store and an unordered (local) store.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Kept alive for the whole test so the local directory is not removed early.
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.child("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Versions are written newest first, so `expected_paths` is already in
        // descending version order.
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(actual_versions, expected_paths);
    }
}