1use std::io;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::AtomicBool;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::Stream;
33use futures::future::Either;
34use futures::{
35 StreamExt, TryStreamExt,
36 future::{self, BoxFuture},
37 stream::BoxStream,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult, get_etag};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
44use tracing::info;
45use url::Url;
46
47#[cfg(feature = "dynamodb")]
48pub mod dynamodb;
49pub mod external_manifest;
50
51use lance_core::{Error, Result};
52use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
53use lance_io::traits::{WriteExt, Writer};
54
55use crate::format::{IndexMetadata, Manifest, Transaction, is_detached_version};
56use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
57#[cfg(feature = "dynamodb")]
58use {
59 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
60 aws_credential_types::provider::ProvideCredentials,
61 aws_credential_types::provider::error::CredentialsError,
62 lance_io::object_store::{StorageOptions, providers::aws::build_aws_credential},
63 object_store::aws::AmazonS3ConfigKey,
64 object_store::aws::AwsCredentialProvider,
65 std::borrow::Cow,
66 std::time::{Duration, SystemTime},
67};
68
/// Directory, relative to the dataset root, that holds every manifest file.
pub const VERSIONS_DIR: &str = "_versions";

/// Filename prefix that marks the manifest of a detached version.
const DETACHED_VERSION_PREFIX: &str = "d";

/// Extension (without the leading `.`) shared by all manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
72
/// How manifest files are named inside the `_versions` directory.
///
/// `V1` names a manifest `{version}.manifest`. `V2` names it after `u64::MAX - version`,
/// zero-padded to 20 digits, so that newer versions come first in a lexical listing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `{version}.manifest`
    V1,
    /// `{u64::MAX - version:020}.manifest`
    V2,
}
83
84impl ManifestNamingScheme {
85 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
86 let directory = base.child(VERSIONS_DIR);
87 if is_detached_version(version) {
88 let directory = base.child(VERSIONS_DIR);
93 directory.child(format!(
94 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
95 ))
96 } else {
97 match self {
98 Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
99 Self::V2 => {
100 let inverted_version = u64::MAX - version;
101 directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
102 }
103 }
104 }
105 }
106
107 pub fn parse_version(&self, filename: &str) -> Option<u64> {
108 let file_number = filename
109 .split_once('.')
110 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
112 match self {
113 Self::V1 => file_number,
114 Self::V2 => file_number.map(|v| u64::MAX - v),
115 }
116 }
117
118 pub fn detect_scheme(filename: &str) -> Option<Self> {
119 if filename.starts_with(DETACHED_VERSION_PREFIX) {
120 return Some(Self::V2);
122 }
123 if filename.ends_with(MANIFEST_EXTENSION) {
124 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
125 if filename.len() == V2_LEN {
126 Some(Self::V2)
127 } else {
128 Some(Self::V1)
129 }
130 } else {
131 None
132 }
133 }
134
135 pub fn detect_scheme_staging(filename: &str) -> Self {
136 if filename.chars().nth(20) == Some('.') {
139 Self::V2
140 } else {
141 Self::V1
142 }
143 }
144}
145
146pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
156 object_store
157 .inner
158 .list(Some(&dataset_base.child(VERSIONS_DIR)))
159 .try_filter(|res| {
160 let res = if let Some(filename) = res.location.filename() {
161 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
162 } else {
163 false
164 };
165 future::ready(res)
166 })
167 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
168 let filename = meta.location.filename().unwrap();
169 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
170 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
171 object_store.inner.rename(&meta.location, &path).await?;
172 Ok(())
173 })
174 .await?;
175
176 Ok(())
177}
178
179pub type ManifestWriter = for<'a> fn(
183 object_store: &'a ObjectStore,
184 manifest: &'a mut Manifest,
185 indices: Option<Vec<IndexMetadata>>,
186 path: &'a Path,
187 transaction: Option<Transaction>,
188) -> BoxFuture<'a, Result<WriteResult>>;
189
190pub fn write_manifest_file_to_path<'a>(
194 object_store: &'a ObjectStore,
195 manifest: &'a mut Manifest,
196 indices: Option<Vec<IndexMetadata>>,
197 path: &'a Path,
198 transaction: Option<Transaction>,
199) -> BoxFuture<'a, Result<WriteResult>> {
200 Box::pin(async move {
201 let mut object_writer = ObjectWriter::new(object_store, path).await?;
202 let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
203 object_writer
204 .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
205 .await?;
206 let res = Writer::shutdown(&mut object_writer).await?;
207 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
208 Ok(res)
209 })
210}
211
212#[derive(Debug, Clone)]
213pub struct ManifestLocation {
214 pub version: u64,
216 pub path: Path,
218 pub size: Option<u64>,
220 pub naming_scheme: ManifestNamingScheme,
222 pub e_tag: Option<String>,
226}
227
228impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
229 type Error = Error;
230
231 fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
232 let filename = meta.location.filename().ok_or_else(|| {
233 Error::internal("ObjectMeta location does not have a filename".to_string())
234 })?;
235 let scheme = ManifestNamingScheme::detect_scheme(filename)
236 .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
237 let version = scheme
238 .parse_version(filename)
239 .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
240 Ok(Self {
241 version,
242 path: meta.location,
243 size: Some(meta.size),
244 naming_scheme: scheme,
245 e_tag: meta.e_tag,
246 })
247 }
248}
249
250async fn current_manifest_path(
252 object_store: &ObjectStore,
253 base: &Path,
254) -> Result<ManifestLocation> {
255 if object_store.is_local()
256 && let Ok(Some(location)) = current_manifest_local(base)
257 {
258 return Ok(location);
259 }
260
261 let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
262
263 let mut valid_manifests = manifest_files.try_filter_map(|res| {
264 if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
265 {
266 future::ready(Ok(Some((scheme, res))))
267 } else {
268 future::ready(Ok(None))
269 }
270 });
271
272 let first = valid_manifests.next().await.transpose()?;
273 match (first, object_store.list_is_lexically_ordered) {
274 (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
277 let version = scheme
278 .parse_version(meta.location.filename().unwrap())
279 .unwrap();
280
281 for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
285 if scheme != ManifestNamingScheme::V2 {
286 warn!(
287 "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
288 to migrate the directory."
289 );
290 break;
291 }
292 let next_version = scheme
293 .parse_version(meta.location.filename().unwrap())
294 .unwrap();
295 if next_version >= version {
296 warn!(
297 "List operation was expected to be lexically ordered, but was not. This \
298 could mean a corrupt read. Please make a bug report on the lance-format/lance \
299 GitHub repository."
300 );
301 break;
302 }
303 }
304
305 Ok(ManifestLocation {
306 version,
307 path: meta.location,
308 size: Some(meta.size),
309 naming_scheme: scheme,
310 e_tag: meta.e_tag,
311 })
312 }
313 (Some((first_scheme, meta)), _) => {
316 let mut current_version = first_scheme
317 .parse_version(meta.location.filename().unwrap())
318 .unwrap();
319 let mut current_meta = meta;
320 let scheme = first_scheme;
321
322 while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
323 if entry_scheme != scheme {
324 return Err(Error::internal(format!(
325 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
326 Use `migrate_manifest_paths_v2` to migrate the directory.",
327 scheme, entry_scheme
328 )));
329 }
330 let version = entry_scheme
331 .parse_version(meta.location.filename().unwrap())
332 .unwrap();
333 if version > current_version {
334 current_version = version;
335 current_meta = meta;
336 }
337 }
338 Ok(ManifestLocation {
339 version: current_version,
340 path: current_meta.location,
341 size: Some(current_meta.size),
342 naming_scheme: scheme,
343 e_tag: current_meta.e_tag,
344 })
345 }
346 (None, _) => Err(Error::not_found(base.child(VERSIONS_DIR).to_string())),
347 }
348}
349
350fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
354 let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
355 let entries = std::fs::read_dir(path)?;
356
357 let mut latest_entry: Option<(u64, DirEntry)> = None;
358
359 let mut scheme: Option<ManifestNamingScheme> = None;
360
361 for entry in entries {
362 let entry = entry?;
363 let filename_raw = entry.file_name();
364 let filename = filename_raw.to_string_lossy();
365
366 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
367 continue;
370 };
371
372 if let Some(scheme) = scheme {
373 if scheme != entry_scheme {
374 return Err(io::Error::new(
375 io::ErrorKind::InvalidData,
376 format!(
377 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
378 scheme, entry_scheme
379 ),
380 ));
381 }
382 } else {
383 scheme = Some(entry_scheme);
384 }
385
386 let Some(version) = entry_scheme.parse_version(&filename) else {
387 continue;
388 };
389
390 if let Some((latest_version, _)) = &latest_entry {
391 if version > *latest_version {
392 latest_entry = Some((version, entry));
393 }
394 } else {
395 latest_entry = Some((version, entry));
396 }
397 }
398
399 if let Some((version, entry)) = latest_entry {
400 let path = Path::from_filesystem_path(entry.path())
401 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
402 let metadata = entry.metadata()?;
403 Ok(Some(ManifestLocation {
404 version,
405 path,
406 size: Some(metadata.len()),
407 naming_scheme: scheme.unwrap(),
408 e_tag: Some(get_etag(&metadata)),
409 }))
410 } else {
411 Ok(None)
412 }
413}
414
415fn list_manifests<'a>(
416 base_path: &Path,
417 object_store: &'a dyn OSObjectStore,
418) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
419 object_store
420 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
421 .filter_map(|obj_meta| {
422 futures::future::ready(
423 obj_meta
424 .map(|m| ManifestLocation::try_from(m).ok())
425 .transpose(),
426 )
427 })
428 .boxed()
429}
430
431fn make_staging_manifest_path(base: &Path) -> Result<Path> {
432 let id = uuid::Uuid::new_v4().to_string();
433 Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e)))
434}
435
/// The single query parameter of an `s3+ddb://` URL, which names the DynamoDB table.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
438
439#[async_trait::async_trait]
448#[allow(clippy::too_many_arguments)]
449pub trait CommitHandler: Debug + Send + Sync {
450 async fn resolve_latest_location(
451 &self,
452 base_path: &Path,
453 object_store: &ObjectStore,
454 ) -> Result<ManifestLocation> {
455 Ok(current_manifest_path(object_store, base_path).await?)
456 }
457
458 async fn resolve_version_location(
459 &self,
460 base_path: &Path,
461 version: u64,
462 object_store: &dyn OSObjectStore,
463 ) -> Result<ManifestLocation> {
464 default_resolve_version(base_path, version, object_store).await
465 }
466
467 fn list_manifest_locations<'a>(
474 &self,
475 base_path: &Path,
476 object_store: &'a ObjectStore,
477 sorted_descending: bool,
478 ) -> BoxStream<'a, Result<ManifestLocation>> {
479 let underlying_stream = list_manifests(base_path, &object_store.inner);
480
481 if !sorted_descending {
482 return underlying_stream.boxed();
483 }
484
485 async fn sort_stream(
486 input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
487 ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
488 let mut locations = input_stream.try_collect::<Vec<_>>().await?;
489 locations.sort_by_key(|m| std::cmp::Reverse(m.version));
490 Ok(futures::stream::iter(locations.into_iter().map(Ok)))
491 }
492
493 if object_store.list_is_lexically_ordered {
496 let mut peekable = underlying_stream.peekable();
498
499 futures::stream::once(async move {
500 let naming_scheme = match Pin::new(&mut peekable).peek().await {
501 Some(Ok(m)) => m.naming_scheme,
502 Some(Err(_)) => ManifestNamingScheme::V2,
505 None => ManifestNamingScheme::V2,
506 };
507
508 if naming_scheme == ManifestNamingScheme::V2 {
509 Ok(Either::Left(peekable))
511 } else {
512 sort_stream(peekable).await.map(Either::Right)
513 }
514 })
515 .try_flatten()
516 .boxed()
517 } else {
518 futures::stream::once(sort_stream(underlying_stream))
523 .try_flatten()
524 .boxed()
525 }
526 }
527
528 async fn commit(
533 &self,
534 manifest: &mut Manifest,
535 indices: Option<Vec<IndexMetadata>>,
536 base_path: &Path,
537 object_store: &ObjectStore,
538 manifest_writer: ManifestWriter,
539 naming_scheme: ManifestNamingScheme,
540 transaction: Option<Transaction>,
541 ) -> std::result::Result<ManifestLocation, CommitError>;
542
543 async fn delete(&self, _base_path: &Path) -> Result<()> {
545 Ok(())
546 }
547}
548
549async fn default_resolve_version(
550 base_path: &Path,
551 version: u64,
552 object_store: &dyn OSObjectStore,
553) -> Result<ManifestLocation> {
554 if is_detached_version(version) {
555 return Ok(ManifestLocation {
556 version,
557 naming_scheme: ManifestNamingScheme::V2,
560 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
562 size: None,
563 e_tag: None,
564 });
565 }
566
567 let scheme = ManifestNamingScheme::V2;
569 let path = scheme.manifest_path(base_path, version);
570 match object_store.head(&path).await {
571 Ok(meta) => Ok(ManifestLocation {
572 version,
573 path,
574 size: Some(meta.size),
575 naming_scheme: scheme,
576 e_tag: meta.e_tag,
577 }),
578 Err(ObjectStoreError::NotFound { .. }) => {
579 let scheme = ManifestNamingScheme::V1;
581 Ok(ManifestLocation {
582 version,
583 path: scheme.manifest_path(base_path, version),
584 size: None,
585 naming_scheme: scheme,
586 e_tag: None,
587 })
588 }
589 Err(e) => Err(e.into()),
590 }
591}
/// Adapts an `object_store` AWS credential provider to the AWS SDK's
/// [`ProvideCredentials`] trait.
///
/// This lets `build_dynamodb_external_store` hand it to the DynamoDB client.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let provider = &self.0;
            let creds = provider
                .get_credential()
                .await
                .map_err(|err| CredentialsError::provider_error(Box::new(err)))?;
            // Every credential handed to the SDK advertises a fixed 10-minute expiry.
            let expires_at = SystemTime::now()
                .checked_add(Duration::from_secs(60 * 10))
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
627
/// Builds a DynamoDB-backed [`ExternalManifestStore`] for `table_name` in `region`.
///
/// The client is configured as follows:
/// * Credentials come from `creds`, via [`OSObjectStoreToAwsCredAdaptor`].
/// * Requests use standard retries with at most 5 attempts.
/// * `endpoint`, when given, overrides the service URL.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        Client,
        config::{IdentityCache, Region, retry::RetryConfig},
    };

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // SDK-side identity caching is disabled, so the wrapped provider is asked on each
        // request. NOTE(review): presumably it does its own caching; confirm.
        .identity_cache(IdentityCache::no_cache())
        .retry_config(RetryConfig::standard().with_max_attempts(5));

    let builder = match endpoint {
        Some(endpoint) => builder.endpoint_url(endpoint),
        None => builder,
    };
    let client = Client::from_conf(builder.build());

    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
659
660pub async fn commit_handler_from_url(
661 url_or_path: &str,
662 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
664) -> Result<Arc<dyn CommitHandler>> {
665 let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
666 Arc::new(RenameCommitHandler)
667 } else {
668 Arc::new(ConditionalPutCommitHandler)
669 };
670
671 let url = match Url::parse(url_or_path) {
672 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
673 return Ok(local_handler);
675 }
676 Ok(url) => url,
677 Err(_) => {
678 return Ok(local_handler);
679 }
680 };
681
682 match url.scheme() {
683 "file" | "file-object-store" => Ok(local_handler),
684 "s3" | "gs" | "az" | "memory" | "oss" | "cos" => Ok(Arc::new(ConditionalPutCommitHandler)),
685 #[cfg(not(feature = "dynamodb"))]
686 "s3+ddb" => Err(Error::invalid_input_source(
687 "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
688 )),
689 #[cfg(feature = "dynamodb")]
690 "s3+ddb" => {
691 if url.query_pairs().count() != 1 {
692 return Err(Error::invalid_input_source(
693 "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
694 ));
695 }
696 let table_name = match url.query_pairs().next() {
697 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
698 if key == DDB_URL_QUERY_KEY =>
699 {
700 if table_name.is_empty() {
701 return Err(Error::invalid_input_source(
702 "`s3+ddb://` scheme requires non empty dynamodb table name".into(),
703 ));
704 }
705 table_name
706 }
707 _ => {
708 return Err(Error::invalid_input_source(
709 "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
710 ));
711 }
712 };
713 let options = options.clone().unwrap_or_default();
714 let storage_options_raw =
715 StorageOptions(options.storage_options().cloned().unwrap_or_default());
716 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
717 let storage_options = storage_options_raw.as_s3_options();
718
719 let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
720
721 let accessor = options.get_accessor();
723
724 let (aws_creds, region) = build_aws_credential(
725 options.s3_credentials_refresh_offset,
726 options.aws_credentials.clone(),
727 Some(&storage_options),
728 region,
729 accessor,
730 )
731 .await?;
732
733 Ok(Arc::new(ExternalManifestCommitHandler {
734 external_manifest_store: build_dynamodb_external_store(
735 table_name,
736 aws_creds.clone(),
737 ®ion,
738 dynamo_endpoint,
739 "lancedb",
740 )
741 .await?,
742 }))
743 }
744 _ => Ok(Arc::new(UnsafeCommitHandler)),
745 }
746}
747
/// Resolves an optional DynamoDB endpoint override.
///
/// The `dynamodb_endpoint` storage option takes precedence. Otherwise the value of the
/// `DYNAMODB_ENDPOINT` environment variable is used, if it is set and valid Unicode.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
756
757#[derive(Debug)]
759pub enum CommitError {
760 CommitConflict,
762 OtherError(Error),
764}
765
766impl From<Error> for CommitError {
767 fn from(e: Error) -> Self {
768 Self::OtherError(e)
769 }
770}
771
772impl From<CommitError> for Error {
773 fn from(e: CommitError) -> Self {
774 match e {
775 CommitError::CommitConflict => Self::internal("Commit conflict".to_string()),
776 CommitError::OtherError(e) => e,
777 }
778 }
779}
780
781static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
783
784pub struct UnsafeCommitHandler;
788
789#[async_trait::async_trait]
790#[allow(clippy::too_many_arguments)]
791impl CommitHandler for UnsafeCommitHandler {
792 async fn commit(
793 &self,
794 manifest: &mut Manifest,
795 indices: Option<Vec<IndexMetadata>>,
796 base_path: &Path,
797 object_store: &ObjectStore,
798 manifest_writer: ManifestWriter,
799 naming_scheme: ManifestNamingScheme,
800 transaction: Option<Transaction>,
801 ) -> std::result::Result<ManifestLocation, CommitError> {
802 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
804 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
805 log::warn!(
806 "Using unsafe commit handler. Concurrent writes may result in data loss. \
807 Consider providing a commit handler that prevents conflicting writes."
808 );
809 }
810
811 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
812 let res =
813 manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
814
815 Ok(ManifestLocation {
816 version: manifest.version,
817 size: Some(res.size as u64),
818 naming_scheme,
819 path: version_path,
820 e_tag: res.e_tag,
821 })
822 }
823}
824
825impl Debug for UnsafeCommitHandler {
826 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
827 f.debug_struct("UnsafeCommitHandler").finish()
828 }
829}
830
831#[async_trait::async_trait]
833pub trait CommitLock: Debug {
834 type Lease: CommitLease;
835
836 async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
849}
850
851#[async_trait::async_trait]
852pub trait CommitLease: Send + Sync {
853 async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
855}
856
857#[async_trait::async_trait]
858impl<T: CommitLock + Send + Sync> CommitHandler for T {
859 async fn commit(
860 &self,
861 manifest: &mut Manifest,
862 indices: Option<Vec<IndexMetadata>>,
863 base_path: &Path,
864 object_store: &ObjectStore,
865 manifest_writer: ManifestWriter,
866 naming_scheme: ManifestNamingScheme,
867 transaction: Option<Transaction>,
868 ) -> std::result::Result<ManifestLocation, CommitError> {
869 let path = naming_scheme.manifest_path(base_path, manifest.version);
870 let lease = self.lock(manifest.version).await?;
873
874 match object_store.inner.head(&path).await {
876 Ok(_) => {
877 lease.release(false).await?;
880
881 return Err(CommitError::CommitConflict);
882 }
883 Err(ObjectStoreError::NotFound { .. }) => {}
884 Err(e) => {
885 lease.release(false).await?;
888
889 return Err(CommitError::OtherError(e.into()));
890 }
891 }
892 let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
893
894 lease.release(res.is_ok()).await?;
896
897 let res = res?;
898 Ok(ManifestLocation {
899 version: manifest.version,
900 size: Some(res.size as u64),
901 naming_scheme,
902 path,
903 e_tag: res.e_tag,
904 })
905 }
906}
907
908#[async_trait::async_trait]
909impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
910 async fn commit(
911 &self,
912 manifest: &mut Manifest,
913 indices: Option<Vec<IndexMetadata>>,
914 base_path: &Path,
915 object_store: &ObjectStore,
916 manifest_writer: ManifestWriter,
917 naming_scheme: ManifestNamingScheme,
918 transaction: Option<Transaction>,
919 ) -> std::result::Result<ManifestLocation, CommitError> {
920 self.as_ref()
921 .commit(
922 manifest,
923 indices,
924 base_path,
925 object_store,
926 manifest_writer,
927 naming_scheme,
928 transaction,
929 )
930 .await
931 }
932}
933
934pub struct RenameCommitHandler;
938
939#[async_trait::async_trait]
940impl CommitHandler for RenameCommitHandler {
941 async fn commit(
942 &self,
943 manifest: &mut Manifest,
944 indices: Option<Vec<IndexMetadata>>,
945 base_path: &Path,
946 object_store: &ObjectStore,
947 manifest_writer: ManifestWriter,
948 naming_scheme: ManifestNamingScheme,
949 transaction: Option<Transaction>,
950 ) -> std::result::Result<ManifestLocation, CommitError> {
951 let path = naming_scheme.manifest_path(base_path, manifest.version);
955 let tmp_path = make_staging_manifest_path(&path)?;
956
957 let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
958
959 match object_store
960 .inner
961 .rename_if_not_exists(&tmp_path, &path)
962 .await
963 {
964 Ok(_) => {
965 Ok(ManifestLocation {
967 version: manifest.version,
968 path,
969 size: Some(res.size as u64),
970 naming_scheme,
971 e_tag: None, })
973 }
974 Err(ObjectStoreError::AlreadyExists { .. }) => {
975 let _ = object_store.delete(&tmp_path).await;
978
979 return Err(CommitError::CommitConflict);
980 }
981 Err(e) => {
982 return Err(CommitError::OtherError(e.into()));
984 }
985 }
986 }
987}
988
989impl Debug for RenameCommitHandler {
990 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
991 f.debug_struct("RenameCommitHandler").finish()
992 }
993}
994
995pub struct ConditionalPutCommitHandler;
996
997#[async_trait::async_trait]
998impl CommitHandler for ConditionalPutCommitHandler {
999 async fn commit(
1000 &self,
1001 manifest: &mut Manifest,
1002 indices: Option<Vec<IndexMetadata>>,
1003 base_path: &Path,
1004 object_store: &ObjectStore,
1005 manifest_writer: ManifestWriter,
1006 naming_scheme: ManifestNamingScheme,
1007 transaction: Option<Transaction>,
1008 ) -> std::result::Result<ManifestLocation, CommitError> {
1009 let path = naming_scheme.manifest_path(base_path, manifest.version);
1010
1011 let memory_store = ObjectStore::memory();
1012 let dummy_path = "dummy";
1013 manifest_writer(
1014 &memory_store,
1015 manifest,
1016 indices,
1017 &dummy_path.into(),
1018 transaction,
1019 )
1020 .await?;
1021 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1022 let size = dummy_data.len() as u64;
1023 let res = object_store
1024 .inner
1025 .put_opts(
1026 &path,
1027 dummy_data.into(),
1028 PutOptions {
1029 mode: object_store::PutMode::Create,
1030 ..Default::default()
1031 },
1032 )
1033 .await
1034 .map_err(|err| match err {
1035 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1036 CommitError::CommitConflict
1037 }
1038 _ => CommitError::OtherError(err.into()),
1039 })?;
1040
1041 Ok(ManifestLocation {
1042 version: manifest.version,
1043 path,
1044 size: Some(size),
1045 naming_scheme,
1046 e_tag: res.e_tag,
1047 })
1048 }
1049}
1050
1051impl Debug for ConditionalPutCommitHandler {
1052 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1053 f.debug_struct("ConditionalPutCommitHandler").finish()
1054 }
1055}
1056
/// Tuning knobs for committing a new version.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// How many times a commit is retried.
    ///
    /// NOTE(review): presumably retries happen on conflicts; confirm at call sites.
    pub num_retries: u32,
    /// When `true`, automatic cleanup after a commit is skipped.
    ///
    /// NOTE(review): presumably this is cleanup of old versions; confirm.
    pub skip_auto_cleanup: bool,
}

impl Default for CommitConfig {
    /// 20 retries, with automatic cleanup enabled.
    fn default() -> Self {
        let num_retries = 20;
        let skip_auto_cleanup = false;
        Self {
            num_retries,
            skip_auto_cleanup,
        }
    }
}
1072
// Unit tests for manifest naming, scheme migration, manifest listing, and latest-version
// lookup.
#[cfg(test)]
mod tests {
    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    /// Covers path generation, version parsing (including staging names with a `-uuid`
    /// suffix), and scheme detection for both naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        // V1 uses the plain version number as the file stem.
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // V2 uses `u64::MAX - version`, zero-padded to 20 digits.
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        // Parsing ignores everything after the first `.`, so staging names parse too.
        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Migration renames V1 manifests to V2 and leaves V2 manifests and unrelated files
    /// alone.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // One non-manifest file, one V1 manifest, and one V2 manifest.
        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // This order assumes the in-memory store lists keys lexically. The V2 name for
        // version 1 (...614) sorts before version 0 (...615), and both precede "irrelevant".
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    /// `list_manifest_locations(.., true)` yields manifests newest-first.
    ///
    /// This is checked for both schemes. The memory store serves as the lexically ordered
    /// case and the local store as the unordered case.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Declared here so the temporary directory outlives the store that uses it.
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.child("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Write versions 11 down to 0, so `expected_paths` is already newest-first.
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(actual_versions, expected_paths);
    }

    /// `current_manifest_path` finds the highest version for both schemes.
    ///
    /// The `list_is_lexically_ordered` flag is toggled so that both the V2 shortcut and the
    /// full-scan path are exercised.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_current_manifest_path(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        let mut object_store = ObjectStore::memory();
        object_store.list_is_lexically_ordered = lexical_list_store;
        let object_store = Box::new(object_store);
        let base = Path::from("base");

        // Write versions in a shuffled order so that insertion order gives no hint.
        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
            let path = naming_scheme.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        let location = current_manifest_path(&object_store, &base).await.unwrap();

        assert_eq!(location.version, 11);
        assert_eq!(location.naming_scheme, naming_scheme);
        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
    }
}